working on cache

This commit is contained in:
Yuzu 2025-04-13 15:34:39 -05:00
parent 21cf72449f
commit 4f1d1f7d69
8 changed files with 3310 additions and 3205 deletions

View File

@ -1,60 +1,80 @@
const std = @import("std"); const std = @import("std");
/// defaults are to be overridden by the end user
/// otherwise, simply do TableTemplate{}
/// this is a template for the cache tables
pub const TableTemplate = struct {
comptime User: type = @import("./config.zig").StoredUser,
comptime Guild: type = @import("./config.zig").StoredGuild,
comptime Channel: type = @import("./config.zig").StoredChannel,
comptime Emoji: type = @import("./config.zig").StoredEmoji,
comptime Message: type = @import("./config.zig").StoredMessage,
comptime Role: type = @import("./config.zig").StoredRole,
comptime Sticker: type = @import("./config.zig").StoredSticker,
comptime Reaction: type = @import("./config.zig").StoredReaction,
comptime Member: type = @import("./config.zig").StoredMember,
comptime Thread: type = @import("./config.zig").StoredChannel,
};
// by default this caches everything // by default this caches everything
// therefore we'll allow custom cache tables // therefore we'll allow custom cache tables
pub const CacheTables = struct { pub fn CacheTables(comptime Table: TableTemplate) type {
const Snowflake = @import("./structures/snowflake.zig").Snowflake; return struct {
const Types = @import("./structures/types.zig"); const Snowflake = @import("./structures/snowflake.zig").Snowflake;
const StoredUser = @import("./config.zig").StoredUser; const StoredUser: type = Table.User;
const StoredGuild = @import("./config.zig").StoredGuild; const StoredGuild: type = Table.Guild;
const StoredChannel = @import("./config.zig").StoredChannel; const StoredChannel: type = Table.Channel;
const StoredEmoji = @import("./config.zig").StoredEmoji; const StoredEmoji: type = Table.Emoji;
const StoredMessage = @import("./config.zig").StoredMessage; const StoredMessage: type = Table.Message;
const StoredRole = @import("./config.zig").StoredRole; const StoredRole: type = Table.Role;
const StoredSticker = @import("./config.zig").StoredSticker; const StoredSticker: type = Table.Sticker;
const StoredReaction = @import("./config.zig").StoredReaction; const StoredReaction: type = Table.Reaction;
const StoredMember = @import("./config.zig").StoredMember; const StoredMember: type = Table.Member;
const StoredThread: type = Table.Thread;
users: CacheLike(Snowflake, StoredUser), users: CacheLike(Snowflake, StoredUser),
guilds: CacheLike(Snowflake, StoredGuild), guilds: CacheLike(Snowflake, StoredGuild),
channels: CacheLike(Snowflake, StoredChannel), channels: CacheLike(Snowflake, StoredChannel),
emojis: CacheLike(Snowflake, StoredEmoji), emojis: CacheLike(Snowflake, StoredEmoji),
messages: CacheLike(Snowflake, StoredMessage), messages: CacheLike(Snowflake, StoredMessage),
roles: CacheLike(Snowflake, StoredRole), roles: CacheLike(Snowflake, StoredRole),
stickers: CacheLike(Snowflake, StoredSticker), stickers: CacheLike(Snowflake, StoredSticker),
reactions: CacheLike(Snowflake, StoredReaction), reactions: CacheLike(Snowflake, StoredReaction),
members: CacheLike(Snowflake, StoredMember), members: CacheLike(Snowflake, StoredMember),
threads: CacheLike(Snowflake, StoredChannel), threads: CacheLike(Snowflake, StoredThread),
/// you can customize with your own cache /// you can customize with your own cache
pub fn defaults(allocator: std.mem.Allocator) CacheTables { pub fn defaults(allocator: std.mem.Allocator) CacheTables(Table) {
var users = DefaultCache(Snowflake, StoredUser).init(allocator); var users = DefaultCache(Snowflake, StoredUser).init(allocator);
var guilds = DefaultCache(Snowflake, StoredGuild).init(allocator); var guilds = DefaultCache(Snowflake, StoredGuild).init(allocator);
var channels = DefaultCache(Snowflake, StoredChannel).init(allocator); var channels = DefaultCache(Snowflake, StoredChannel).init(allocator);
var emojis = DefaultCache(Snowflake, StoredEmoji).init(allocator); var emojis = DefaultCache(Snowflake, StoredEmoji).init(allocator);
var messages = DefaultCache(Snowflake, StoredMessage).init(allocator); var messages = DefaultCache(Snowflake, StoredMessage).init(allocator);
var roles = DefaultCache(Snowflake, StoredRole).init(allocator); var roles = DefaultCache(Snowflake, StoredRole).init(allocator);
var stickers = DefaultCache(Snowflake, StoredSticker).init(allocator); var stickers = DefaultCache(Snowflake, StoredSticker).init(allocator);
var reactions = DefaultCache(Snowflake, StoredReaction).init(allocator); var reactions = DefaultCache(Snowflake, StoredReaction).init(allocator);
var members = DefaultCache(Snowflake, StoredMember).init(allocator); var members = DefaultCache(Snowflake, StoredMember).init(allocator);
var threads = DefaultCache(Snowflake, StoredChannel).init(allocator); var threads = DefaultCache(Snowflake, StoredChannel).init(allocator);
return .{ return .{
.users = users.cache(), .users = users.cache(),
.guilds = guilds.cache(), .guilds = guilds.cache(),
.channels = channels.cache(), .channels = channels.cache(),
.emojis = emojis.cache(), .emojis = emojis.cache(),
.messages = messages.cache(), .messages = messages.cache(),
.roles = roles.cache(), .roles = roles.cache(),
.stickers = stickers.cache(), .stickers = stickers.cache(),
.reactions = reactions.cache(), .reactions = reactions.cache(),
.members = members.cache(), .members = members.cache(),
.threads = threads.cache(), .threads = threads.cache(),
}; };
} }
}; };
}
/// this is an extensible cache, you may implement your own
/// I recommend "zigache" to be used
pub fn CacheLike(comptime K: type, comptime V: type) type { pub fn CacheLike(comptime K: type, comptime V: type) type {
return struct { return struct {
ptr: *anyopaque, ptr: *anyopaque,
@ -65,7 +85,7 @@ pub fn CacheLike(comptime K: type, comptime V: type) type {
countFn: *const fn(*anyopaque) usize, countFn: *const fn(*anyopaque) usize,
pub fn put(self: CacheLike(K, V), key: K, value: V) !void { pub fn put(self: CacheLike(K, V), key: K, value: V) !void {
self.putFn(self.ptr, key, value); return self.putFn(self.ptr, key, value);
} }
pub fn get(self: CacheLike(K, V), key: K) ?V { pub fn get(self: CacheLike(K, V), key: K) ?V {
@ -131,10 +151,6 @@ pub fn CacheLike(comptime K: type, comptime V: type) type {
}; };
} }
// make a cache that uses a hash map
// must have putFn, getFn, removeFn, etc
// must have a cache() function to return the interface
pub fn DefaultCache(comptime K: type, comptime V: type) type { pub fn DefaultCache(comptime K: type, comptime V: type) type {
return struct { return struct {
const Self = @This(); const Self = @This();

View File

@ -25,6 +25,31 @@ pub const StoredUser = struct {
banner: ?[]const u8 = null, banner: ?[]const u8 = null,
avatar_decoration_data: ?AvatarDecorationData = null, avatar_decoration_data: ?AvatarDecorationData = null,
clan: ?[]const u8 = null, clan: ?[]const u8 = null,
const Self = @This();

/// Builds a `StoredUser` cache entry from a full gateway `User` payload by
/// copying each stored field one-to-one. No allocation is performed; slice
/// fields (username, avatar, banner, ...) alias the source payload's memory —
/// NOTE(review): assumes the payload outlives the cache entry, TODO confirm.
pub fn transform(default: @import("./structures/user.zig").User) Self {
    return .{
        .username = default.username,
        .global_name = default.global_name,
        .locale = default.locale,
        .flags = default.flags,
        .premium_type = default.premium_type,
        .public_flags = default.public_flags,
        .accent_color = default.accent_color,
        .id = default.id,
        .discriminator = default.discriminator,
        .avatar = default.avatar,
        .bot = default.bot,
        .system = default.system,
        .mfa_enabled = default.mfa_enabled,
        .verified = default.verified,
        .email = default.email,
        .banner = default.banner,
        .avatar_decoration_data = default.avatar_decoration_data,
        .clan = default.clan,
    };
}
}; };
const VerificationLevels = @import("./structures/shared.zig").VerificationLevels; const VerificationLevels = @import("./structures/shared.zig").VerificationLevels;

View File

@ -1,253 +0,0 @@
//! ISC License
//!
//! Copyright (c) 2024-2025 Yuzu
//!
//! Permission to use, copy, modify, and/or distribute this software for any
//! purpose with or without fee is hereby granted, provided that the above
//! copyright notice and this permission notice appear in all copies.
//!
//! THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
//! REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
//! AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
//! INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
//! LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
//! OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
//! PERFORMANCE OF THIS SOFTWARE.
const Intents = @import("./structures/types.zig").Intents;
const Snowflake = @import("./structures/snowflake.zig").Snowflake;
const GatewayBotInfo = @import("internal.zig").GatewayBotInfo;
const IdentifyProperties = @import("internal.zig").IdentifyProperties;
const ShardDetails = @import("internal.zig").ShardDetails;
const ConnectQueue = @import("internal.zig").ConnectQueue;
const GatewayDispatchEvent = @import("internal.zig").GatewayDispatchEvent;
const Log = @import("internal.zig").Log;
const Shard = @import("shard.zig");
const std = @import("std");
const mem = std.mem;
const debug = @import("internal.zig").debug;
const Self = @This();
/// Token and intents shared by every shard (see `init`).
shard_details: ShardDetails,
/// Allocator used for shard/bucket storage; callers choose it in `init`.
allocator: mem.Allocator,
/// Queue for managing shard connections
connect_queue: ConnectQueue(Shard),
/// All spawned shards, keyed by shard id.
shards: std.AutoArrayHashMap(usize, Shard),
/// User-provided gateway event handlers, forwarded to each shard.
handler: GatewayDispatchEvent(*Shard),
/// where we dispatch work for every thread, threads must be spawned upon shard creation
/// make sure the address of workers is stable
workers: std.Thread.Pool = undefined,
/// configuration settings
options: SessionOptions,
/// Whether to emit debug logging (see `logif`).
log: Log,
/// Shared cache tables handed to each shard on creation.
cache: @import("cache.zig").CacheTables,
/// Per-shard session state needed to resume a dropped gateway connection
/// (consumed by `resume_`).
pub const ShardData = struct {
    /// resume seq to resume connections
    resume_seq: ?usize,
    /// resume_gateway_url is the url to resume the connection
    /// https://discord.com/developers/docs/topics/gateway#ready-event
    resume_gateway_url: ?[]const u8,
    /// session_id is the unique session id of the gateway
    session_id: ?[]const u8,
};
/// Configuration for the shard manager.
pub const SessionOptions = struct {
    /// Important data which is used by the manager to connect shards to the gateway.
    info: GatewayBotInfo,
    /// Delay in milliseconds to wait before spawning next shard. OPTIMAL IS ABOVE 5100. YOU DON'T WANT TO HIT THE RATE LIMIT!!!
    spawn_shard_delay: ?u64 = 5300,
    /// Total amount of shards your bot uses. Useful for zero-downtime updates or resharding.
    total_shards: usize = 1,
    /// First shard id (inclusive) handled by this manager.
    shard_start: usize = 0,
    /// One past the last shard id handled by this manager.
    shard_end: usize = 1,
    /// Automatic resharding settings (check `interval`, threshold `percentage`).
    /// NOTE(review): the original comment here described "payload handlers",
    /// which does not match this field; exact semantics TODO confirm.
    resharding: ?struct { interval: u64, percentage: usize } = null,
    /// worker threads
    workers_per_shard: usize = 1,
    /// The shard lifespan in milliseconds. If a shard is not connected within this time, it will be closed.
    shard_lifespan: ?u64 = null,
};
/// Initializes the shard manager.
/// `settings.options.info.session_start_limit` must be non-null; its
/// `max_concurrency` drives the connect queue.
/// BUGFIX: the original silently dropped the caller's `spawn_shard_delay`,
/// `resharding`, and `shard_lifespan`, always using the defaults; they are
/// now forwarded from `settings.options`.
pub fn init(allocator: mem.Allocator, settings: struct {
    token: []const u8,
    intents: Intents,
    options: SessionOptions,
    run: GatewayDispatchEvent(*Shard),
    log: Log,
    cache: @import("cache.zig").CacheTables,
}) mem.Allocator.Error!Self {
    const concurrency = settings.options.info.session_start_limit.?.max_concurrency;
    return .{
        .allocator = allocator,
        .connect_queue = try ConnectQueue(Shard).init(allocator, concurrency, 5000),
        .shards = .init(allocator),
        .workers = undefined, // spawned later in spawnBuckets
        .shard_details = ShardDetails{
            .token = settings.token,
            .intents = settings.intents,
        },
        .handler = settings.run,
        .options = .{
            .info = .{
                .url = settings.options.info.url,
                .shards = settings.options.info.shards,
                .session_start_limit = settings.options.info.session_start_limit,
            },
            .spawn_shard_delay = settings.options.spawn_shard_delay,
            .total_shards = settings.options.total_shards,
            .shard_start = settings.options.shard_start,
            .shard_end = settings.options.shard_end,
            .resharding = settings.options.resharding,
            .workers_per_shard = settings.options.workers_per_shard,
            .shard_lifespan = settings.options.shard_lifespan,
        },
        .log = settings.log,
        .cache = settings.cache,
    };
}
/// Releases the connect queue and the shard map.
/// NOTE(review): does not deinit `workers` or the shards themselves — confirm intended.
pub fn deinit(self: *Self) void {
    self.connect_queue.deinit();
    self.shards.deinit();
}
/// Forces an identify handshake on shard `shard_id`, creating (and storing)
/// the shard first if it does not exist yet.
pub fn forceIdentify(self: *Self, shard_id: usize) !void {
    self.logif("#{d} force identify", .{shard_id});
    const shard = try self.create(shard_id);

    return shard.identify(null);
}
/// Disconnects shard `shard_id` if it exists; silently a no-op otherwise.
pub fn disconnect(self: *Self, shard_id: usize) Shard.CloseError!void {
    return if (self.shards.get(shard_id)) |shard| shard.disconnect();
}
/// Disconnects every stored shard, stopping at the first error.
/// BUGFIX: the original looped on `self.shards.iterator().next()`, which
/// constructs a fresh iterator on every test — it always yields the first
/// entry again and never terminates when the map is non-empty. It also
/// discarded the error union returned by `disconnect()`.
pub fn disconnectAll(self: *Self) Shard.CloseError!void {
    var it = self.shards.iterator();
    while (it.next()) |entry| try entry.value_ptr.disconnect();
}
/// spawn buckets in order
/// Log bucket preparation
/// Divide shards into chunks based on concurrency
/// Assign each shard to a bucket
/// Return list of buckets
/// https://discord.com/developers/docs/events/gateway#sharding-max-concurrency
fn spawnBuckets(self: *Self) ![][]Shard {
    const concurrency = self.options.info.session_start_limit.?.max_concurrency;

    self.logif("{d}-{d}", .{ self.options.shard_start, self.options.shard_end });

    // BUGFIX: the original computed `shard_start - shard_end`, which underflows
    // for every valid range (start < end) and silently fell back to 1 shard.
    const range = std.math.sub(usize, self.options.shard_end, self.options.shard_start) catch 1;
    const bucket_count = (range + concurrency - 1) / concurrency; // ceil(range / concurrency)

    self.logif("#0 preparing buckets", .{});

    const buckets = try self.allocator.alloc([]Shard, bucket_count);

    for (buckets, 0..) |*bucket, i| {
        // the last bucket may hold fewer than `concurrency` shards
        const bucket_size = if ((i + 1) * concurrency > range) range - (i * concurrency) else concurrency;
        bucket.* = try self.allocator.alloc(Shard, bucket_size);
        for (bucket.*, 0..) |*shard, j| {
            shard.* = try self.create(self.options.shard_start + i * concurrency + j);
        }
    }

    self.logif("{d} buckets created", .{bucket_count});

    // finally define the worker pool shared by all shards
    try self.workers.init(.{
        .allocator = self.allocator,
        .n_jobs = self.options.workers_per_shard * self.options.total_shards,
    });

    return buckets;
}
/// creates a shard and stores it
/// Returns the already-registered shard when one exists for `shard_id`;
/// otherwise builds a new one from the manager's settings and caches it.
fn create(self: *Self, shard_id: usize) !Shard {
    // fast path: shard already registered
    if (self.shards.get(shard_id)) |existing| return existing;

    const new_shard = try Shard.init(self.allocator, shard_id, self.options.total_shards, .{
        .token = self.shard_details.token,
        .intents = self.shard_details.intents,
        .options = Shard.ShardOptions{
            .info = self.options.info,
            .ratelimit_options = .{},
        },
        .run = self.handler,
        .log = self.log,
        .cache = self.cache,
        .sharder_pool = &self.workers,
    });

    try self.shards.put(shard_id, new_shard);
    return new_shard;
}
/// Resumes a previously-established session on shard `shard_id` and enqueues
/// it for connection. Errors with `CannotOverrideExistingShard` if a shard
/// with this id already exists.
/// BUGFIX: the original declared a plain `void` return yet returned an error,
/// ignored the error union from `create`, and assigned through a `const` —
/// none of which compiles. The return type is now an error union.
pub fn resume_(self: *Self, shard_id: usize, shard_data: ShardData) !void {
    if (self.shards.contains(shard_id)) return error.CannotOverrideExistingShard;

    var shard = try self.create(shard_id);
    // NOTE(review): this mutates a local copy of the shard; confirm the
    // connect queue stores shards by value so the resume data is carried along.
    shard.data = shard_data;

    return self.connect_queue.push(.{
        .shard = shard,
        .callback = &callback,
    });
}
/// Connect-queue callback: opens the gateway connection for the queued shard.
fn callback(self: *ConnectQueue(Shard).RequestWithShard) anyerror!void {
    try self.shard.connect();
}
/// Creates all shards (bucketed by the gateway's max_concurrency, see
/// `spawnBuckets`) and enqueues each one on the connect queue.
pub fn spawnShards(self: *Self) !void {
    const buckets = try self.spawnBuckets();

    self.logif("Spawning shards", .{});

    for (buckets) |bucket| {
        for (bucket) |shard| {
            self.logif("adding {d} to connect queue", .{shard.id});
            try self.connect_queue.push(.{
                .shard = shard,
                .callback = &callback,
            });
        }
    }

    // resharding is not implemented yet
    //self.startResharder();
}
/// Sends `data` over shard `shard_id` if it exists; silently a no-op otherwise.
pub fn send(self: *Self, shard_id: usize, data: anytype) Shard.SendError!void {
    if (self.shards.get(shard_id)) |shard| try shard.send(data);
}
// SPEC OF THE RESHARDER:
// Class Self
//
// Method startResharder():
// If resharding interval is not set or shard bounds are not valid:
// Exit
// Set up periodic check for resharding:
// If new shards are required:
// Log resharding process
// Update options with new shard settings
// Disconnect old shards and clear them from manager
// Spawn shards again with updated configuration
//
/// Emits a debug log line with `format`/`args`, but only when logging is
/// enabled (`self.log == .yes`); otherwise does nothing.
inline fn logif(self: *Self, comptime format: []const u8, args: anytype) void {
    if (self.log == .yes) debug.info(format, args);
}

View File

@ -312,7 +312,7 @@ pub const CacheLike = @import("cache.zig").CacheLike;
pub const DefaultCache = @import("cache.zig").DefaultCache; pub const DefaultCache = @import("cache.zig").DefaultCache;
pub const Permissions = @import("extra/permissions.zig").Permissions; pub const Permissions = @import("extra/permissions.zig").Permissions;
pub const Shard = @import("shard.zig"); pub const Shard = @import("shard.zig").Shard;
pub const zjson = @compileError("Deprecated."); pub const zjson = @compileError("Deprecated.");
pub const Internal = @import("internal.zig"); pub const Internal = @import("internal.zig");
@ -320,8 +320,10 @@ const GatewayDispatchEvent = Internal.GatewayDispatchEvent;
const GatewayBotInfo = Internal.GatewayBotInfo; const GatewayBotInfo = Internal.GatewayBotInfo;
const Log = Internal.Log; const Log = Internal.Log;
pub const Sharder = @import("core.zig"); // sharder
const SessionOptions = Sharder.SessionOptions; pub const Sharder = @import("sharder.zig").ShardManager;
pub const cache = @import("cache.zig");
pub const FetchReq = @import("http.zig").FetchReq; pub const FetchReq = @import("http.zig").FetchReq;
pub const FileData = @import("http.zig").FileData; pub const FileData = @import("http.zig").FileData;
@ -331,67 +333,99 @@ const mem = std.mem;
const http = std.http; const http = std.http;
const json = std.json; const json = std.json;
const Self = @This(); pub fn CustomisedSession(comptime Table: cache.TableTemplate) type {
return struct {
const Self = @This();
allocator: mem.Allocator, allocator: mem.Allocator,
sharder: Sharder, sharder: Sharder(Table),
token: []const u8, token: []const u8,
pub fn init(allocator: mem.Allocator) Self { pub fn init(allocator: mem.Allocator) Self {
return .{ return .{
.allocator = allocator, .allocator = allocator,
.sharder = undefined, .sharder = undefined,
.token = undefined, .token = undefined,
};
}
pub fn deinit(self: *Self) void {
self.sharder.deinit();
}
pub fn start(self: *Self, settings: struct {
token: []const u8,
intents: Intents,
options: struct {
spawn_shard_delay: u64 = 5300,
total_shards: usize = 1,
shard_start: usize = 0,
shard_end: usize = 1,
},
run: GatewayDispatchEvent,
log: Log,
cache: cache.TableTemplate,
}) !void {
self.token = settings.token;
var req = FetchReq.init(self.allocator, settings.token);
defer req.deinit();
const res = try req.makeRequest(.GET, "/gateway/bot", null);
const body = try req.body.toOwnedSlice();
defer self.allocator.free(body);
// check status idk
if (res.status != http.Status.ok) {
@panic("we are cooked\n"); // check your token dumbass
}
const parsed = try json.parseFromSlice(GatewayBotInfo, self.allocator, body, .{});
defer parsed.deinit();
self.sharder = try Sharder(Table).init(self.allocator, .{
.token = settings.token,
.intents = settings.intents,
.run = settings.run,
.options = Sharder(Table).SessionOptions{
.info = parsed.value,
.shard_start = settings.options.shard_start,
.shard_end = @intCast(parsed.value.shards),
.total_shards = @intCast(parsed.value.shards),
.spawn_shard_delay = settings.options.spawn_shard_delay,
},
.log = settings.log,
.cache = settings.cache,
});
try self.sharder.spawnShards();
}
}; };
} }
pub fn deinit(self: *Self) void { // defaults
self.sharder.deinit(); const DefaultTable = cache.TableTemplate{};
pub const Session = CustomisedSession(DefaultTable);
pub fn init(allocator: mem.Allocator) Session {
return Session.init(allocator);
} }
pub fn start(self: *Self, settings: struct { pub fn deinit(self: *Session) void {
self.deinit();
}
pub fn start(self: *Session, settings: struct {
token: []const u8, token: []const u8,
intents: Self.Intents, intents: Intents,
options: struct { options: struct {
spawn_shard_delay: u64 = 5300, spawn_shard_delay: u64 = 5300,
total_shards: usize = 1, total_shards: usize = 1,
shard_start: usize = 0, shard_start: usize = 0,
shard_end: usize = 1, shard_end: usize = 1,
}, },
run: GatewayDispatchEvent(*Shard), run: GatewayDispatchEvent,
log: Log, log: Log,
cache: @import("cache.zig").CacheTables, cache: cache.TableTemplate,
}) !void { }) !void {
self.token = settings.token; return self.start(settings);
var req = FetchReq.init(self.allocator, settings.token);
defer req.deinit();
const res = try req.makeRequest(.GET, "/gateway/bot", null);
const body = try req.body.toOwnedSlice();
defer self.allocator.free(body);
// check status idk
if (res.status != http.Status.ok) {
@panic("we are cooked\n"); // check your token dumbass
}
const parsed = try json.parseFromSlice(GatewayBotInfo, self.allocator, body, .{});
defer parsed.deinit();
self.sharder = try Sharder.init(self.allocator, .{
.token = settings.token,
.intents = settings.intents,
.run = settings.run,
.options = SessionOptions{
.info = parsed.value,
.shard_start = settings.options.shard_start,
.shard_end = @intCast(parsed.value.shards),
.total_shards = @intCast(parsed.value.shards),
.spawn_shard_delay = settings.options.spawn_shard_delay,
},
.log = settings.log,
.cache = settings.cache,
});
try self.sharder.spawnShards();
} }

View File

@ -276,85 +276,83 @@ pub const Bucket = struct {
} }
}; };
pub fn GatewayDispatchEvent(comptime T: type) type { pub const GatewayDispatchEvent = struct {
return struct { application_command_permissions_update: ?*const fn (save: *anyopaque, application_command_permissions: Types.ApplicationCommandPermissions) anyerror!void = undefined,
application_command_permissions_update: ?*const fn (save: T, application_command_permissions: Types.ApplicationCommandPermissions) anyerror!void = undefined, auto_moderation_rule_create: ?*const fn (save: *anyopaque, rule: Types.AutoModerationRule) anyerror!void = undefined,
auto_moderation_rule_create: ?*const fn (save: T, rule: Types.AutoModerationRule) anyerror!void = undefined, auto_moderation_rule_update: ?*const fn (save: *anyopaque, rule: Types.AutoModerationRule) anyerror!void = undefined,
auto_moderation_rule_update: ?*const fn (save: T, rule: Types.AutoModerationRule) anyerror!void = undefined, auto_moderation_rule_delete: ?*const fn (save: *anyopaque, rule: Types.AutoModerationRule) anyerror!void = undefined,
auto_moderation_rule_delete: ?*const fn (save: T, rule: Types.AutoModerationRule) anyerror!void = undefined, auto_moderation_action_execution: ?*const fn (save: *anyopaque, action_execution: Types.AutoModerationActionExecution) anyerror!void = undefined,
auto_moderation_action_execution: ?*const fn (save: T, action_execution: Types.AutoModerationActionExecution) anyerror!void = undefined,
channel_create: ?*const fn (save: T, chan: Types.Channel) anyerror!void = undefined, channel_create: ?*const fn (save: *anyopaque, chan: Types.Channel) anyerror!void = undefined,
channel_update: ?*const fn (save: T, chan: Types.Channel) anyerror!void = undefined, channel_update: ?*const fn (save: *anyopaque, chan: Types.Channel) anyerror!void = undefined,
/// this isn't send when the channel is not relevant to you /// this isn't send when the channel is not relevant to you
channel_delete: ?*const fn (save: T, chan: Types.Channel) anyerror!void = undefined, channel_delete: ?*const fn (save: *anyopaque, chan: Types.Channel) anyerror!void = undefined,
channel_pins_update: ?*const fn (save: T, chan_pins_update: Types.ChannelPinsUpdate) anyerror!void = undefined, channel_pins_update: ?*const fn (save: *anyopaque, chan_pins_update: Types.ChannelPinsUpdate) anyerror!void = undefined,
thread_create: ?*const fn (save: T, thread: Types.Channel) anyerror!void = undefined, thread_create: ?*const fn (save: *anyopaque, thread: Types.Channel) anyerror!void = undefined,
thread_update: ?*const fn (save: T, thread: Types.Channel) anyerror!void = undefined, thread_update: ?*const fn (save: *anyopaque, thread: Types.Channel) anyerror!void = undefined,
/// has `id`, `guild_id`, `parent_id`, and `type` fields. /// has `id`, `guild_id`, `parent_id`, and `type` fields.
thread_delete: ?*const fn (save: T, thread: Types.Partial(Types.Channel)) anyerror!void = undefined, thread_delete: ?*const fn (save: *anyopaque, thread: Types.Partial(Types.Channel)) anyerror!void = undefined,
thread_list_sync: ?*const fn (save: T, data: Types.ThreadListSync) anyerror!void = undefined, thread_list_sync: ?*const fn (save: *anyopaque, data: Types.ThreadListSync) anyerror!void = undefined,
thread_member_update: ?*const fn (save: T, guild_id: Types.ThreadMemberUpdate) anyerror!void = undefined, thread_member_update: ?*const fn (save: *anyopaque, guild_id: Types.ThreadMemberUpdate) anyerror!void = undefined,
thread_members_update: ?*const fn (save: T, thread_data: Types.ThreadMembersUpdate) anyerror!void = undefined, thread_members_update: ?*const fn (save: *anyopaque, thread_data: Types.ThreadMembersUpdate) anyerror!void = undefined,
// TODO: implement // guild_audit_log_entry_create: null = null, // TODO: implement // guild_audit_log_entry_create: null = null,
guild_create: ?*const fn (save: T, guild: Types.Guild) anyerror!void = undefined, guild_create: ?*const fn (save: *anyopaque, guild: Types.Guild) anyerror!void = undefined,
guild_create_unavailable: ?*const fn (save: T, guild: Types.UnavailableGuild) anyerror!void = undefined, guild_create_unavailable: ?*const fn (save: *anyopaque, guild: Types.UnavailableGuild) anyerror!void = undefined,
guild_update: ?*const fn (save: T, guild: Types.Guild) anyerror!void = undefined, guild_update: ?*const fn (save: *anyopaque, guild: Types.Guild) anyerror!void = undefined,
/// this is not necessarily sent upon deletion of a guild /// this is not necessarily sent upon deletion of a guild
/// but from when a user is *removed* therefrom /// but from when a user is *removed* therefrom
guild_delete: ?*const fn (save: T, guild: Types.UnavailableGuild) anyerror!void = undefined, guild_delete: ?*const fn (save: *anyopaque, guild: Types.UnavailableGuild) anyerror!void = undefined,
guild_ban_add: ?*const fn (save: T, gba: Types.GuildBanAddRemove) anyerror!void = undefined, guild_ban_add: ?*const fn (save: *anyopaque, gba: Types.GuildBanAddRemove) anyerror!void = undefined,
guild_ban_remove: ?*const fn (save: T, gbr: Types.GuildBanAddRemove) anyerror!void = undefined, guild_ban_remove: ?*const fn (save: *anyopaque, gbr: Types.GuildBanAddRemove) anyerror!void = undefined,
guild_emojis_update: ?*const fn (save: T, fields: Types.GuildEmojisUpdate) anyerror!void = undefined, guild_emojis_update: ?*const fn (save: *anyopaque, fields: Types.GuildEmojisUpdate) anyerror!void = undefined,
guild_stickers_update: ?*const fn (save: T, fields: Types.GuildStickersUpdate) anyerror!void = undefined, guild_stickers_update: ?*const fn (save: *anyopaque, fields: Types.GuildStickersUpdate) anyerror!void = undefined,
guild_integrations_update: ?*const fn (save: T, fields: Types.GuildIntegrationsUpdate) anyerror!void = undefined, guild_integrations_update: ?*const fn (save: *anyopaque, fields: Types.GuildIntegrationsUpdate) anyerror!void = undefined,
guild_member_add: ?*const fn (save: T, guild_id: Types.GuildMemberAdd) anyerror!void = undefined, guild_member_add: ?*const fn (save: *anyopaque, guild_id: Types.GuildMemberAdd) anyerror!void = undefined,
guild_member_update: ?*const fn (save: T, fields: Types.GuildMemberUpdate) anyerror!void = undefined, guild_member_update: ?*const fn (save: *anyopaque, fields: Types.GuildMemberUpdate) anyerror!void = undefined,
guild_member_remove: ?*const fn (save: T, user: Types.GuildMemberRemove) anyerror!void = undefined, guild_member_remove: ?*const fn (save: *anyopaque, user: Types.GuildMemberRemove) anyerror!void = undefined,
guild_members_chunk: ?*const fn (save: T, data: Types.GuildMembersChunk) anyerror!void = undefined, guild_members_chunk: ?*const fn (save: *anyopaque, data: Types.GuildMembersChunk) anyerror!void = undefined,
guild_role_create: ?*const fn (save: T, role: Types.GuildRoleCreate) anyerror!void = undefined, guild_role_create: ?*const fn (save: *anyopaque, role: Types.GuildRoleCreate) anyerror!void = undefined,
guild_role_delete: ?*const fn (save: T, role: Types.GuildRoleDelete) anyerror!void = undefined, guild_role_delete: ?*const fn (save: *anyopaque, role: Types.GuildRoleDelete) anyerror!void = undefined,
guild_role_update: ?*const fn (save: T, role: Types.GuildRoleUpdate) anyerror!void = undefined, guild_role_update: ?*const fn (save: *anyopaque, role: Types.GuildRoleUpdate) anyerror!void = undefined,
guild_scheduled_event_create: ?*const fn (save: T, s_event: Types.ScheduledEvent) anyerror!void = undefined, guild_scheduled_event_create: ?*const fn (save: *anyopaque, s_event: Types.ScheduledEvent) anyerror!void = undefined,
guild_scheduled_event_update: ?*const fn (save: T, s_event: Types.ScheduledEvent) anyerror!void = undefined, guild_scheduled_event_update: ?*const fn (save: *anyopaque, s_event: Types.ScheduledEvent) anyerror!void = undefined,
guild_scheduled_event_delete: ?*const fn (save: T, s_event: Types.ScheduledEvent) anyerror!void = undefined, guild_scheduled_event_delete: ?*const fn (save: *anyopaque, s_event: Types.ScheduledEvent) anyerror!void = undefined,
guild_scheduled_event_user_add: ?*const fn (save: T, data: Types.ScheduledEventUserAdd) anyerror!void = undefined, guild_scheduled_event_user_add: ?*const fn (save: *anyopaque, data: Types.ScheduledEventUserAdd) anyerror!void = undefined,
guild_scheduled_event_user_remove: ?*const fn (save: T, data: Types.ScheduledEventUserRemove) anyerror!void = undefined, guild_scheduled_event_user_remove: ?*const fn (save: *anyopaque, data: Types.ScheduledEventUserRemove) anyerror!void = undefined,
integration_create: ?*const fn (save: T, guild_id: Types.IntegrationCreateUpdate) anyerror!void = undefined, integration_create: ?*const fn (save: *anyopaque, guild_id: Types.IntegrationCreateUpdate) anyerror!void = undefined,
integration_update: ?*const fn (save: T, guild_id: Types.IntegrationCreateUpdate) anyerror!void = undefined, integration_update: ?*const fn (save: *anyopaque, guild_id: Types.IntegrationCreateUpdate) anyerror!void = undefined,
integration_delete: ?*const fn (save: T, guild_id: Types.IntegrationDelete) anyerror!void = undefined, integration_delete: ?*const fn (save: *anyopaque, guild_id: Types.IntegrationDelete) anyerror!void = undefined,
interaction_create: ?*const fn (save: T, interaction: Types.MessageInteraction) anyerror!void = undefined, interaction_create: ?*const fn (save: *anyopaque, interaction: Types.MessageInteraction) anyerror!void = undefined,
invite_create: ?*const fn (save: T, data: Types.InviteCreate) anyerror!void = undefined, invite_create: ?*const fn (save: *anyopaque, data: Types.InviteCreate) anyerror!void = undefined,
invite_delete: ?*const fn (save: T, data: Types.InviteDelete) anyerror!void = undefined, invite_delete: ?*const fn (save: *anyopaque, data: Types.InviteDelete) anyerror!void = undefined,
message_create: ?*const fn (save: T, message: Types.Message) anyerror!void = undefined, message_create: ?*const fn (save: *anyopaque, message: Types.Message) anyerror!void = undefined,
message_update: ?*const fn (save: T, message: Types.Message) anyerror!void = undefined, message_update: ?*const fn (save: *anyopaque, message: Types.Message) anyerror!void = undefined,
message_delete: ?*const fn (save: T, log: Types.MessageDelete) anyerror!void = undefined, message_delete: ?*const fn (save: *anyopaque, log: Types.MessageDelete) anyerror!void = undefined,
message_delete_bulk: ?*const fn (save: T, log: Types.MessageDeleteBulk) anyerror!void = undefined, message_delete_bulk: ?*const fn (save: *anyopaque, log: Types.MessageDeleteBulk) anyerror!void = undefined,
message_reaction_add: ?*const fn (save: T, log: Types.MessageReactionAdd) anyerror!void = undefined, message_reaction_add: ?*const fn (save: *anyopaque, log: Types.MessageReactionAdd) anyerror!void = undefined,
message_reaction_remove_all: ?*const fn (save: T, data: Types.MessageReactionRemoveAll) anyerror!void = undefined, message_reaction_remove_all: ?*const fn (save: *anyopaque, data: Types.MessageReactionRemoveAll) anyerror!void = undefined,
message_reaction_remove: ?*const fn (save: T, data: Types.MessageReactionRemove) anyerror!void = undefined, message_reaction_remove: ?*const fn (save: *anyopaque, data: Types.MessageReactionRemove) anyerror!void = undefined,
message_reaction_remove_emoji: ?*const fn (save: T, data: Types.MessageReactionRemoveEmoji) anyerror!void = undefined, message_reaction_remove_emoji: ?*const fn (save: *anyopaque, data: Types.MessageReactionRemoveEmoji) anyerror!void = undefined,
presence_update: ?*const fn (save: T, presence: Types.PresenceUpdate) anyerror!void = undefined, presence_update: ?*const fn (save: *anyopaque, presence: Types.PresenceUpdate) anyerror!void = undefined,
stage_instance_create: ?*const fn (save: T, stage_instance: Types.StageInstance) anyerror!void = undefined, stage_instance_create: ?*const fn (save: *anyopaque, stage_instance: Types.StageInstance) anyerror!void = undefined,
stage_instance_update: ?*const fn (save: T, stage_instance: Types.StageInstance) anyerror!void = undefined, stage_instance_update: ?*const fn (save: *anyopaque, stage_instance: Types.StageInstance) anyerror!void = undefined,
stage_instance_delete: ?*const fn (save: T, stage_instance: Types.StageInstance) anyerror!void = undefined, stage_instance_delete: ?*const fn (save: *anyopaque, stage_instance: Types.StageInstance) anyerror!void = undefined,
typing_start: ?*const fn (save: T, data: Types.TypingStart) anyerror!void = undefined, typing_start: ?*const fn (save: *anyopaque, data: Types.TypingStart) anyerror!void = undefined,
/// remember this is only sent when you change your profile yourself/your bot does /// remember this is only sent when you change your profile yourself/your bot does
user_update: ?*const fn (save: T, user: Types.User) anyerror!void = undefined, user_update: ?*const fn (save: *anyopaque, user: Types.User) anyerror!void = undefined,
// will do these someday, music is rather pointless at this point in time // will do these someday, music is rather pointless at this point in time
// TODO: implement // voice_channel_effect_send: null = null, // TODO: implement // voice_channel_effect_send: null = null,
// TODO: implement // voice_state_update: null = null, // TODO: implement // voice_state_update: null = null,
// TODO: implement // voice_server_update: null = null, // TODO: implement // voice_server_update: null = null,
webhooks_update: ?*const fn (save: T, fields: Types.WebhookUpdate) anyerror!void = undefined, webhooks_update: ?*const fn (save: *anyopaque, fields: Types.WebhookUpdate) anyerror!void = undefined,
entitlement_create: ?*const fn (save: T, entitlement: Types.Entitlement) anyerror!void = undefined, entitlement_create: ?*const fn (save: *anyopaque, entitlement: Types.Entitlement) anyerror!void = undefined,
entitlement_update: ?*const fn (save: T, entitlement: Types.Entitlement) anyerror!void = undefined, entitlement_update: ?*const fn (save: *anyopaque, entitlement: Types.Entitlement) anyerror!void = undefined,
/// discord claims this is infrequent, therefore not throughoutly tested - Yuzu /// discord claims this is infrequent, therefore not throughoutly tested - Yuzu
entitlement_delete: ?*const fn (save: T, entitlement: Types.Entitlement) anyerror!void = undefined, entitlement_delete: ?*const fn (save: *anyopaque, entitlement: Types.Entitlement) anyerror!void = undefined,
message_poll_vote_add: ?*const fn (save: T, poll: Types.PollVoteAdd) anyerror!void = undefined, message_poll_vote_add: ?*const fn (save: *anyopaque, poll: Types.PollVoteAdd) anyerror!void = undefined,
message_poll_vote_remove: ?*const fn (save: T, poll: Types.PollVoteRemove) anyerror!void = undefined, message_poll_vote_remove: ?*const fn (save: *anyopaque, poll: Types.PollVoteRemove) anyerror!void = undefined,
ready: ?*const fn (save: T, data: Types.Ready) anyerror!void = undefined, ready: ?*const fn (save: *anyopaque, data: Types.Ready) anyerror!void = undefined,
// TODO: implement // resumed: null = null, // TODO: implement // resumed: null = null,
any: ?*const fn (save: T, data: std.json.Value) anyerror!void = undefined, any: ?*const fn (save: *anyopaque, data: std.json.Value) anyerror!void = undefined,
}; };
}

File diff suppressed because it is too large Load Diff

263
src/sharder.zig Normal file
View File

@ -0,0 +1,263 @@
//! ISC License
//!
//! Copyright (c) 2024-2025 Yuzu
//!
//! Permission to use, copy, modify, and/or distribute this software for any
//! purpose with or without fee is hereby granted, provided that the above
//! copyright notice and this permission notice appear in all copies.
//!
//! THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
//! REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
//! AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
//! INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
//! LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
//! OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
//! PERFORMANCE OF THIS SOFTWARE.
const Intents = @import("./structures/types.zig").Intents;
const Snowflake = @import("./structures/snowflake.zig").Snowflake;
const GatewayBotInfo = @import("internal.zig").GatewayBotInfo;
const IdentifyProperties = @import("internal.zig").IdentifyProperties;
const ShardDetails = @import("internal.zig").ShardDetails;
const ConnectQueue = @import("internal.zig").ConnectQueue;
const GatewayDispatchEvent = @import("internal.zig").GatewayDispatchEvent;
const Log = @import("internal.zig").Log;
const Shard = @import("shard.zig").Shard;
const std = @import("std");
const mem = std.mem;
const debug = @import("internal.zig").debug;
const TableTemplate = @import("cache.zig").TableTemplate;
pub fn ShardManager(comptime Table: TableTemplate) type {
    return struct {
        const Self = @This();

        shard_details: ShardDetails,
        allocator: mem.Allocator,

        /// Queue that rate-limits shard connections to the gateway.
        connect_queue: ConnectQueue(Shard(Table)),
        /// All spawned shards, keyed by shard id.
        shards: std.AutoArrayHashMap(usize, Shard(Table)),
        /// User-provided event handlers dispatched by each shard.
        handler: GatewayDispatchEvent,

        /// where we dispatch work for every thread, threads must be spawned upon shard creation
        /// make sure the address of workers is stable
        workers: std.Thread.Pool = undefined,

        /// configuration settings
        options: SessionOptions,
        log: Log,

        /// Heap-allocated cache shared by every shard.
        /// Owned by this manager: created in `init`, destroyed in `deinit`.
        cache: *@import("cache.zig").CacheTables(Table),

        pub const ShardData = struct {
            /// resume seq to resume connections
            resume_seq: ?usize,
            /// resume_gateway_url is the url to resume the connection
            /// https://discord.com/developers/docs/topics/gateway#ready-event
            resume_gateway_url: ?[]const u8,
            /// session_id is the unique session id of the gateway
            session_id: ?[]const u8,
        };

        pub const SessionOptions = struct {
            /// Important data which is used by the manager to connect shards to the gateway.
            info: GatewayBotInfo,
            /// Delay in milliseconds to wait before spawning next shard. OPTIMAL IS ABOVE 5100. YOU DON'T WANT TO HIT THE RATE LIMIT!!!
            spawn_shard_delay: ?u64 = 5300,
            /// Total amount of shards your bot uses. Useful for zero-downtime updates or resharding.
            total_shards: usize = 1,
            shard_start: usize = 0,
            shard_end: usize = 1,
            /// The payload handlers for messages on the shard.
            resharding: ?struct { interval: u64, percentage: usize } = null,
            /// worker threads
            workers_per_shard: usize = 1,
            /// The shard lifespan in milliseconds. If a shard is not connected within this time, it will be closed.
            shard_lifespan: ?u64 = null,
        };

        /// Creates the manager and its shared cache.
        /// `settings.options.info.session_start_limit` must be non-null (asserted via `.?`).
        /// The returned manager owns the cache allocation; free it with `deinit`.
        pub fn init(allocator: mem.Allocator, settings: struct {
            token: []const u8,
            intents: Intents,
            options: SessionOptions,
            run: GatewayDispatchEvent,
            log: Log,
            cache: @import("cache.zig").TableTemplate,
        }) mem.Allocator.Error!Self {
            const concurrency = settings.options.info.session_start_limit.?.max_concurrency;

            const cache = try allocator.create(@import("cache.zig").CacheTables(Table));
            // don't leak the cache if a later allocation (connect_queue) fails
            errdefer allocator.destroy(cache);
            cache.* = @import("cache.zig").CacheTables(Table).defaults(allocator);

            return .{
                .allocator = allocator,
                .connect_queue = try ConnectQueue(Shard(Table)).init(allocator, concurrency, 5000),
                .shards = .init(allocator),
                .workers = undefined,
                .shard_details = ShardDetails{
                    .token = settings.token,
                    .intents = settings.intents,
                },
                .handler = settings.run,
                .options = .{
                    .info = .{
                        .url = settings.options.info.url,
                        .shards = settings.options.info.shards,
                        .session_start_limit = settings.options.info.session_start_limit,
                    },
                    .total_shards = settings.options.total_shards,
                    .shard_start = settings.options.shard_start,
                    .shard_end = settings.options.shard_end,
                    .workers_per_shard = settings.options.workers_per_shard,
                },
                .log = settings.log,
                .cache = cache,
            };
        }

        /// Releases the connect queue, the shard map and the cache allocation.
        pub fn deinit(self: *Self) void {
            self.connect_queue.deinit();
            self.shards.deinit();
            // `init` created the cache with allocator.create; release it here.
            // NOTE(review): if CacheTables gains its own deinit, call it before destroy.
            self.allocator.destroy(self.cache);
        }

        /// Forces an identify on the given shard, creating it first if needed.
        pub fn forceIdentify(self: *Self, shard_id: usize) !void {
            self.logif("#{d} force identify", .{shard_id});
            // make sure the shard exists, then identify via the stored instance so
            // any state mutated by identify() persists inside `self.shards`
            _ = try self.create(shard_id);
            const shard = self.shards.getPtr(shard_id).?;
            return shard.identify(null);
        }

        /// Disconnects a single shard; no-op if the shard does not exist.
        pub fn disconnect(self: *Self, shard_id: usize) Shard(Table).CloseError!void {
            // getPtr: disconnect mutates the shard, so operate on the stored instance
            if (self.shards.getPtr(shard_id)) |shard| try shard.disconnect();
        }

        /// Disconnects every registered shard, stopping at the first error.
        pub fn disconnectAll(self: *Self) Shard(Table).CloseError!void {
            // single iterator: constructing a fresh one in the loop condition would
            // restart at the first entry every check and never terminate
            var it = self.shards.iterator();
            while (it.next()) |entry| try entry.value_ptr.disconnect();
        }

        /// spawn buckets in order
        /// Log bucket preparation
        /// Divide shards into chunks based on concurrency
        /// Assign each shard to a bucket
        /// Return list of buckets
        /// https://discord.com/developers/docs/events/gateway#sharding-max-concurrency
        fn spawnBuckets(self: *Self) ![][]Shard(Table) {
            const concurrency = self.options.info.session_start_limit.?.max_concurrency;

            self.logif("{d}-{d}", .{ self.options.shard_start, self.options.shard_end });

            // shard count is end - start; the previous (start - end) order underflows
            // for every normal configuration and collapsed the range to 1
            const range = std.math.sub(usize, self.options.shard_end, self.options.shard_start) catch 1;
            const bucket_count = (range + concurrency - 1) / concurrency;

            self.logif("#0 preparing buckets", .{});

            const buckets = try self.allocator.alloc([]Shard(Table), bucket_count);

            for (buckets, 0..) |*bucket, i| {
                // the last bucket may hold fewer shards than `concurrency`
                const bucket_size = if ((i + 1) * concurrency > range) range - (i * concurrency) else concurrency;
                bucket.* = try self.allocator.alloc(Shard(Table), bucket_size);
                for (bucket.*, 0..) |*shard, j| {
                    shard.* = try self.create(self.options.shard_start + i * concurrency + j);
                }
            }

            self.logif("{d} buckets created", .{bucket_count});

            // finally define the worker threads
            try self.workers.init(.{
                .allocator = self.allocator,
                .n_jobs = self.options.workers_per_shard * self.options.total_shards,
            });

            return buckets;
        }

        /// creates a shard and stores it; returns the existing shard if already present
        fn create(self: *Self, shard_id: usize) !Shard(Table) {
            if (self.shards.get(shard_id)) |s| return s;

            const shard: Shard(Table) = try .init(self.allocator, shard_id, self.options.total_shards, .{
                .token = self.shard_details.token,
                .intents = self.shard_details.intents,
                .options = Shard(Table).ShardOptions{
                    .info = self.options.info,
                    .ratelimit_options = .{},
                },
                .run = self.handler,
                .log = self.log,
                .cache = self.cache,
                .sharder_pool = &self.workers,
            });

            try self.shards.put(shard_id, shard);
            return shard;
        }

        /// Registers a shard with resume data and queues it for connection.
        /// Errors with `CannotOverrideExistingShard` if the shard id is taken.
        pub fn resume_(self: *Self, shard_id: usize, shard_data: ShardData) !void {
            if (self.shards.contains(shard_id)) return error.CannotOverrideExistingShard;

            _ = try self.create(shard_id);
            // mutate the stored shard, not a by-value copy, so the resume data sticks
            const shard = self.shards.getPtr(shard_id).?;
            shard.data = shard_data;

            return self.connect_queue.push(.{
                .shard = shard.*,
                .callback = &callback,
            });
        }

        /// Connect-queue callback: opens the gateway connection for the queued shard.
        fn callback(self: *ConnectQueue(Shard(Table)).RequestWithShard) anyerror!void {
            try self.shard.connect();
        }

        /// Spawns all configured shards and enqueues each one for connection.
        pub fn spawnShards(self: *Self) !void {
            const buckets = try self.spawnBuckets();

            self.logif("Spawning shards", .{});

            for (buckets) |bucket| {
                for (bucket) |shard| {
                    self.logif("adding {d} to connect queue", .{shard.id});
                    try self.connect_queue.push(.{
                        .shard = shard,
                        .callback = &callback,
                    });
                }
            }

            //self.startResharder();
        }

        /// Sends a payload through the given shard; no-op if the shard does not exist.
        pub fn send(self: *Self, shard_id: usize, data: anytype) Shard(Table).SendError!void {
            // getPtr so send() operates on the stored shard instance
            if (self.shards.getPtr(shard_id)) |shard| try shard.send(data);
        }

        // SPEC OF THE RESHARDER:
        // Class Self
        //
        // Method startResharder():
        //     If resharding interval is not set or shard bounds are not valid:
        //         Exit
        //     Set up periodic check for resharding:
        //         If new shards are required:
        //             Log resharding process
        //             Update options with new shard settings
        //             Disconnect old shards and clear them from manager
        //             Spawn shards again with updated configuration
        //

        /// Logs `format` when logging is enabled; otherwise does nothing.
        inline fn logif(self: *Self, comptime format: []const u8, args: anytype) void {
            switch (self.log) {
                .yes => debug.info(format, args),
                .no => {},
            }
        }
    };
}

View File

@ -20,12 +20,16 @@ const Shard = Discord.Shard;
const Intents = Discord.Intents; const Intents = Discord.Intents;
const INTENTS = 53608447; const INTENTS = 53608447;
const Cache = Discord.cache.TableTemplate{};
fn ready(_: *Shard, payload: Discord.Ready) !void { fn ready(_: *anyopaque, payload: Discord.Ready) !void {
std.debug.print("logged in as {s}\n", .{payload.user.username}); std.debug.print("logged in as {s}\n", .{payload.user.username});
} }
fn message_create(session: *Shard, message: Discord.Message) !void { fn message_create(shard_ptr: *anyopaque, message: Discord.Message) !void {
// set custom cache
const session: *Shard(Cache) = @ptrCast(@alignCast(shard_ptr));
if (message.content != null and std.ascii.eqlIgnoreCase(message.content.?, "!hi")) { if (message.content != null and std.ascii.eqlIgnoreCase(message.content.?, "!hi")) {
var result = try session.sendMessage(message.channel_id, .{ .content = "hi :)" }); var result = try session.sendMessage(message.channel_id, .{ .content = "hi :)" });
defer result.deinit(); defer result.deinit();
@ -48,6 +52,6 @@ pub fn main() !void {
.run = .{ .message_create = &message_create, .ready = &ready }, .run = .{ .message_create = &message_create, .ready = &ready },
.log = .yes, .log = .yes,
.options = .{}, .options = .{},
.cache = Discord.CacheTables.defaults(allocator), .cache = Cache,
}); });
} }