From d5e0c938670efc8d52f8a9a56056263d91fab7ac Mon Sep 17 00:00:00 2001 From: rainfall Date: Sun, 3 Nov 2024 13:44:36 -0500 Subject: [PATCH] added zlib + fixed jitter --- src/discord.zig | 118 ++++++++++++++++++++++++++++++++---------------- src/main.zig | 1 + 2 files changed, 79 insertions(+), 40 deletions(-) diff --git a/src/discord.zig b/src/discord.zig index c07ef93..ad9683b 100644 --- a/src/discord.zig +++ b/src/discord.zig @@ -14,6 +14,7 @@ const zmpl = @import("zmpl"); const Discord = @import("raw_types.zig"); +const debug = std.log.scoped(.@"discord.zig"); const Self = @This(); const GatewayPayload = Discord.GatewayPayload; @@ -226,9 +227,12 @@ heart: Heart = .{ .heartbeatInterval = 45000, .ack = false, .lastBeat = 0 }, /// handler: GatewayDispatchEvent, +packets: std.ArrayList(u8), +inflator: zlib.Decompressor, ///useful for closing the conn mutex: std.Thread.Mutex = .{}, +log: Log = .no, fn parseJson(self: *Self, raw: []const u8) !zmpl.Data { var data = zmpl.Data.init(self.allocator); @@ -236,10 +240,6 @@ fn parseJson(self: *Self, raw: []const u8) !zmpl.Data { return data; } -inline fn jitter() i1 { - return 0; -} - pub inline fn resumable(self: *Self) bool { return self.resume_gateway_url != null and self.session_id != null and @@ -267,7 +267,7 @@ inline fn gateway_url(self: ?*Self) []const u8 { // identifies in order to connect to Discord and get the online status, this shall be done on hello perhaps fn identify(self: *Self) !void { - std.debug.print("intents: {d}", .{self.intents.toRaw()}); + self.logif("intents: {d}", .{self.intents.toRaw()}); const data = .{ .op = @intFromEnum(Opcode.Identify), .d = .{ @@ -282,11 +282,14 @@ fn identify(self: *Self) !void { try self.send(data); } +const Log = union(enum) { yes, no }; + // asks /gateway/bot initializes both the ws client and the http client pub fn init(allocator: mem.Allocator, args: struct { token: []const u8, intents: Intents, run: GatewayDispatchEvent, + log: Log, }) !Self { var req = FetchReq.init(allocator, args.token); defer req.deinit(); @@ -314,6 +317,9 @@ pub fn init(allocator: mem.Allocator, args: struct { .sequence = 0, .info = parsed.value, .handler = args.run, + .log = args.log, + .packets = std.ArrayList(u8).init(allocator), + .inflator = try zlib.Decompressor.init(allocator, .{ .header = .zlib_or_gzip }), }; } @@ -324,7 +330,7 @@ inline fn _connect_ws(allocator: mem.Allocator, url: []const u8) !ws.Client { .host = url, }); - conn.handshake("/?v=10&encoding=json", .{ + conn.handshake("/?v=10&encoding=json&compress=zlib-stream", .{ .timeout_ms = 1000, .headers = "host: gateway.discord.gg", }) catch unreachable; @@ -334,7 +340,20 @@ inline fn _connect_ws(allocator: mem.Allocator, url: []const u8) !ws.Client { pub fn deinit(self: *Self) void { self.client.deinit(); - std.debug.print("killing the whole bot\n", .{}); + self.logif("killing the whole bot\n", .{}); +} + +pub fn ensureCompressed(data: []const u8, comptime pattern: []const u8) bool { + if (data.len < pattern.len) { + return false; + } + + const start_index: usize = data.len - pattern.len; + + for (0..pattern.len) |i| { + if (data[start_index + i] != pattern[i]) return false; + } + return true; } // listens for messages @@ -342,13 +361,21 @@ pub fn readMessage(self: *Self, _: anytype) !void { try self.client.readTimeout(0); while (true) { - const msg = (try self.client.read()) orelse { - std.debug.print(".", .{}); + const msg = (try self.client.read()) orelse continue; - }; defer self.client.done(msg); + // self.logif("received: {?s}\n", .{msg.data}); + try self.packets.appendSlice(msg.data); + + if (!Self.ensureCompressed(msg.data, &[4]u8{ 0x00, 0x00, 0xFF, 0xFF })) + continue; + + // self.logif("{b}\n", .{self.packets.items}); + const buf = try self.packets.toOwnedSlice(); + const decompressed = try self.inflator.decompressAllAlloc(buf); + const raw = try json.parseFromSlice(struct { /// opcode for the payload op: isize, @@ -358,17 +385,15 @@ pub fn readMessage(self: *Self, _: anytype) !void { s: ?i64, /// The event name for this payload t: ?[]const u8, - }, self.allocator, msg.data, .{}); + }, self.allocator, decompressed, .{}); const payload = raw.value; - //std.debug.print("received: {?s} with content {?s}\n", .{ payload.t, msg.data }); - switch (@as(Opcode, @enumFromInt(payload.op))) { Opcode.Dispatch => { self.setSequence(payload.s orelse 0); // maybe use threads and call it instead from there - if (payload.t) |name| try self.handleEvent(name, msg.data); + if (payload.t) |name| try self.handleEvent(name, decompressed); }, Opcode.Hello => { { @@ -385,11 +410,14 @@ pub fn readMessage(self: *Self, _: anytype) !void { .lastBeat = 0, }; - std.debug.print("starting heart beater. seconds:{d}...\n", .{self.heart.heartbeatInterval}); + self.logif("starting heart beater. seconds:{d}...\n", .{self.heart.heartbeatInterval}); try self.heartbeat(); - const thread = try std.Thread.spawn(.{}, Self.heartbeat_wait, .{self}); + var prng = std.Random.DefaultPrng.init(0); + const jitter = std.Random.float(prng.random(), f64); + + const thread = try std.Thread.spawn(.{}, Self.heartbeat_wait, .{ self, jitter }); thread.detach(); if (self.resumable()) { @@ -402,7 +430,7 @@ pub fn readMessage(self: *Self, _: anytype) !void { }, Opcode.HeartbeatACK => { // perhaps this needs a mutex? - std.debug.print("got heartbeat ack\n", .{}); + self.logif("got heartbeat ack\n", .{}); self.mutex.lock(); defer self.mutex.unlock(); @@ -410,11 +438,11 @@ pub fn readMessage(self: *Self, _: anytype) !void { self.heart.ack = true; }, Opcode.Heartbeat => { - std.debug.print("sending requested heartbeat\n", .{}); + self.logif("sending requested heartbeat\n", .{}); try self.heartbeat(); }, Opcode.Reconnect => { - std.debug.print("reconnecting\n", .{}); + self.logif("reconnecting\n", .{}); try self.reconnect(); }, Opcode.Resume => { @@ -433,7 +461,7 @@ pub fn readMessage(self: *Self, _: anytype) !void { }, Opcode.InvalidSession => {}, else => { - std.debug.print("Unhandled {d} -- {s}", .{ payload.op, "none" }); + self.logif("Unhandled {d} -- {s}", .{ payload.op, "none" }); }, } } @@ -445,25 +473,28 @@ pub fn heartbeat(self: *Self) !void { try self.send(data); } -pub fn heartbeat_wait(self: *Self) !void { - while (true) { - std.debug.print("zzz for {d}\n", .{self.heart.heartbeatInterval}); - std.Thread.sleep(@as(u64, @intCast(std.time.ns_per_ms * self.heart.heartbeatInterval))); - - std.debug.print(">> ♥ and ack received: {}\n", .{self.heart.ack}); - - self.mutex.lock(); - defer self.mutex.unlock(); - - if (self.heart.ack == true) { - std.debug.print("sending unrequested heartbeat\n", .{}); - self.heartbeat() catch unreachable; - try self.client.readTimeout(1000); - } else { - self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection") catch unreachable; - @panic("zombied conn\n"); - } +pub fn heartbeat_wait(self: *Self, jitter: f64) !void { + if (jitter == 1.0) { + self.logif("zzz for {d}\n", .{self.heart.heartbeatInterval}); + std.Thread.sleep(std.time.ns_per_ms * self.heart.heartbeatInterval); + } else { + const timeout = @as(f64, @floatFromInt(self.heart.heartbeatInterval)) * jitter; + self.logif("zzz for {d} and jitter {d}\n", .{ @as(u64, @intFromFloat(timeout)), jitter }); + std.Thread.sleep(std.time.ns_per_ms * @as(u64, @intFromFloat(timeout))); } + + self.logif(">> ♥ and ack received: {}\n", .{self.heart.ack}); + + if (self.heart.ack) { + self.logif("sending unrequested heartbeat\n", .{}); + try self.heartbeat(); + try self.client.readTimeout(1000); + } else { + self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection") catch unreachable; + @panic("zombied conn\n"); + } + + return heartbeat_wait(self, 1.0); } pub inline fn reconnect(self: *Self) !void { @@ -489,7 +520,7 @@ pub fn close(self: *Self, code: ShardSocketCloseCodes, reason: []const u8) !void self.mutex.lock(); defer self.mutex.unlock(); - std.debug.print("cooked closing ws conn...\n", .{}); + self.logif("cooked closing ws conn...\n", .{}); // Implement reconnection logic here try self.client.close(.{ .code = @intFromEnum(code), //u16 @@ -503,7 +534,7 @@ pub fn send(self: *Self, data: anytype) !void { var string = std.ArrayList(u8).init(fba.allocator()); try std.json.stringify(data, .{}, string.writer()); - //std.debug.print("{s}\n", .{string.items}); + //self.logif("{s}\n", .{string.items}); try self.client.write(string.items); } @@ -645,3 +676,10 @@ pub fn handleEvent(self: *Self, name: []const u8, payload: []const u8) !void { @call(.auto, self.handler.message_create, .{m}); } else {} } + +inline fn logif(self: *Self, comptime format: []const u8, args: anytype) void { + switch (self.log) { + .yes => debug.info(format, args), + .no => {}, + } +} diff --git a/src/main.zig b/src/main.zig index 5edb4a1..b0a76b2 100644 --- a/src/main.zig +++ b/src/main.zig @@ -18,6 +18,7 @@ pub fn main() !void { .token = TOKEN, .intents = Intents.fromRaw(37379), .run = Session.GatewayDispatchEvent{ .message_create = &message_create }, + .log = .yes, }); errdefer handler.deinit();