This commit is contained in:
yuzu 2025-05-16 11:35:47 -05:00
parent 636d53fc26
commit 8c427231de
2 changed files with 42 additions and 12 deletions

View File

@ -110,7 +110,7 @@ pub const FetchReq = struct {
pub fn delete(self: *FetchReq, path: []const u8) !Result(void) { pub fn delete(self: *FetchReq, path: []const u8) !Result(void) {
const result = try self.makeRequest(.DELETE, path, null); const result = try self.makeRequest(.DELETE, path, null);
if (result.status != .no_content) if (result.status != .no_content)
return try json_helpers.tryParse(DiscordError, void, self.allocator, try self.body.toOwnedSlice()); return try json_helpers.parseRight(DiscordError, void, self.allocator, try self.body.toOwnedSlice());
return .ok({}); return .ok({});
} }

View File

@ -26,7 +26,7 @@ const http = std.http;
// todo use this to read compressed messages // todo use this to read compressed messages
const zlib = @import("zlib"); const zlib = @import("zlib");
const json = @import("std").json; const json = std.json;
const IdentifyProperties = @import("util.zig").IdentifyProperties; const IdentifyProperties = @import("util.zig").IdentifyProperties;
const GatewayInfo = @import("util.zig").GatewayInfo; const GatewayInfo = @import("util.zig").GatewayInfo;
@ -214,13 +214,26 @@ pub fn deinit(self: *Self) void {
self.client.deinit(); self.client.deinit();
} }
const ReadMessageError = mem.Allocator.Error || zlib.Error || json.ParseError(json.Scanner); pub const ReadError =
std.posix.SetSockOptError
|| json.ParseError(json.Scanner)
|| tls.Client.InitError(tls.Client.StreamInterface)
|| zlib.Error
|| mem.Allocator.Error
|| std.Thread.SpawnError ;
/// listens for messages /// listens for messages
fn readMessage(self: *Self, _: anytype) !void { fn readMessage(self: *Self, _: anytype) (ReadError || SendError || ReconnectError)!void {
try self.client.readTimeout(0); try self.client.readTimeout(0);
while (try self.client.read()) |msg| { // check your intents, dumbass while (true) {
const msg = self.client.read() catch |err| {
self.logif(
\\couldn't read message because {s}
\\exiting thread
, .{@errorName(err)});
return;
} orelse unreachable;
defer self.client.done(msg); defer self.client.done(msg);
try self.packets.appendSlice(self.allocator, msg.data); try self.packets.appendSlice(self.allocator, msg.data);
@ -230,7 +243,10 @@ fn readMessage(self: *Self, _: anytype) !void {
continue; continue;
const buf = try self.packets.toOwnedSlice(self.allocator); const buf = try self.packets.toOwnedSlice(self.allocator);
const decompressed = try self.inflator.decompressAllAlloc(buf); const decompressed = self.inflator.decompressAllAlloc(buf) catch |err| {
self.logif("couldn't decompress because {s}", .{@errorName(err)});
continue;
};
defer self.allocator.free(decompressed); defer self.allocator.free(decompressed);
// std.debug.print("Decompressed: {s}\n", .{decompressed}); // std.debug.print("Decompressed: {s}\n", .{decompressed});
@ -278,7 +294,7 @@ fn readMessage(self: *Self, _: anytype) !void {
// run thread pool // run thread pool
if (self.sharder_pool) |sharder_pool| { if (self.sharder_pool) |sharder_pool| {
try sharder_pool.spawn(handleEventNoError, .{ self, name, payload, &scanner }); try sharder_pool.spawn(handleEventNoError, .{ self, name, payload, &scanner });
} else try self.handleEvent(name, payload.*); } else self.handleEventNoError(name, payload, &scanner);
} }
}, },
.Hello => { .Hello => {
@ -305,6 +321,9 @@ fn readMessage(self: *Self, _: anytype) !void {
var prng = std.Random.DefaultPrng.init(0); var prng = std.Random.DefaultPrng.init(0);
const jitter = std.Random.float(prng.random(), f64); const jitter = std.Random.float(prng.random(), f64);
self.heart.lastBeat = std.time.milliTimestamp(); self.heart.lastBeat = std.time.milliTimestamp();
self.logif("new heartbeater for shard #{d}", .{self.id});
const heartbeat_writer = try std.Thread.spawn(.{}, heartbeat, .{ self, jitter }); const heartbeat_writer = try std.Thread.spawn(.{}, heartbeat, .{ self, jitter });
heartbeat_writer.detach(); heartbeat_writer.detach();
}, },
@ -356,6 +375,8 @@ pub fn heartbeat(self: *Self, initial_jitter: f64) SendHeartbeatError!void {
std.Thread.sleep(std.time.ns_per_ms * @as(u64, @intFromFloat(timeout))); std.Thread.sleep(std.time.ns_per_ms * @as(u64, @intFromFloat(timeout)));
} }
self.logif("heartbeating on shard {d}", .{self.id});
self.rw_mutex.lock(); self.rw_mutex.lock();
const last = self.heart.lastBeat; const last = self.heart.lastBeat;
self.rw_mutex.unlock(); self.rw_mutex.unlock();
@ -367,7 +388,7 @@ pub fn heartbeat(self: *Self, initial_jitter: f64) SendHeartbeatError!void {
if ((std.time.milliTimestamp() - last) > (5000 * self.heart.heartbeatInterval)) { if ((std.time.milliTimestamp() - last) > (5000 * self.heart.heartbeatInterval)) {
try self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection"); try self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection");
@panic("zombied conn\n"); @panic("zombied conn");
} }
jitter = 1.0; jitter = 1.0;
@ -394,13 +415,22 @@ pub fn connect(self: *Self) ConnectError!void {
self.readMessage(null) catch |err| switch (err) { self.readMessage(null) catch |err| switch (err) {
// weird Windows error // weird Windows error
// https://github.com/ziglang/zig/issues/21492 // https://github.com/ziglang/zig/issues/21492
std.net.Stream.ReadError.NotOpenForReading, error.Closed => { net.Stream.ReadError.NotOpenForReading => {
std.debug.panic("Shard {d}: Stream closed unexpectedly", .{self.id}); // still check your intents std.debug.panic("Shard {d}: Stream closed unexpectedly", .{self.id}); // still check your intents
}, },
// debug this further
error.DataError => |e| {
std.debug.panic("Shard {d} DATA ERROR: panicked with error {s} code {d} in thread {any}", .{
self.id,
@errorName(e),
@intFromError(e),
std.Thread.getCurrentId(),
});
},
else => { else => {
// log that the connection died, but don't stop the bot // log that the connection died, but don't stop the bot
self.logif("Shard {d} closed with error: {s}\n", .{self.id, @errorName(err)}); self.logif("Shard {d} closed with error: {s}", .{self.id, @errorName(err)});
self.logif("Attempting to reconnect...\n", .{}); self.logif("Attempting to reconnect...", .{});
// reconnect // reconnect
self.reconnect() catch unreachable; self.reconnect() catch unreachable;
} }
@ -420,7 +450,7 @@ pub fn close(self: *Self, code: ShardSocketCloseCodes, reason: []const u8) Close
.reason = reason, //[]const u8 .reason = reason, //[]const u8
}) catch { }) catch {
// log that the connection died, but don't stop the bot // log that the connection died, but don't stop the bot
self.logif("Shard {d} closed with error: {s} # {d}\n", .{self.id, reason, @intFromEnum(code)}); self.logif("Shard {d} closed with error: {s} # {d}", .{self.id, reason, @intFromEnum(code)});
}; };
} }