forked from yuzucchii/discord.zig
added zlib + fixed jitter
This commit is contained in:
parent
2e353870c1
commit
d5e0c93867
118
src/discord.zig
118
src/discord.zig
@ -14,6 +14,7 @@ const zmpl = @import("zmpl");
|
|||||||
|
|
||||||
const Discord = @import("raw_types.zig");
|
const Discord = @import("raw_types.zig");
|
||||||
|
|
||||||
|
const debug = std.log.scoped(.@"discord.zig");
|
||||||
const Self = @This();
|
const Self = @This();
|
||||||
|
|
||||||
const GatewayPayload = Discord.GatewayPayload;
|
const GatewayPayload = Discord.GatewayPayload;
|
||||||
@ -226,9 +227,12 @@ heart: Heart = .{ .heartbeatInterval = 45000, .ack = false, .lastBeat = 0 },
|
|||||||
|
|
||||||
///
|
///
|
||||||
handler: GatewayDispatchEvent,
|
handler: GatewayDispatchEvent,
|
||||||
|
packets: std.ArrayList(u8),
|
||||||
|
inflator: zlib.Decompressor,
|
||||||
|
|
||||||
///useful for closing the conn
|
///useful for closing the conn
|
||||||
mutex: std.Thread.Mutex = .{},
|
mutex: std.Thread.Mutex = .{},
|
||||||
|
log: Log = .no,
|
||||||
|
|
||||||
fn parseJson(self: *Self, raw: []const u8) !zmpl.Data {
|
fn parseJson(self: *Self, raw: []const u8) !zmpl.Data {
|
||||||
var data = zmpl.Data.init(self.allocator);
|
var data = zmpl.Data.init(self.allocator);
|
||||||
@ -236,10 +240,6 @@ fn parseJson(self: *Self, raw: []const u8) !zmpl.Data {
|
|||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
inline fn jitter() i1 {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub inline fn resumable(self: *Self) bool {
|
pub inline fn resumable(self: *Self) bool {
|
||||||
return self.resume_gateway_url != null and
|
return self.resume_gateway_url != null and
|
||||||
self.session_id != null and
|
self.session_id != null and
|
||||||
@ -267,7 +267,7 @@ inline fn gateway_url(self: ?*Self) []const u8 {
|
|||||||
|
|
||||||
// identifies in order to connect to Discord and get the online status, this shall be done on hello perhaps
|
// identifies in order to connect to Discord and get the online status, this shall be done on hello perhaps
|
||||||
fn identify(self: *Self) !void {
|
fn identify(self: *Self) !void {
|
||||||
std.debug.print("intents: {d}", .{self.intents.toRaw()});
|
self.logif("intents: {d}", .{self.intents.toRaw()});
|
||||||
const data = .{
|
const data = .{
|
||||||
.op = @intFromEnum(Opcode.Identify),
|
.op = @intFromEnum(Opcode.Identify),
|
||||||
.d = .{
|
.d = .{
|
||||||
@ -282,11 +282,14 @@ fn identify(self: *Self) !void {
|
|||||||
try self.send(data);
|
try self.send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const Log = union(enum) { yes, no };
|
||||||
|
|
||||||
// asks /gateway/bot initializes both the ws client and the http client
|
// asks /gateway/bot initializes both the ws client and the http client
|
||||||
pub fn init(allocator: mem.Allocator, args: struct {
|
pub fn init(allocator: mem.Allocator, args: struct {
|
||||||
token: []const u8,
|
token: []const u8,
|
||||||
intents: Intents,
|
intents: Intents,
|
||||||
run: GatewayDispatchEvent,
|
run: GatewayDispatchEvent,
|
||||||
|
log: Log,
|
||||||
}) !Self {
|
}) !Self {
|
||||||
var req = FetchReq.init(allocator, args.token);
|
var req = FetchReq.init(allocator, args.token);
|
||||||
defer req.deinit();
|
defer req.deinit();
|
||||||
@ -314,6 +317,9 @@ pub fn init(allocator: mem.Allocator, args: struct {
|
|||||||
.sequence = 0,
|
.sequence = 0,
|
||||||
.info = parsed.value,
|
.info = parsed.value,
|
||||||
.handler = args.run,
|
.handler = args.run,
|
||||||
|
.log = args.log,
|
||||||
|
.packets = std.ArrayList(u8).init(allocator),
|
||||||
|
.inflator = try zlib.Decompressor.init(allocator, .{ .header = .zlib_or_gzip }),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -324,7 +330,7 @@ inline fn _connect_ws(allocator: mem.Allocator, url: []const u8) !ws.Client {
|
|||||||
.host = url,
|
.host = url,
|
||||||
});
|
});
|
||||||
|
|
||||||
conn.handshake("/?v=10&encoding=json", .{
|
conn.handshake("/?v=10&encoding=json&compress=zlib-stream", .{
|
||||||
.timeout_ms = 1000,
|
.timeout_ms = 1000,
|
||||||
.headers = "host: gateway.discord.gg",
|
.headers = "host: gateway.discord.gg",
|
||||||
}) catch unreachable;
|
}) catch unreachable;
|
||||||
@ -334,7 +340,20 @@ inline fn _connect_ws(allocator: mem.Allocator, url: []const u8) !ws.Client {
|
|||||||
|
|
||||||
pub fn deinit(self: *Self) void {
|
pub fn deinit(self: *Self) void {
|
||||||
self.client.deinit();
|
self.client.deinit();
|
||||||
std.debug.print("killing the whole bot\n", .{});
|
self.logif("killing the whole bot\n", .{});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn ensureCompressed(data: []const u8, comptime pattern: []const u8) bool {
|
||||||
|
if (data.len < pattern.len) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
const start_index: usize = data.len - pattern.len;
|
||||||
|
|
||||||
|
for (0..pattern.len) |i| {
|
||||||
|
if (data[start_index + i] != pattern[i]) return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// listens for messages
|
// listens for messages
|
||||||
@ -342,13 +361,21 @@ pub fn readMessage(self: *Self, _: anytype) !void {
|
|||||||
try self.client.readTimeout(0);
|
try self.client.readTimeout(0);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
const msg = (try self.client.read()) orelse {
|
const msg = (try self.client.read()) orelse
|
||||||
std.debug.print(".", .{});
|
|
||||||
continue;
|
continue;
|
||||||
};
|
|
||||||
|
|
||||||
defer self.client.done(msg);
|
defer self.client.done(msg);
|
||||||
|
|
||||||
|
// self.logif("received: {?s}\n", .{msg.data});
|
||||||
|
try self.packets.appendSlice(msg.data);
|
||||||
|
|
||||||
|
if (!Self.ensureCompressed(msg.data, &[4]u8{ 0x00, 0x00, 0xFF, 0xFF }))
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// self.logif("{b}\n", .{self.packets.items});
|
||||||
|
const buf = try self.packets.toOwnedSlice();
|
||||||
|
const decompressed = try self.inflator.decompressAllAlloc(buf);
|
||||||
|
|
||||||
const raw = try json.parseFromSlice(struct {
|
const raw = try json.parseFromSlice(struct {
|
||||||
/// opcode for the payload
|
/// opcode for the payload
|
||||||
op: isize,
|
op: isize,
|
||||||
@ -358,17 +385,15 @@ pub fn readMessage(self: *Self, _: anytype) !void {
|
|||||||
s: ?i64,
|
s: ?i64,
|
||||||
/// The event name for this payload
|
/// The event name for this payload
|
||||||
t: ?[]const u8,
|
t: ?[]const u8,
|
||||||
}, self.allocator, msg.data, .{});
|
}, self.allocator, decompressed, .{});
|
||||||
|
|
||||||
const payload = raw.value;
|
const payload = raw.value;
|
||||||
|
|
||||||
//std.debug.print("received: {?s} with content {?s}\n", .{ payload.t, msg.data });
|
|
||||||
|
|
||||||
switch (@as(Opcode, @enumFromInt(payload.op))) {
|
switch (@as(Opcode, @enumFromInt(payload.op))) {
|
||||||
Opcode.Dispatch => {
|
Opcode.Dispatch => {
|
||||||
self.setSequence(payload.s orelse 0);
|
self.setSequence(payload.s orelse 0);
|
||||||
// maybe use threads and call it instead from there
|
// maybe use threads and call it instead from there
|
||||||
if (payload.t) |name| try self.handleEvent(name, msg.data);
|
if (payload.t) |name| try self.handleEvent(name, decompressed);
|
||||||
},
|
},
|
||||||
Opcode.Hello => {
|
Opcode.Hello => {
|
||||||
{
|
{
|
||||||
@ -385,11 +410,14 @@ pub fn readMessage(self: *Self, _: anytype) !void {
|
|||||||
.lastBeat = 0,
|
.lastBeat = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
std.debug.print("starting heart beater. seconds:{d}...\n", .{self.heart.heartbeatInterval});
|
self.logif("starting heart beater. seconds:{d}...\n", .{self.heart.heartbeatInterval});
|
||||||
|
|
||||||
try self.heartbeat();
|
try self.heartbeat();
|
||||||
|
|
||||||
const thread = try std.Thread.spawn(.{}, Self.heartbeat_wait, .{self});
|
var prng = std.Random.DefaultPrng.init(0);
|
||||||
|
const jitter = std.Random.float(prng.random(), f64);
|
||||||
|
|
||||||
|
const thread = try std.Thread.spawn(.{}, Self.heartbeat_wait, .{ self, jitter });
|
||||||
thread.detach();
|
thread.detach();
|
||||||
|
|
||||||
if (self.resumable()) {
|
if (self.resumable()) {
|
||||||
@ -402,7 +430,7 @@ pub fn readMessage(self: *Self, _: anytype) !void {
|
|||||||
},
|
},
|
||||||
Opcode.HeartbeatACK => {
|
Opcode.HeartbeatACK => {
|
||||||
// perhaps this needs a mutex?
|
// perhaps this needs a mutex?
|
||||||
std.debug.print("got heartbeat ack\n", .{});
|
self.logif("got heartbeat ack\n", .{});
|
||||||
|
|
||||||
self.mutex.lock();
|
self.mutex.lock();
|
||||||
defer self.mutex.unlock();
|
defer self.mutex.unlock();
|
||||||
@ -410,11 +438,11 @@ pub fn readMessage(self: *Self, _: anytype) !void {
|
|||||||
self.heart.ack = true;
|
self.heart.ack = true;
|
||||||
},
|
},
|
||||||
Opcode.Heartbeat => {
|
Opcode.Heartbeat => {
|
||||||
std.debug.print("sending requested heartbeat\n", .{});
|
self.logif("sending requested heartbeat\n", .{});
|
||||||
try self.heartbeat();
|
try self.heartbeat();
|
||||||
},
|
},
|
||||||
Opcode.Reconnect => {
|
Opcode.Reconnect => {
|
||||||
std.debug.print("reconnecting\n", .{});
|
self.logif("reconnecting\n", .{});
|
||||||
try self.reconnect();
|
try self.reconnect();
|
||||||
},
|
},
|
||||||
Opcode.Resume => {
|
Opcode.Resume => {
|
||||||
@ -433,7 +461,7 @@ pub fn readMessage(self: *Self, _: anytype) !void {
|
|||||||
},
|
},
|
||||||
Opcode.InvalidSession => {},
|
Opcode.InvalidSession => {},
|
||||||
else => {
|
else => {
|
||||||
std.debug.print("Unhandled {d} -- {s}", .{ payload.op, "none" });
|
self.logif("Unhandled {d} -- {s}", .{ payload.op, "none" });
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -445,25 +473,28 @@ pub fn heartbeat(self: *Self) !void {
|
|||||||
try self.send(data);
|
try self.send(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn heartbeat_wait(self: *Self) !void {
|
pub fn heartbeat_wait(self: *Self, jitter: f64) !void {
|
||||||
while (true) {
|
if (jitter == 1.0) {
|
||||||
std.debug.print("zzz for {d}\n", .{self.heart.heartbeatInterval});
|
self.logif("zzz for {d}\n", .{self.heart.heartbeatInterval});
|
||||||
std.Thread.sleep(@as(u64, @intCast(std.time.ns_per_ms * self.heart.heartbeatInterval)));
|
std.Thread.sleep(std.time.ns_per_ms * self.heart.heartbeatInterval);
|
||||||
|
} else {
|
||||||
std.debug.print(">> ♥ and ack received: {}\n", .{self.heart.ack});
|
const timeout = @as(f64, @floatFromInt(self.heart.heartbeatInterval)) * jitter;
|
||||||
|
self.logif("zzz for {d} and jitter {d}\n", .{ @as(u64, @intFromFloat(timeout)), jitter });
|
||||||
self.mutex.lock();
|
std.Thread.sleep(std.time.ns_per_ms * @as(u64, @intFromFloat(timeout)));
|
||||||
defer self.mutex.unlock();
|
|
||||||
|
|
||||||
if (self.heart.ack == true) {
|
|
||||||
std.debug.print("sending unrequested heartbeat\n", .{});
|
|
||||||
self.heartbeat() catch unreachable;
|
|
||||||
try self.client.readTimeout(1000);
|
|
||||||
} else {
|
|
||||||
self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection") catch unreachable;
|
|
||||||
@panic("zombied conn\n");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
self.logif(">> ♥ and ack received: {}\n", .{self.heart.ack});
|
||||||
|
|
||||||
|
if (self.heart.ack) {
|
||||||
|
self.logif("sending unrequested heartbeat\n", .{});
|
||||||
|
try self.heartbeat();
|
||||||
|
try self.client.readTimeout(1000);
|
||||||
|
} else {
|
||||||
|
self.close(ShardSocketCloseCodes.ZombiedConnection, "Zombied connection") catch unreachable;
|
||||||
|
@panic("zombied conn\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
return heartbeat_wait(self, 1.0);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub inline fn reconnect(self: *Self) !void {
|
pub inline fn reconnect(self: *Self) !void {
|
||||||
@ -489,7 +520,7 @@ pub fn close(self: *Self, code: ShardSocketCloseCodes, reason: []const u8) !void
|
|||||||
self.mutex.lock();
|
self.mutex.lock();
|
||||||
defer self.mutex.unlock();
|
defer self.mutex.unlock();
|
||||||
|
|
||||||
std.debug.print("cooked closing ws conn...\n", .{});
|
self.logif("cooked closing ws conn...\n", .{});
|
||||||
// Implement reconnection logic here
|
// Implement reconnection logic here
|
||||||
try self.client.close(.{
|
try self.client.close(.{
|
||||||
.code = @intFromEnum(code), //u16
|
.code = @intFromEnum(code), //u16
|
||||||
@ -503,7 +534,7 @@ pub fn send(self: *Self, data: anytype) !void {
|
|||||||
var string = std.ArrayList(u8).init(fba.allocator());
|
var string = std.ArrayList(u8).init(fba.allocator());
|
||||||
try std.json.stringify(data, .{}, string.writer());
|
try std.json.stringify(data, .{}, string.writer());
|
||||||
|
|
||||||
//std.debug.print("{s}\n", .{string.items});
|
//self.logif("{s}\n", .{string.items});
|
||||||
|
|
||||||
try self.client.write(string.items);
|
try self.client.write(string.items);
|
||||||
}
|
}
|
||||||
@ -645,3 +676,10 @@ pub fn handleEvent(self: *Self, name: []const u8, payload: []const u8) !void {
|
|||||||
@call(.auto, self.handler.message_create, .{m});
|
@call(.auto, self.handler.message_create, .{m});
|
||||||
} else {}
|
} else {}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline fn logif(self: *Self, comptime format: []const u8, args: anytype) void {
|
||||||
|
switch (self.log) {
|
||||||
|
.yes => debug.info(format, args),
|
||||||
|
.no => {},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -18,6 +18,7 @@ pub fn main() !void {
|
|||||||
.token = TOKEN,
|
.token = TOKEN,
|
||||||
.intents = Intents.fromRaw(37379),
|
.intents = Intents.fromRaw(37379),
|
||||||
.run = Session.GatewayDispatchEvent{ .message_create = &message_create },
|
.run = Session.GatewayDispatchEvent{ .message_create = &message_create },
|
||||||
|
.log = .yes,
|
||||||
});
|
});
|
||||||
errdefer handler.deinit();
|
errdefer handler.deinit();
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user