From 8c427231de54cb31f667d135cae53d6ba4ef6c16 Mon Sep 17 00:00:00 2001 From: yuzu Date: Fri, 16 May 2025 11:35:47 -0500 Subject: [PATCH] fix --- src/http/http.zig | 2 +- src/shard/shard.zig | 52 +++++++++++++++++++++++++++++++++++---------- 2 files changed, 42 insertions(+), 12 deletions(-) diff --git a/src/http/http.zig b/src/http/http.zig index b647853..09eb8b6 100644 --- a/src/http/http.zig +++ b/src/http/http.zig @@ -110,7 +110,7 @@ pub const FetchReq = struct { pub fn delete(self: *FetchReq, path: []const u8) !Result(void) { const result = try self.makeRequest(.DELETE, path, null); if (result.status != .no_content) - return try json_helpers.tryParse(DiscordError, void, self.allocator, try self.body.toOwnedSlice()); + return try json_helpers.parseRight(DiscordError, void, self.allocator, try self.body.toOwnedSlice()); return .ok({}); } diff --git a/src/shard/shard.zig b/src/shard/shard.zig index 1fc5e91..09b43c5 100644 --- a/src/shard/shard.zig +++ b/src/shard/shard.zig @@ -26,7 +26,7 @@ const http = std.http; // todo use this to read compressed messages const zlib = @import("zlib"); -const json = @import("std").json; +const json = std.json; const IdentifyProperties = @import("util.zig").IdentifyProperties; const GatewayInfo = @import("util.zig").GatewayInfo; @@ -214,13 +214,26 @@ pub fn deinit(self: *Self) void { self.client.deinit(); } -const ReadMessageError = mem.Allocator.Error || zlib.Error || json.ParseError(json.Scanner); +pub const ReadError = +std.posix.SetSockOptError +|| json.ParseError(json.Scanner) +|| tls.Client.InitError(tls.Client.StreamInterface) +|| zlib.Error +|| mem.Allocator.Error +|| std.Thread.SpawnError ; /// listens for messages -fn readMessage(self: *Self, _: anytype) !void { +fn readMessage(self: *Self, _: anytype) (ReadError || SendError || ReconnectError)!void { try self.client.readTimeout(0); - while (try self.client.read()) |msg| { // check your intents, dumbass + while (true) { + const msg = self.client.read() catch |err| { + self.logif( + \\couldn't read message because {s} + \\exiting thread + , .{@errorName(err)}); + return; + } orelse unreachable; defer self.client.done(msg); try self.packets.appendSlice(self.allocator, msg.data); @@ -230,7 +243,10 @@ fn readMessage(self: *Self, _: anytype) !void { continue; const buf = try self.packets.toOwnedSlice(self.allocator); - const decompressed = try self.inflator.decompressAllAlloc(buf); + const decompressed = self.inflator.decompressAllAlloc(buf) catch |err| { + self.logif("couldn't decompress because {s}", .{@errorName(err)}); + continue; + }; defer self.allocator.free(decompressed); // std.debug.print("Decompressed: {s}\n", .{decompressed}); @@ -278,7 +294,7 @@ fn readMessage(self: *Self, _: anytype) !void { // run thread pool if (self.sharder_pool) |sharder_pool| { try sharder_pool.spawn(handleEventNoError, .{ self, name, payload, &scanner }); - } else try self.handleEvent(name, payload.*); + } else self.handleEventNoError(name, payload, &scanner); } }, .Hello => { @@ -305,6 +321,9 @@ fn readMessage(self: *Self, _: anytype) !void { var prng = std.Random.DefaultPrng.init(0); const jitter = std.Random.float(prng.random(), f64); self.heart.lastBeat = std.time.milliTimestamp(); + + self.logif("new heartbeater for shard #{d}", .{self.id}); + const heartbeat_writer = try std.Thread.spawn(.{}, heartbeat, .{ self, jitter }); heartbeat_writer.detach(); }, @@ -356,6 +375,8 @@ pub fn heartbeat(self: *Self, initial_jitter: f64) SendHeartbeatError!void { std.Thread.sleep(std.time.ns_per_ms * @as(u64, @intFromFloat(timeout))); } + self.logif("heartbeating on shard {d}", .{self.id}); + self.rw_mutex.lock(); const last = self.heart.lastBeat; self.rw_mutex.unlock(); @@ -367,7 +388,7 @@ pub fn heartbeat(self: *Self, initial_jitter: f64) SendHeartbeatError!void { if ((std.time.milliTimestamp() - last) > (5000 * self.heart.heartbeatInterval)) { try self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection"); - @panic("zombied conn\n"); + @panic("zombied conn"); } jitter = 1.0; @@ -394,13 +415,22 @@ pub fn connect(self: *Self) ConnectError!void { self.readMessage(null) catch |err| switch (err) { // weird Windows error // https://github.com/ziglang/zig/issues/21492 - std.net.Stream.ReadError.NotOpenForReading, error.Closed => { + net.Stream.ReadError.NotOpenForReading => { std.debug.panic("Shard {d}: Stream closed unexpectedly", .{self.id}); // still check your intents }, + // debug this further + error.DataError => |e| { + std.debug.panic("Shard {d} DATA ERROR: panicked with error {s} code {d} in thread {any}", .{ + self.id, + @errorName(e), + @intFromError(e), + std.Thread.getCurrentId(), + }); + }, else => { // log that the connection died, but don't stop the bot - self.logif("Shard {d} closed with error: {s}\n", .{self.id, @errorName(err)}); - self.logif("Attempting to reconnect...\n", .{}); + self.logif("Shard {d} closed with error: {s}", .{self.id, @errorName(err)}); + self.logif("Attempting to reconnect...", .{}); // reconnect self.reconnect() catch unreachable; } @@ -420,7 +450,7 @@ pub fn close(self: *Self, code: ShardSocketCloseCodes, reason: []const u8) Close .reason = reason, //[]const u8 }) catch { // log that the connection died, but don't stop the bot - self.logif("Shard {d} closed with error: {s} # {d}\n", .{self.id, reason, @intFromEnum(code)}); + self.logif("Shard {d} closed with error: {s} # {d}", .{self.id, reason, @intFromEnum(code)}); }; }