This commit is contained in:
rainfall 2024-10-31 18:54:41 -05:00
parent 0db8540981
commit 5c58323242

View File

@ -15,6 +15,8 @@ const zlib = @import("zlib");
const Self = @This(); const Self = @This();
var heart = Heart{ .heartbeatInterval = 45000, .ack = false, .lastBeat = 0 };
const Opcode = enum(u4) { const Opcode = enum(u4) {
Dispatch = 0, Dispatch = 0,
Heartbeat = 1, Heartbeat = 1,
@ -193,13 +195,12 @@ const Heart = struct {
client: ws.Client, client: ws.Client,
token: []const u8, token: []const u8,
intents: Intents, intents: Intents,
heart: Heart = .{ .heartbeatInterval = 45000, .ack = false, .lastBeat = 0 }, //heart: Heart =
allocator: mem.Allocator, allocator: mem.Allocator,
resume_gateway_url: ?[]const u8 = null, resume_gateway_url: ?[]const u8 = null,
info: GatewayBotInfo, info: GatewayBotInfo,
mutex: std.Thread.Mutex = .{}, mutex: std.Thread.Mutex = .{},
listen_semaphore: std.Thread.Semaphore = .{ .permits = 1 }, listen_semaphore: std.Thread.Semaphore = .{ .permits = 1 },
send_semaphore: std.Thread.Semaphore = .{ .permits = 1 },
session_id: ?[]const u8, session_id: ?[]const u8,
sequence: *u64, sequence: *u64,
@ -276,7 +277,6 @@ pub fn init(allocator: mem.Allocator, args: struct { token: []const u8, intents:
.client = try Self._connect_ws(allocator, parsed.value.url["wss://".len..]), .client = try Self._connect_ws(allocator, parsed.value.url["wss://".len..]),
.session_id = undefined, .session_id = undefined,
.sequence = &counter, .sequence = &counter,
.heart = undefined,
.info = parsed.value, .info = parsed.value,
}; };
} }
@ -307,6 +307,9 @@ pub fn readMessage(self: *Self) !void {
try self.client.readTimeout(0); try self.client.readTimeout(0);
while (true) { while (true) {
self.listen_semaphore.wait();
defer self.listen_semaphore.post();
const msg = self.read() orelse { const msg = self.read() orelse {
continue; continue;
}; };
@ -342,14 +345,16 @@ pub fn readMessage(self: *Self) !void {
// PARSE NEW URL IN READY // PARSE NEW URL IN READY
self.heart = Heart{ heart = Heart{
// TODO: fix bug // TODO: fix bug
.heartbeatInterval = helloPayload.heartbeat_interval, .heartbeatInterval = helloPayload.heartbeat_interval,
.ack = false, .ack = false,
.lastBeat = 0, .lastBeat = 0,
}; };
std.debug.print("starting heart beater. seconds:{d}...\n", .{self.heart.heartbeatInterval}); std.debug.print("starting heart beater. seconds:{d}...\n", .{heart.heartbeatInterval});
try self.heartbeat();
var self_mut = self.*; var self_mut = self.*;
const thread = try std.Thread.spawn(.{}, Self.heartbeat_wait, .{&self_mut}); const thread = try std.Thread.spawn(.{}, Self.heartbeat_wait, .{&self_mut});
@ -360,15 +365,18 @@ pub fn readMessage(self: *Self) !void {
return; return;
} else { } else {
try self.identify(); try self.identify();
try self.heartbeat();
} }
}, },
Opcode.HeartbeatACK => { Opcode.HeartbeatACK => {
self.mutex.lock();
defer self.mutex.unlock();
// perhaps this needs a mutex? // perhaps this needs a mutex?
std.debug.print("got heartbeat ack\n", .{}); std.debug.print("got heartbeat ack\n", .{});
self.heart.ack = true; heart.ack = true;
}, },
Opcode.Heartbeat => { Opcode.Heartbeat => {
self.mutex.lock();
defer self.mutex.unlock();
try self.heartbeat(); try self.heartbeat();
}, },
Opcode.Reconnect => { Opcode.Reconnect => {
@ -402,15 +410,15 @@ pub fn heartbeat(self: *Self) !void {
} }
pub fn heartbeat_wait(self: *Self) !void { pub fn heartbeat_wait(self: *Self) !void {
std.debug.print("zzz for {d}\n", .{self.heart.heartbeatInterval}); std.debug.print("zzz for {d}\n", .{heart.heartbeatInterval});
std.Thread.sleep(@as(u64, @intCast(std.time.ns_per_ms * self.heart.heartbeatInterval))); std.Thread.sleep(@as(u64, @intCast(std.time.ns_per_ms * heart.heartbeatInterval)));
self.send_semaphore.wait(); self.listen_semaphore.wait();
defer self.send_semaphore.post(); defer self.listen_semaphore.post();
std.debug.print(">> ♥ and ack received: {}\n", .{self.heart.ack}); std.debug.print(">> ♥ and ack received: {}\n", .{heart.ack});
if (self.heart.ack) { if (heart.ack == true) {
self.heartbeat() catch unreachable; self.heartbeat() catch unreachable;
} else { } else {
self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection") catch unreachable; self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection") catch unreachable;
@ -450,9 +458,6 @@ pub fn close(self: *Self, code: ShardSocketCloseCodes, reason: []const u8) !void
} }
pub fn read(self: *Self) ?ws.proto.Message { pub fn read(self: *Self) ?ws.proto.Message {
self.listen_semaphore.wait();
defer self.listen_semaphore.post();
const msg = self.client.read() catch |err| switch (err) { const msg = self.client.read() catch |err| switch (err) {
error.Closed => return null, error.Closed => return null,
else => return null, else => return null,