From 5c583232420ce4557177ece9bd3a4144d24f1e44 Mon Sep 17 00:00:00 2001 From: rainfall Date: Thu, 31 Oct 2024 18:54:41 -0500 Subject: [PATCH] i did it --- src/discord.zig | 37 +++++++++++++++++++++---------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/src/discord.zig b/src/discord.zig index 479a5fd..407a558 100644 --- a/src/discord.zig +++ b/src/discord.zig @@ -15,6 +15,8 @@ const zlib = @import("zlib"); const Self = @This(); +var heart = Heart{ .heartbeatInterval = 45000, .ack = false, .lastBeat = 0 }; + const Opcode = enum(u4) { Dispatch = 0, Heartbeat = 1, @@ -193,13 +195,12 @@ const Heart = struct { client: ws.Client, token: []const u8, intents: Intents, -heart: Heart = .{ .heartbeatInterval = 45000, .ack = false, .lastBeat = 0 }, +//heart: Heart = allocator: mem.Allocator, resume_gateway_url: ?[]const u8 = null, info: GatewayBotInfo, mutex: std.Thread.Mutex = .{}, listen_semaphore: std.Thread.Semaphore = .{ .permits = 1 }, -send_semaphore: std.Thread.Semaphore = .{ .permits = 1 }, session_id: ?[]const u8, sequence: *u64, @@ -276,7 +277,6 @@ pub fn init(allocator: mem.Allocator, args: struct { token: []const u8, intents: .client = try Self._connect_ws(allocator, parsed.value.url["wss://".len..]), .session_id = undefined, .sequence = &counter, - .heart = undefined, .info = parsed.value, }; } @@ -307,6 +307,9 @@ pub fn readMessage(self: *Self) !void { try self.client.readTimeout(0); while (true) { + self.listen_semaphore.wait(); + defer self.listen_semaphore.post(); + const msg = self.read() orelse { continue; }; @@ -342,14 +345,16 @@ pub fn readMessage(self: *Self) !void { // PARSE NEW URL IN READY - self.heart = Heart{ + heart = Heart{ // TODO: fix bug .heartbeatInterval = helloPayload.heartbeat_interval, .ack = false, .lastBeat = 0, }; - std.debug.print("starting heart beater. seconds:{d}...\n", .{self.heart.heartbeatInterval}); + std.debug.print("starting heart beater. seconds:{d}...\n", .{heart.heartbeatInterval}); + + try self.heartbeat(); var self_mut = self.*; const thread = try std.Thread.spawn(.{}, Self.heartbeat_wait, .{&self_mut}); @@ -360,15 +365,18 @@ pub fn readMessage(self: *Self) !void { return; } else { try self.identify(); - try self.heartbeat(); } }, Opcode.HeartbeatACK => { + self.mutex.lock(); + defer self.mutex.unlock(); // perhaps this needs a mutex? std.debug.print("got heartbeat ack\n", .{}); - self.heart.ack = true; + heart.ack = true; }, Opcode.Heartbeat => { + self.mutex.lock(); + defer self.mutex.unlock(); try self.heartbeat(); }, Opcode.Reconnect => { @@ -402,15 +410,15 @@ pub fn heartbeat(self: *Self) !void { } pub fn heartbeat_wait(self: *Self) !void { - std.debug.print("zzz for {d}\n", .{self.heart.heartbeatInterval}); - std.Thread.sleep(@as(u64, @intCast(std.time.ns_per_ms * self.heart.heartbeatInterval))); + std.debug.print("zzz for {d}\n", .{heart.heartbeatInterval}); + std.Thread.sleep(@as(u64, @intCast(std.time.ns_per_ms * heart.heartbeatInterval))); - self.send_semaphore.wait(); - defer self.send_semaphore.post(); + self.listen_semaphore.wait(); + defer self.listen_semaphore.post(); - std.debug.print(">> ♥ and ack received: {}\n", .{self.heart.ack}); + std.debug.print(">> ♥ and ack received: {}\n", .{heart.ack}); - if (self.heart.ack) { + if (heart.ack == true) { self.heartbeat() catch unreachable; } else { self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection") catch unreachable; @@ -450,9 +458,6 @@ pub fn close(self: *Self, code: ShardSocketCloseCodes, reason: []const u8) !void } pub fn read(self: *Self) ?ws.proto.Message { - self.listen_semaphore.wait(); - defer self.listen_semaphore.post(); - const msg = self.client.read() catch |err| switch (err) { error.Closed => return null, else => return null,