diff --git a/1 b/1 index be63906..c3726d5 100644 --- a/1 +++ b/1 @@ -1,496 +1,18 @@ +const Session = @import("discord.zig"); const std = @import("std"); -const json = std.json; -const mem = std.mem; -const http = std.http; -const ws = @import("ws"); -const builtin = @import("builtin"); -const HttpClient = @import("tls12"); -const net = std.net; -const crypto = std.crypto; -const tls = std.crypto.tls; -//const TlsClient = @import("tls12").TlsClient; -//const Certificate = @import("tls12").Certificate; -// todo use this to read compressed messages -const zlib = @import("zlib"); -const Opcode = enum(u4) { - Dispatch = 0, - Heartbeat = 1, - Identify = 2, - PresenceUpdate = 3, - VoiceStateUpdate = 4, - Resume = 6, - Reconnect = 7, - RequestGuildMember = 8, - InvalidSession = 9, - Hello = 10, - HeartbeatACK = 11, -}; - -const ShardSocketCloseCodes = enum(u16) { - Shutdown = 3000, - ZombiedConnection = 3010, -}; - -const BASE_URL = "https://discord.com/api/v10"; - -pub const Intents = packed struct { - guilds: bool = false, - guild_members: bool = false, - guild_bans: bool = false, - guild_emojis: bool = false, - guild_integrations: bool = false, - guild_webhooks: bool = false, - guild_invites: bool = false, - guild_voice_states: bool = false, - - guild_presences: bool = false, - guild_messages: bool = false, - guild_message_reactions: bool = false, - guild_message_typing: bool = false, - direct_messages: bool = false, - direct_message_reactions: bool = false, - direct_message_typing: bool = false, - message_content: bool = false, - - guild_scheduled_events: bool = false, - _pad: u3 = 0, - auto_moderation_configuration: bool = false, - auto_moderation_execution: bool = false, - _pad2: u2 = 0, - - _pad3: u8 = 0, - - pub fn toRaw(self: Intents) u32 { - return @as(u32, @bitCast(self)); - } - - pub fn fromRaw(raw: u32) Intents { - return @as(Intents, @bitCast(raw)); - } - - pub fn jsonStringify(self: Intents, options: std.json.StringifyOptions, writer: anytype) !void { - _ = options; - try writer.print("{}", .{self.toRaw()}); - } -}; +const TOKEN = "Bot MTI5ODgzOTgzMDY3OTEzMDE4OA.GNojts.iyblGKK0xTWU57QCG5n3hr2Be1whyylTGr44P0"; pub fn main() !void { - const TOKEN = "Bot MTI5ODgzOTgzMDY3OTEzMDE4OA.GNojts.iyblGKK0xTWU57QCG5n3hr2Be1whyylTGr44P0"; var gpa = std.heap.GeneralPurposeAllocator(.{}){}; defer if (gpa.deinit() == .leak) { std.log.warn("Has leaked\n", .{}); }; const alloc = gpa.allocator(); - var handler = try Handler.init(alloc, .{ .token = TOKEN, .intents = Intents.fromRaw(513) }); - //errdefer handler.deinit(); + var handler = try Session.init(alloc, .{ .token = TOKEN, .intents = Session.Intents.fromRaw(513) }); + errdefer handler.deinit(); - // try handler.readMessage(); - const thread = try std.Thread.spawn(.{}, Handler.readMessage, .{&handler}); - thread.detach(); - //try handler.identify(); + const t = try std.Thread.spawn(.{}, Session.readMessage, .{&handler}); + defer t.join(); } - -const HeartbeatHandler = struct { - gateway: *Handler, - heartbeatInterval: u64, - lastHeartbeatAck: bool, - /// useful for calculating ping - lastBeat: u64 = 0, - mutex: std.Thread.Mutex = .{}, - thread: ?std.Thread, - - pub fn init(gateway: *Handler, interval: u64) !HeartbeatHandler { - var gateway_mut = gateway.*; - return .{ - .gateway = &gateway_mut, - .heartbeatInterval = interval, - .lastHeartbeatAck = false, - .lastBeat = 0, - .thread = null, - }; - } - - pub fn deinit(self: *HeartbeatHandler) void { - _ = self; - } - - pub fn run(self: *HeartbeatHandler) !void { - while (true) { - std.time.sleep(self.heartbeatInterval); - try self.gateway.heartbeat(false); - } - } - - pub fn loop(self: *HeartbeatHandler) !void { - //_ = self; - std.debug.print("start loop\n", .{}); - var self_mut = self.*; - self.thread = std.Thread.spawn(.{}, HeartbeatHandler.run, .{&self_mut}) catch null; - } - - pub fn readAck(self: *HeartbeatHandler) bool { - self.mutex.lock(); - defer self.mutex.lock(); - - return self.lastHeartbeatAck; - } - - pub fn updateAck(self: *HeartbeatHandler, ack: bool) void { - self.mutex.lock(); - defer self.mutex.lock(); - - self.lastHeartbeatAck = ack; - } -}; - -const FetchReq = struct { - allocator: mem.Allocator, - token: []const u8, - client: HttpClient, - body: std.ArrayList(u8), - - pub fn init(allocator: mem.Allocator, token: []const u8) FetchReq { - const client = HttpClient{ .allocator = allocator }; - return FetchReq{ - .allocator = allocator, - .client = client, - .body = std.ArrayList(u8).init(allocator), - .token = token, - }; - } - - pub fn deinit(self: *FetchReq) void { - self.client.deinit(); - self.body.deinit(); - } - - pub fn makeRequest(self: *FetchReq, method: http.Method, path: []const u8, body: ?[]const u8) !HttpClient.FetchResult { - var fetch_options = HttpClient.FetchOptions{ - .location = HttpClient.FetchOptions.Location{ - .url = path, - }, - .extra_headers = &[_]http.Header{ - http.Header{ .name = "Accept", .value = "application/json" }, - http.Header{ .name = "Content-Type", .value = "application/json" }, - http.Header{ .name = "Authorization", .value = self.token }, - }, - .method = method, - .response_storage = .{ .dynamic = &self.body }, - }; - - if (body != null) { - fetch_options.payload = body; - } - - const res = try self.client.fetch(fetch_options); - return res; - } -}; - -pub const Handler = struct { - /// - /// https://discord.com/developers/docs/topics/gateway#get-gateway - /// - const GatewayInfo = struct { - /// The WSS URL that can be used for connecting to the gateway - url: []const u8, - }; - - /// - /// https://discord.com/developers/docs/events/gateway#session-start-limit-object - /// - const GatewaySessionStartLimit = struct { - /// Total number of session starts the current user is allowed - total: u32, - /// Remaining number of session starts the current user is allowed - remaining: u32, - /// Number of milliseconds after which the limit resets - reset_after: u32, - /// Number of identify requests allowed per 5 seconds - max_concurrency: u32, - }; - - /// - /// https://discord.com/developers/docs/topics/gateway#get-gateway-bot - /// - const GatewayBotInfo = struct { - url: []const u8, - /// - /// The recommended number of shards to use when connecting - /// - /// See https://discord.com/developers/docs/topics/gateway#sharding - /// - shards: u32, - /// - /// Information on the current session start limit - /// - /// See https://discord.com/developers/docs/topics/gateway#session-start-limit-object - /// - session_start_limit: ?GatewaySessionStartLimit, - }; - - const IdentifyProperties = struct { - /// - /// Operating system the shard runs on. - /// - os: []const u8, - /// - /// The "browser" where this shard is running on. - /// - browser: []const u8, - /// - /// The device on which the shard is running. - /// - device: []const u8, - }; - - const _default_properties = IdentifyProperties{ - .os = @tagName(builtin.os.tag), - .browser = "seyfert", - .device = "seyfert", - }; - - client: ws.Client, - token: []const u8, - intents: Intents, - session_id: ?[]const u8, - sequence: ?u16, - heartbeater: ?HeartbeatHandler, - allocator: mem.Allocator, - resume_gateway_url: ?[]const u8 = null, - info: GatewayBotInfo, - - pub fn resume_conn(self: *Handler, out: anytype) !void { - const data = .{ .op = @intFromEnum(Opcode.Resume), .d = .{ - .token = self.token, - .session_id = self.session_id, - .seq = self.sequence, - } }; - - try json.stringify(data, .{}, out); - - try self.client.write(&out); - } - - inline fn gateway_url(self: ?*Handler) []const u8 { - // wtf is this? - if (self) |s| { - return s.resume_gateway_url orelse s.info.url; - } - - return "wss://gateway.discord.gg"; - } - - // identifies in order to connect to Discord and get the online status, this shall be done on hello perhaps - fn identify(self: *Handler) !void { - std.debug.print("identifying now...\n", .{}); - const data = .{ - .op = @intFromEnum(Opcode.Identify), - .d = .{ - //.compress = false, - .intents = self.intents.toRaw(), - .properties = Handler._default_properties, - .token = self.token, - }, - }; - - var buf: [1000]u8 = undefined; - var fba = std.heap.FixedBufferAllocator.init(&buf); - var string = std.ArrayList(u8).init(fba.allocator()); - try std.json.stringify(data, .{}, string.writer()); - - std.debug.print("{s}\n", .{string.items}); - // try posting our shitty data - try self.client.write(string.items); - } - - // asks /gateway/bot initializes both the ws client and the http client - pub fn init(allocator: mem.Allocator, args: struct { token: []const u8, intents: Intents }) !Handler { - var req = FetchReq.init(allocator, args.token); - defer req.deinit(); - - const res = try req.makeRequest(.GET, BASE_URL ++ "/gateway/bot", null); - const body = try req.body.toOwnedSlice(); - defer allocator.free(body); - - // check status idk - if (res.status != http.Status.ok) { - @panic("we are cooked"); - } - - const parsed = try json.parseFromSlice(GatewayBotInfo, allocator, body, .{}); - defer parsed.deinit(); - - return .{ - .allocator = allocator, - .token = args.token, - .intents = args.intents, - // maybe there is a better way to do this - .client = try Handler._connect_ws(allocator, parsed.value.url["wss://".len..]), - .session_id = undefined, - .sequence = undefined, - .heartbeater = null, - .info = parsed.value, - }; - } - - inline fn _connect_ws(allocator: mem.Allocator, url: []const u8) !ws.Client { - var conn = try ws.Client.init(allocator, .{ - .tls = true, // important: zig.http doesn't support this, type shit - .port = 443, - .host = url, - //.ca_bundle = @import("tls12").Certificate.Bundle{}, - }); - - try conn.handshake("/?v=10&encoding=json", .{ - .timeout_ms = 1000, - .headers = "host: gateway.discord.gg", - }); - - return conn; - } - - pub fn deinit(self: *Handler) void { - defer self.client.deinit(); - if (self.heartbeater) |*hb| { - hb.deinit(); - } - } - - // listens for messages - pub fn readMessage(self: *Handler) !void { - try self.client.readTimeout(std.time.ms_per_s * 1); - while (true) { - const msg = (try self.client.read()) orelse { - // no message after our 1 second - std.debug.print(".", .{}); - continue; - }; - - // must be called once you're done processing the request - defer self.client.done(msg); - - std.debug.print("received: {s}\n", .{msg.data}); - - const DiscordData = struct { - s: ?u16, //well figure it out - op: Opcode, - d: json.Value, // needs parsing - t: ?[]const u8, - }; - - const raw = try json.parseFromSlice(DiscordData, self.allocator, msg.data, .{}); - defer raw.deinit(); - - const payload = raw.value; - - if (payload.op == Opcode.Dispatch) { - self.sequence = @as(?u16, payload.s); - } - - switch (payload.op) { - Opcode.Dispatch => {}, - Opcode.Hello => { - const HelloPayload = struct { heartbeat_interval: u32, _trace: [][]const u8 }; - const parsed = try json.parseFromValue(HelloPayload, self.allocator, payload.d, .{}); - const helloPayload = parsed.value; - - // PARSE NEW URL IN READY - - if (self.heartbeater == null) { - var self_mut = self.*; - - self.heartbeater = try HeartbeatHandler.init( - // we cooking - &self_mut, helloPayload.heartbeat_interval); - } - - var heartbeater = self.heartbeater.?; - - heartbeater.heartbeatInterval = helloPayload.heartbeat_interval; - - try self.heartbeat(false); - try heartbeater.loop(); - try self.identify(); - }, // heartbeat_interval - Opcode.HeartbeatACK => { - if (self.heartbeater) |*hb| { - hb.lastHeartbeatAck = true; - } - }, // keep this shit alive otherwise kill it - Opcode.Reconnect => {}, - Opcode.Resume => { - const WithSequence = struct { - token: []const u8, - session_id: []const u8, - seq: ?u16, - }; - const parsed = try json.parseFromValue(WithSequence, self.allocator, payload.d, .{}); - const payload_new = parsed.value; - - self.sequence = @as(?u16, payload_new.seq); - self.session_id = payload_new.session_id; - }, - else => { - std.debug.print("Unhandled {} -- {s}", .{ payload.op, "none" }); - }, - } - //try client.write(message.data); - } - } - - pub fn heartbeat(self: *Handler, requested: bool) !void { - var heartbeater = self.heartbeater.?; - //std.time.sleep(heartbeater.heartbeatInterval); - - if (!requested) { - if (!heartbeater.readAck()) { - try self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection"); - heartbeater.deinit(); - return; - } - heartbeater.updateAck(false); - } - const data = .{ .op = @intFromEnum(Opcode.Heartbeat), .d = self.sequence }; - - heartbeater.mutex.lock(); - defer heartbeater.mutex.unlock(); - - var buf: [1000]u8 = undefined; - var fba = std.heap.FixedBufferAllocator.init(&buf); - var string = std.ArrayList(u8).init(fba.allocator()); - try std.json.stringify(data, .{}, string.writer()); - - // try posting our shitty data - std.debug.print("sending heartbeat rn\n", .{}); - try self.client.write(string.items); - } - - pub inline fn reconnect(self: *Handler) !void { - _ = self; - //try self.disconnect(); - //_ = try self.connect(); - } - - pub fn connect(self: *Handler) !Handler { - std.time.sleep(std.time.ms_per_s * 5); - self.client = try Handler._connect_ws(self.allocator, self.gateway_url()); - - return self.*; - } - - pub fn disconnect(self: *Handler) !void { - try self.close(ShardSocketCloseCodes.Shutdown, "Shard down request"); - } - - pub fn close(self: *Handler, code: ShardSocketCloseCodes, reason: []const u8) !void { - std.debug.print("cooked closing ws conn...\n", .{}); - // Implement reconnection logic here - try self.client.close(.{ - .code = @intFromEnum(code), //u16 - .reason = reason, //[]const u8 - }); - } -}; diff --git a/src/discord.zig b/src/discord.zig index 878cd5a..479a5fd 100644 --- a/src/discord.zig +++ b/src/discord.zig @@ -12,7 +12,8 @@ const tls = std.crypto.tls; //const Certificate = @import("tls12").Certificate; // todo use this to read compressed messages const zlib = @import("zlib"); -const Scheduler = @import("scheduler.zig").Scheduler; + +const Self = @This(); const Opcode = enum(u4) { Dispatch = 0, @@ -120,374 +121,361 @@ const FetchReq = struct { } }; -pub const Handler = struct { - /// - /// https://discord.com/developers/docs/topics/gateway#get-gateway - /// - const GatewayInfo = struct { - /// The WSS URL that can be used for connecting to the gateway - url: []const u8, - }; - - /// - /// https://discord.com/developers/docs/events/gateway#session-start-limit-object - /// - const GatewaySessionStartLimit = struct { - /// Total number of session starts the current user is allowed - total: u32, - /// Remaining number of session starts the current user is allowed - remaining: u32, - /// Number of milliseconds after which the limit resets - reset_after: u32, - /// Number of identify requests allowed per 5 seconds - max_concurrency: u32, - }; - - /// - /// https://discord.com/developers/docs/topics/gateway#get-gateway-bot - /// - const GatewayBotInfo = struct { - url: []const u8, - /// - /// The recommended number of shards to use when connecting - /// - /// See https://discord.com/developers/docs/topics/gateway#sharding - /// - shards: u32, - /// - /// Information on the current session start limit - /// - /// See https://discord.com/developers/docs/topics/gateway#session-start-limit-object - /// - session_start_limit: ?GatewaySessionStartLimit, - }; - - const IdentifyProperties = struct { - /// - /// Operating system the shard runs on. - /// - os: []const u8, - /// - /// The "browser" where this shard is running on. - /// - browser: []const u8, - /// - /// The device on which the shard is running. - /// - device: []const u8, - }; - - const _default_properties = IdentifyProperties{ - .os = @tagName(builtin.os.tag), - .browser = "discord.zig", - .device = "discord.zig", - }; - - const Heart = struct { - heartbeatInterval: u32, - ack: bool, - /// useful for calculating ping - lastBeat: u64, - }; - - const HeartTask = union(enum) { - heartbeat, - ack, - increment: u64, - - pub fn run(task: HeartTask, ctx: *Handler, next_at: i64) void { - std.debug.print("task successful {s}\n", .{@tagName(task)}); - _ = next_at; - switch (task) { - .heartbeat => { - std.debug.print(">> ♥ and ack received: {}\n", .{ctx.heart.ack}); - if (!ctx.heart.ack) { - ctx.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection") catch unreachable; - @panic("zombied conn\n"); - } - - //do we really have to send this? - //ctx.heartbeat() catch { - // @panic("we are fucking cooked didn't send heartbeat fuckfuckfuck"); - //}; - ctx.heart.ack = false; - }, - .ack => { - ctx.heart.ack = true; - std.debug.print("<< ♥\n", .{}); - }, - .increment => |seq| { - ctx.sequence.* = seq; - }, - } - } - }; - - client: ws.Client, - token: []const u8, - intents: Intents, - heart: Heart = .{ .heartbeatInterval = 45000, .ack = false, .lastBeat = 0 }, - allocator: mem.Allocator, - resume_gateway_url: ?[]const u8 = null, - info: GatewayBotInfo, - heart_task: Scheduler(HeartTask, *Handler), - mutex: std.Thread.Mutex = .{}, - - session_id: ?[]const u8, - sequence: *u64, - - inline fn jitter() i1 { - return 0; - } - - pub inline fn resumable(self: *Handler) bool { - return self.resume_gateway_url != null and - self.session_id != null and - self.getSequence() > 0; - } - - pub fn resume_(self: *Handler) !void { - const data = .{ .op = @intFromEnum(Opcode.Resume), .d = .{ - .token = self.token, - .session_id = self.session_id, - .seq = self.getSequence(), - } }; - - try self.send(data); - } - - inline fn gateway_url(self: ?*Handler) []const u8 { - // wtf is this? - if (self) |s| { - return s.resume_gateway_url orelse s.info.url; - } - - return "wss://gateway.discord.gg"; - } - - // identifies in order to connect to Discord and get the online status, this shall be done on hello perhaps - fn identify(self: *Handler) !void { - std.debug.print("identifying now...\n", .{}); - const data = .{ - .op = @intFromEnum(Opcode.Identify), - .d = .{ - //.compress = false, - .intents = self.intents.toRaw(), - .properties = Handler._default_properties, - .token = self.token, - }, - }; - - // try posting our shitty data - try self.send(data); - } - - // asks /gateway/bot initializes both the ws client and the http client - pub fn init(allocator: mem.Allocator, args: struct { token: []const u8, intents: Intents }) !Handler { - var req = FetchReq.init(allocator, args.token); - defer req.deinit(); - - const res = try req.makeRequest(.GET, BASE_URL ++ "/gateway/bot", null); - const body = try req.body.toOwnedSlice(); - defer allocator.free(body); - - // check status idk - if (res.status != http.Status.ok) { - @panic("we are cooked\n"); - } - - const parsed = try json.parseFromSlice(GatewayBotInfo, allocator, body, .{}); - defer parsed.deinit(); - - var counter: u64 = 0; - - return .{ - .allocator = allocator, - .token = args.token, - .intents = args.intents, - // maybe there is a better way to do this - .client = try Handler._connect_ws(allocator, parsed.value.url["wss://".len..]), - .session_id = undefined, - .sequence = &counter, - .heart = undefined, - .info = parsed.value, - .heart_task = Scheduler(HeartTask, *Handler).init(allocator), - }; - } - - inline fn _connect_ws(allocator: mem.Allocator, url: []const u8) !ws.Client { - var conn = try ws.Client.init(allocator, .{ - .tls = true, // important: zig.http doesn't support this, type shit - .port = 443, - .host = url, - //.ca_bundle = @import("tls12").Certificate.Bundle{}, - }); - - conn.handshake("/?v=10&encoding=json", .{ - .timeout_ms = 1000, - .headers = "host: gateway.discord.gg", - }) catch unreachable; - - return conn; - } - - pub fn deinit(self: *Handler) void { - self.heart_task.deinit(); - self.client.deinit(); - std.debug.print("killing the whole bot\n", .{}); - } - - // listens for messages - pub fn readMessage(self: *Handler) !void { - try self.client.readTimeout(0); - while (true) { - const msg = self.client.read() catch |err| switch (err) { - error.Closed => return, - else => return err, - } orelse { - std.debug.print(".", .{}); - continue; - }; - // must be called once you're done processing the request - defer self.client.done(msg); - - std.debug.print("type of: {?s}\n", .{@tagName(msg.type)}); - - const DiscordData = struct { - s: ?u64, //well figure it out - op: Opcode, - d: json.Value, // needs parsing - t: ?[]const u8, - }; - - const raw = json.parseFromSlice(DiscordData, self.allocator, msg.data, .{}) catch { - std.debug.print("unexpected payload, continuing: {s}\n", .{msg.data}); - continue; - }; - defer raw.deinit(); - - const payload = raw.value; - - std.debug.print("received: {?s}\n", .{payload.t}); - - if (payload.op == Opcode.Dispatch) { - std.debug.print("our sequence: {?d}\n", .{payload.s}); - try self.heart_task.scheduleIn(.{ .increment = payload.s orelse 0 }, 0); - } - - switch (payload.op) { - Opcode.Dispatch => {}, - Opcode.Hello => { - const HelloPayload = struct { heartbeat_interval: u32, _trace: [][]const u8 }; - const parsed = try json.parseFromValue(HelloPayload, self.allocator, payload.d, .{}); - defer parsed.deinit(); - const helloPayload = parsed.value; - - // PARSE NEW URL IN READY - - self.heart = Heart{ - // TODO: fix bug - .heartbeatInterval = helloPayload.heartbeat_interval, - .ack = false, - .lastBeat = 0, - }; - - std.debug.print("starting heart beater. seconds:{d}...\n", .{self.heart.heartbeatInterval}); - var self_mut = self.*; - try self.heart_task.start(&self_mut); - errdefer self.heart_task.deinit(); - try self.heart_task.scheduleIn(.heartbeat, @as(i64, @intCast(self.heart.heartbeatInterval))); - if (self.resumable()) { - try self.resume_(); - return; - } else { - try self.identify(); - try self.heartbeat(); - } - }, - Opcode.HeartbeatACK => { - try self.heart_task.scheduleIn(.ack, 0); - std.debug.print("got heartbeat\n", .{}); - }, - Opcode.Heartbeat => { - try self.heartbeat(); - }, - Opcode.Reconnect => { - std.debug.print("reconnecting\n", .{}); - try self.reconnect(); - }, - Opcode.Resume => { - const WithSequence = struct { - token: []const u8, - session_id: []const u8, - seq: ?u64, - }; - const parsed = try json.parseFromValue(WithSequence, self.allocator, payload.d, .{}); - const payload_new = parsed.value; - - try self.heart_task.scheduleIn(.{ .increment = payload_new.seq orelse 0 }, 0); - self.session_id = payload_new.session_id; - }, - Opcode.InvalidSession => {}, - else => { - std.debug.print("Unhandled {} -- {s}", .{ payload.op, "none" }); - }, - } - } - } - - pub fn heartbeat(self: *Handler) !void { - //std.time.sleep(heartbeater.heartbeatInterval); - const data = .{ .op = @intFromEnum(Opcode.Heartbeat), .d = if (self.getSequence() > 0) self.getSequence() else null }; - - std.debug.print("sending heartbeat rn\n", .{}); - try self.send(data); - } - - pub inline fn reconnect(self: *Handler) !void { - try self.disconnect(); - _ = try self.connect(); - } - - pub fn connect(self: *Handler) !Handler { - self.mutex.lock(); - defer self.mutex.unlock(); - //std.time.sleep(std.time.ms_per_s * 5); - self.client = try Handler._connect_ws(self.allocator, self.gateway_url()); - - return self.*; - } - - pub fn disconnect(self: *Handler) !void { - try self.close(ShardSocketCloseCodes.Shutdown, "Shard down request"); - } - - pub fn close(self: *Handler, code: ShardSocketCloseCodes, reason: []const u8) !void { - self.mutex.lock(); - defer self.mutex.unlock(); - std.debug.print("cooked closing ws conn...\n", .{}); - // Implement reconnection logic here - try self.client.close(.{ - .code = @intFromEnum(code), //u16 - .reason = reason, //[]const u8 - }); - } - - pub fn send(self: *Handler, data: anytype) !void { - self.mutex.lock(); - defer self.mutex.unlock(); - - var buf: [1000]u8 = undefined; - var fba = std.heap.FixedBufferAllocator.init(&buf); - var string = std.ArrayList(u8).init(fba.allocator()); - try std.json.stringify(data, .{}, string.writer()); - - std.debug.print("data we are sending: {s}\n", .{string.items}); - - try self.client.write(string.items); - } - - pub inline fn getSequence(self: *Handler) u64 { - return self.sequence.*; - } +/// +/// https://discord.com/developers/docs/topics/gateway#get-gateway +/// +const GatewayInfo = struct { + /// The WSS URL that can be used for connecting to the gateway + url: []const u8, }; + +/// +/// https://discord.com/developers/docs/events/gateway#session-start-limit-object +/// +const GatewaySessionStartLimit = struct { + /// Total number of session starts the current user is allowed + total: u32, + /// Remaining number of session starts the current user is allowed + remaining: u32, + /// Number of milliseconds after which the limit resets + reset_after: u32, + /// Number of identify requests allowed per 5 seconds + max_concurrency: u32, +}; + +/// +/// https://discord.com/developers/docs/topics/gateway#get-gateway-bot +/// +const GatewayBotInfo = struct { + url: []const u8, + /// + /// The recommended number of shards to use when connecting + /// + /// See https://discord.com/developers/docs/topics/gateway#sharding + /// + shards: u32, + /// + /// Information on the current session start limit + /// + /// See https://discord.com/developers/docs/topics/gateway#session-start-limit-object + /// + session_start_limit: ?GatewaySessionStartLimit, +}; + +const IdentifyProperties = struct { + /// + /// Operating system the shard runs on. + /// + os: []const u8, + /// + /// The "browser" where this shard is running on. + /// + browser: []const u8, + /// + /// The device on which the shard is running. + /// + device: []const u8, +}; + +const _default_properties = IdentifyProperties{ + .os = @tagName(builtin.os.tag), + .browser = "discord.zig", + .device = "discord.zig", +}; + +const Heart = struct { + heartbeatInterval: u64, + ack: bool, + /// useful for calculating ping + lastBeat: u64, +}; + +client: ws.Client, +token: []const u8, +intents: Intents, +heart: Heart = .{ .heartbeatInterval = 45000, .ack = false, .lastBeat = 0 }, +allocator: mem.Allocator, +resume_gateway_url: ?[]const u8 = null, +info: GatewayBotInfo, +mutex: std.Thread.Mutex = .{}, +listen_semaphore: std.Thread.Semaphore = .{ .permits = 1 }, +send_semaphore: std.Thread.Semaphore = .{ .permits = 1 }, + +session_id: ?[]const u8, +sequence: *u64, + +inline fn jitter() i1 { + return 0; +} + +pub inline fn resumable(self: *Self) bool { + return self.resume_gateway_url != null and + self.session_id != null and + self.getSequence() > 0; +} + +pub fn resume_(self: *Self) !void { + const data = .{ .op = @intFromEnum(Opcode.Resume), .d = .{ + .token = self.token, + .session_id = self.session_id, + .seq = self.getSequence(), + } }; + + try self.send(data); +} + +inline fn gateway_url(self: ?*Self) []const u8 { + // wtf is this? + if (self) |s| { + return s.resume_gateway_url orelse s.info.url; + } + + return "wss://gateway.discord.gg"; +} + +// identifies in order to connect to Discord and get the online status, this shall be done on hello perhaps +fn identify(self: *Self) !void { + const data = .{ + .op = @intFromEnum(Opcode.Identify), + .d = .{ + //.compress = false, + .intents = self.intents.toRaw(), + .properties = Self._default_properties, + .token = self.token, + }, + }; + + // try posting our shitty data + try self.send(data); +} + +// asks /gateway/bot initializes both the ws client and the http client +pub fn init(allocator: mem.Allocator, args: struct { token: []const u8, intents: Intents }) !Self { + var req = FetchReq.init(allocator, args.token); + defer req.deinit(); + + const res = try req.makeRequest(.GET, BASE_URL ++ "/gateway/bot", null); + const body = try req.body.toOwnedSlice(); + defer allocator.free(body); + + // check status idk + if (res.status != http.Status.ok) { + @panic("we are cooked\n"); + } + + const parsed = try json.parseFromSlice(GatewayBotInfo, allocator, body, .{}); + defer parsed.deinit(); + + var counter: u64 = 0; + + return .{ + .allocator = allocator, + .token = args.token, + .intents = args.intents, + // maybe there is a better way to do this + .client = try Self._connect_ws(allocator, parsed.value.url["wss://".len..]), + .session_id = undefined, + .sequence = &counter, + .heart = undefined, + .info = parsed.value, + }; +} + +inline fn _connect_ws(allocator: mem.Allocator, url: []const u8) !ws.Client { + var conn = try ws.Client.init(allocator, .{ + .tls = true, // important: zig.http doesn't support this, type shit + .port = 443, + .host = url, + //.ca_bundle = @import("tls12").Certificate.Bundle{}, + }); + + conn.handshake("/?v=10&encoding=json", .{ + .timeout_ms = 1000, + .headers = "host: gateway.discord.gg", + }) catch unreachable; + + return conn; +} + +pub fn deinit(self: *Self) void { + self.client.deinit(); + std.debug.print("killing the whole bot\n", .{}); +} + +// listens for messages +pub fn readMessage(self: *Self) !void { + try self.client.readTimeout(0); + + while (true) { + const msg = self.read() orelse { + continue; + }; + + defer self.client.done(msg); + + const DiscordData = struct { + s: ?u64, //well figure it out + op: Opcode, + d: json.Value, // needs parsing + t: ?[]const u8, + }; + + const raw = try json.parseFromSlice(DiscordData, self.allocator, msg.data, .{}); + defer raw.deinit(); + + const payload = raw.value; + + std.debug.print("received: {?s}\n", .{payload.t}); + + if (payload.op == Opcode.Dispatch) { + // maybe use mutex + self.setSequence(payload.s orelse 0); + } + + switch (payload.op) { + Opcode.Dispatch => {}, + Opcode.Hello => { + const HelloPayload = struct { heartbeat_interval: u64, _trace: [][]const u8 }; + const parsed = try json.parseFromValue(HelloPayload, self.allocator, payload.d, .{}); + defer parsed.deinit(); + const helloPayload = parsed.value; + + // PARSE NEW URL IN READY + + self.heart = Heart{ + // TODO: fix bug + .heartbeatInterval = helloPayload.heartbeat_interval, + .ack = false, + .lastBeat = 0, + }; + + std.debug.print("starting heart beater. seconds:{d}...\n", .{self.heart.heartbeatInterval}); + + var self_mut = self.*; + const thread = try std.Thread.spawn(.{}, Self.heartbeat_wait, .{&self_mut}); + thread.detach(); + + if (self.resumable()) { + try self.resume_(); + return; + } else { + try self.identify(); + try self.heartbeat(); + } + }, + Opcode.HeartbeatACK => { + // perhaps this needs a mutex? + std.debug.print("got heartbeat ack\n", .{}); + self.heart.ack = true; + }, + Opcode.Heartbeat => { + try self.heartbeat(); + }, + Opcode.Reconnect => { + std.debug.print("reconnecting\n", .{}); + try self.reconnect(); + }, + Opcode.Resume => { + const WithSequence = struct { + token: []const u8, + session_id: []const u8, + seq: ?u64, + }; + const parsed = try json.parseFromValue(WithSequence, self.allocator, payload.d, .{}); + const payload_new = parsed.value; + + self.setSequence(payload_new.seq orelse 0); + self.session_id = payload_new.session_id; + }, + Opcode.InvalidSession => {}, + else => { + std.debug.print("Unhandled {} -- {s}", .{ payload.op, "none" }); + }, + } + } +} + +pub fn heartbeat(self: *Self) !void { + const data = .{ .op = @intFromEnum(Opcode.Heartbeat), .d = if (self.getSequence() > 0) self.getSequence() else null }; + + try self.send(data); +} + +pub fn heartbeat_wait(self: *Self) !void { + std.debug.print("zzz for {d}\n", .{self.heart.heartbeatInterval}); + std.Thread.sleep(@as(u64, @intCast(std.time.ns_per_ms * self.heart.heartbeatInterval))); + + self.send_semaphore.wait(); + defer self.send_semaphore.post(); + + std.debug.print(">> ♥ and ack received: {}\n", .{self.heart.ack}); + + if (self.heart.ack) { + self.heartbeat() catch unreachable; + } else { + self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection") catch unreachable; + @panic("zombied conn\n"); + } +} + +pub inline fn reconnect(self: *Self) !void { + try self.disconnect(); + _ = try self.connect(); +} + +pub fn connect(self: *Self) !Self { + self.mutex.lock(); + defer self.mutex.unlock(); + + //std.time.sleep(std.time.ms_per_s * 5); + self.client = try Self._connect_ws(self.allocator, self.gateway_url()); + + return self.*; +} + +pub fn disconnect(self: *Self) !void { + try self.close(ShardSocketCloseCodes.Shutdown, "Shard down request"); +} + +pub fn close(self: *Self, code: ShardSocketCloseCodes, reason: []const u8) !void { + self.mutex.lock(); + defer self.mutex.unlock(); + + std.debug.print("cooked closing ws conn...\n", .{}); + // Implement reconnection logic here + try self.client.close(.{ + .code = @intFromEnum(code), //u16 + .reason = reason, //[]const u8 + }); +} + +pub fn read(self: *Self) ?ws.proto.Message { + self.listen_semaphore.wait(); + defer self.listen_semaphore.post(); + + const msg = self.client.read() catch |err| switch (err) { + error.Closed => return null, + else => return null, + } orelse unreachable; + + return msg; +} + +pub fn send(self: *Self, data: anytype) !void { + var buf: [1000]u8 = undefined; + var fba = std.heap.FixedBufferAllocator.init(&buf); + var string = std.ArrayList(u8).init(fba.allocator()); + try std.json.stringify(data, .{}, string.writer()); + + std.debug.print("data we are sending: {s}\n", .{string.items}); + + try self.client.write(string.items); +} + +pub inline fn getSequence(self: *Self) u64 { + return self.sequence.*; +} + +pub inline fn setSequence(self: *Self, new: u64) void { + self.sequence.* = new; +} diff --git a/src/main.zig b/src/main.zig index 4391f8a..c3726d5 100644 --- a/src/main.zig +++ b/src/main.zig @@ -1,5 +1,4 @@ -const Handler = @import("discord.zig").Handler; -const Intents = @import("discord.zig").Intents; +const Session = @import("discord.zig"); const std = @import("std"); const TOKEN = "Bot MTI5ODgzOTgzMDY3OTEzMDE4OA.GNojts.iyblGKK0xTWU57QCG5n3hr2Be1whyylTGr44P0"; @@ -11,9 +10,9 @@ pub fn main() !void { }; const alloc = gpa.allocator(); - var handler = try Handler.init(alloc, .{ .token = TOKEN, .intents = Intents.fromRaw(513) }); + var handler = try Session.init(alloc, .{ .token = TOKEN, .intents = Session.Intents.fromRaw(513) }); errdefer handler.deinit(); - const t = try std.Thread.spawn(.{}, Handler.readMessage, .{&handler}); + const t = try std.Thread.spawn(.{}, Session.readMessage, .{&handler}); defer t.join(); }