From 8526c158a65160c0cc0c37812e20a411f2b6555c Mon Sep 17 00:00:00 2001 From: rainfall Date: Thu, 31 Oct 2024 23:33:01 -0500 Subject: [PATCH] use my own forks --- 1 | 18 ------ build.zig | 2 +- lib/websocket.zig | 2 +- lib/zig-tls12 | 2 +- src/discord.zig | 43 +++++-------- src/scheduler.zig | 156 ---------------------------------------------- 6 files changed, 19 insertions(+), 204 deletions(-) delete mode 100644 1 delete mode 100644 src/scheduler.zig diff --git a/1 b/1 deleted file mode 100644 index c3726d5..0000000 --- a/1 +++ /dev/null @@ -1,18 +0,0 @@ -const Session = @import("discord.zig"); -const std = @import("std"); - -const TOKEN = "Bot MTI5ODgzOTgzMDY3OTEzMDE4OA.GNojts.iyblGKK0xTWU57QCG5n3hr2Be1whyylTGr44P0"; - -pub fn main() !void { - var gpa = std.heap.GeneralPurposeAllocator(.{}){}; - defer if (gpa.deinit() == .leak) { - std.log.warn("Has leaked\n", .{}); - }; - const alloc = gpa.allocator(); - - var handler = try Session.init(alloc, .{ .token = TOKEN, .intents = Session.Intents.fromRaw(513) }); - errdefer handler.deinit(); - - const t = try std.Thread.spawn(.{}, Session.readMessage, .{&handler}); - defer t.join(); -} diff --git a/build.zig b/build.zig index 76ec6a9..0cbf4b4 100644 --- a/build.zig +++ b/build.zig @@ -35,7 +35,7 @@ pub fn build(b: *std.Build) void { }); const zig_tls_http = b.createModule(.{ - .root_source_file = b.path("lib/zig-tls12/src/HttpClient.zig"), + .root_source_file = b.path("lib/zig-tls12/src/entry.zig"), .target = target, .optimize = optimize, }); diff --git a/lib/websocket.zig b/lib/websocket.zig index d8561ca..e93fa52 160000 --- a/lib/websocket.zig +++ b/lib/websocket.zig @@ -1 +1 @@ -Subproject commit d8561ca98eca4d904ac9383a7f30b2360bed4d3c +Subproject commit e93fa527f1d4deaeda5f74f51efaf7ec60cab396 diff --git a/lib/zig-tls12 b/lib/zig-tls12 index f2cbb84..56153d0 160000 --- a/lib/zig-tls12 +++ b/lib/zig-tls12 @@ -1 +1 @@ -Subproject commit f2cbb846f8a98cb5e19c8476a8e6cf3b9bbcdb0c +Subproject commit 56153d0f9f4551c7031b3b536845e90d23250d01 diff --git a/src/discord.zig b/src/discord.zig index 73afbde..546ba4a 100644 --- a/src/discord.zig +++ b/src/discord.zig @@ -4,12 +4,10 @@ const mem = std.mem; const http = std.http; const ws = @import("ws"); const builtin = @import("builtin"); -const HttpClient = @import("tls12"); +const HttpClient = @import("tls12").HttpClient; const net = std.net; const crypto = std.crypto; const tls = std.crypto.tls; -//const TlsClient = @import("tls12").TlsClient; -//const Certificate = @import("tls12").Certificate; // todo use this to read compressed messages const zlib = @import("zlib"); @@ -287,7 +285,6 @@ inline fn _connect_ws(allocator: mem.Allocator, url: []const u8) !ws.Client { .tls = true, // important: zig.http doesn't support this, type shit .port = 443, .host = url, - //.ca_bundle = @import("tls12").Certificate.Bundle{}, }); conn.handshake("/?v=10&encoding=json", .{ @@ -308,15 +305,15 @@ pub fn readMessage(self: *Self) !void { try self.client.readTimeout(0); while (true) { - if (!self.rwSemaphore.tryLockShared()) { - // writer might be writing - std.debug.print("YIELDING THREAD\n", .{}); - try std.Thread.yield(); - continue; - } - defer self.rwSemaphore.unlockShared(); + //if (!self.rwSemaphore.tryLockShared()) { + //std.debug.print("YIELDING THREAD\n", .{}); + //try std.Thread.yield(); + //continue; + //} + //defer self.rwSemaphore.unlockShared(); - const msg = self.read() orelse { + const msg = (try self.client.read()) orelse { + std.debug.print(".", .{}); continue; }; @@ -374,15 +371,12 @@ pub fn readMessage(self: *Self) !void { } }, Opcode.HeartbeatACK => { - self.mutex.lock(); - defer self.mutex.unlock(); // perhaps this needs a mutex? std.debug.print("got heartbeat ack\n", .{}); heart.ack = true; }, Opcode.Heartbeat => { - self.mutex.lock(); - defer self.mutex.unlock(); + std.debug.print("sending requested heartbeat\n", .{}); try self.heartbeat(); }, Opcode.Reconnect => { @@ -419,13 +413,15 @@ pub fn heartbeat_wait(self: *Self) !void { std.debug.print("zzz for {d}\n", .{heart.heartbeatInterval}); std.Thread.sleep(@as(u64, @intCast(std.time.ns_per_ms * heart.heartbeatInterval))); - self.rwSemaphore.lock(); - defer self.rwSemaphore.unlock(); + //self.rwSemaphore.lock(); + //defer self.rwSemaphore.unlock(); std.debug.print(">> ♥ and ack received: {}\n", .{heart.ack}); if (heart.ack == true) { + std.debug.print("sending unrequested heartbeat\n", .{}); self.heartbeat() catch unreachable; + try self.client.readTimeout(1000); } else { self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection") catch unreachable; @panic("zombied conn\n"); @@ -463,21 +459,14 @@ pub fn close(self: *Self, code: ShardSocketCloseCodes, reason: []const u8) !void }); } -pub fn read(self: *Self) ?ws.proto.Message { - const msg = self.client.read() catch |err| switch (err) { - error.Closed => return null, - else => return null, - } orelse unreachable; - - return msg; -} - pub fn send(self: *Self, data: anytype) !void { var buf: [1000]u8 = undefined; var fba = std.heap.FixedBufferAllocator.init(&buf); var string = std.ArrayList(u8).init(fba.allocator()); try std.json.stringify(data, .{}, string.writer()); + std.debug.print("{s}\n", .{string.items}); + try self.client.write(string.items); } diff --git a/src/scheduler.zig b/src/scheduler.zig deleted file mode 100644 index ce351ad..0000000 --- a/src/scheduler.zig +++ /dev/null @@ -1,156 +0,0 @@ -const std = @import("std"); - -const Thread = std.Thread; -const Allocator = std.mem.Allocator; - -fn Job(comptime T: type) type { - return struct { - at: i64, - task: T, - }; -} - -pub fn Scheduler(comptime T: type, comptime C: type) type { - return struct { - queue: Q, - running: bool, - mutex: Thread.Mutex, - cond: Thread.Condition, - thread: ?Thread, - - const Q = std.PriorityQueue(Job(T), void, compare); - - fn compare(_: void, a: Job(T), b: Job(T)) std.math.Order { - return std.math.order(a.at, b.at); - } - - const Self = @This(); - - pub fn init(allocator: Allocator) Self { - return .{ - .cond = .{}, - .mutex = .{}, - .thread = null, - .running = false, - .queue = Q.init(allocator, {}), - }; - } - - pub fn deinit(self: *Self) void { - self.stop(); - self.queue.deinit(); - } - - pub fn start(self: *Self, ctx: C) !void { - { - self.mutex.lock(); - defer self.mutex.unlock(); - if (self.running == true) { - return error.AlreadyRunning; - } - self.running = true; - } - self.thread = try Thread.spawn(.{}, Self.run, .{ self, ctx }); - } - - pub fn stop(self: *Self) void { - { - self.mutex.lock(); - defer self.mutex.unlock(); - if (self.running == false) { - return; - } - self.running = false; - } - - self.cond.signal(); - self.thread.?.join(); - } - - pub fn scheduleIn(self: *Self, task: T, ms: i64) !void { - return self.schedule(task, std.time.milliTimestamp() + ms); - } - - pub fn schedule(self: *Self, task: T, at: i64) !void { - const job: Job(T) = .{ - .at = at, - .task = task, - }; - - var reschedule = false; - { - self.mutex.lock(); - defer self.mutex.unlock(); - - if (self.queue.peek()) |*next| { - if (at < next.at) { - reschedule = true; - } - } else { - reschedule = true; - } - try self.queue.add(job); - } - - if (reschedule) { - // Our new job is scheduled before our previous earlier job - // (or we had no previous jobs) - // We need to reset our schedule - self.cond.signal(); - } - } - - // this is running in a separate thread, started by start() - fn run(self: *Self, ctx: C) void { - self.mutex.lock(); - - while (true) { - const ms_until_next = self.processPending(ctx); - - // mutex is locked when returning for processPending - - if (self.running == false) { - self.mutex.unlock(); - return; - } - - if (ms_until_next) |timeout| { - const ns = @as(u64, @intCast(timeout * std.time.ns_per_ms)); - self.cond.timedWait(&self.mutex, ns) catch |err| { - std.debug.assert(err == error.Timeout); - // on success or error, cond locks mutex, which is what we want - }; - } else { - self.cond.wait(&self.mutex); - } - // if we woke up, it's because a new job was added with a more recent - // scheduled time. This new job MAY not be ready to run yet, and - // it's even possible for our cond variable to wake up randomly (as per - // the docs), but processPending is defensive and will check this for us. - } - } - - // we enter this function with mutex locked - // and we exit this function with the mutex locked - // importantly, we don't lock the mutex will process the task - fn processPending(self: *Self, ctx: C) ?i64 { - while (true) { - const next = self.queue.peek() orelse { - // yes, we must return this function with a locked mutex - return null; - }; - const seconds_until_next = next.at - std.time.milliTimestamp(); - if (seconds_until_next > 0) { - // this job isn't ready, yes, the mutex should remain locked! - return seconds_until_next; - } - - // delete the peeked job from the queue, because we're going to process it - const job = self.queue.remove(); - self.mutex.unlock(); - defer self.mutex.lock(); - job.task.run(ctx, next.at); - } - } - }; -}