From 6817eb44f309fa033a5726955496855c7158326f Mon Sep 17 00:00:00 2001 From: MARCROCK22 <57925328+MARCROCK22@users.noreply.github.com> Date: Sat, 31 Aug 2024 23:17:04 -0400 Subject: [PATCH] feat(ShardManger): resharding (#259) * feat(ShardManger): resharding * fix: xd * fix: types * fix: minor changes * fix: unused interval --- src/client/client.ts | 42 ++++++++++++++------ src/websocket/constants/index.ts | 12 +++++- src/websocket/discord/shard.ts | 3 +- src/websocket/discord/sharder.ts | 67 ++++++++++++++++++++++++++++++++ src/websocket/discord/shared.ts | 19 ++++++++- 5 files changed, 129 insertions(+), 14 deletions(-) diff --git a/src/client/client.ts b/src/client/client.ts index 4fe00a3..930cb40 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -20,7 +20,7 @@ import { type ClientUserStructure, Transformers, type MessageStructure } from '. let parentPort: import('node:worker_threads').MessagePort; export class Client extends BaseClient { - private __handleGuilds?: Set = new Set(); + private __handleGuilds?: string[]; gateway!: ShardManager; me!: If; declare options: Omit & { @@ -111,6 +111,7 @@ export class Client extends BaseClient { this.gateway = new ShardManager({ token, info: await this.proxy.gateway.bot.get(), + getInfo: () => this.proxy.gateway.bot.get(), intents, handlePayload: async (shardId, packet) => { await this.options?.handlePayload?.(shardId, packet); @@ -126,6 +127,22 @@ export class Client extends BaseClient { ...this.options?.gateway?.properties, }, compress: this.options?.gateway?.compress, + resharding: { + interval: this.options?.resharding?.interval ?? 0, + percentage: this.options?.resharding?.percentage ?? 0, + reloadGuilds: ids => { + this.__handleGuilds = this.__handleGuilds?.concat(ids) ?? ids; + }, + onGuild: id => { + if (this.__handleGuilds) { + const index = this.__handleGuilds.indexOf(id); + if (index === -1) return false; + this.__handleGuilds.splice(index, 1); + return true; + } + return false; + }, + }, }); } @@ -159,14 +176,14 @@ export class Client extends BaseClient { break; case 'GUILD_DELETE': case 'GUILD_CREATE': { - if (this.__handleGuilds?.has(packet.d.id)) { - this.__handleGuilds?.delete(packet.d.id); - if (!this.__handleGuilds?.size && [...this.gateway.values()].every(shard => shard.data.session_id)) { + if (this.__handleGuilds?.includes(packet.d.id)) { + this.__handleGuilds?.splice(this.__handleGuilds!.indexOf(packet.d.id), 1); + if (!this.__handleGuilds?.length && [...this.gateway.values()].every(shard => shard.data.session_id)) { delete this.__handleGuilds; await this.cache.onPacket(packet); return this.events?.runEvent('BOT_READY', this, this.me, -1); } - if (!this.__handleGuilds?.size) delete this.__handleGuilds; + if (!this.__handleGuilds?.length) delete this.__handleGuilds; return this.cache.onPacket(packet); } await this.events?.execute(packet.t, packet, this as Client, shardId); @@ -183,17 +200,15 @@ export class Client extends BaseClient { await this.events?.execute(packet.t as never, packet, this as Client, shardId); await this.handleCommand.message(packet.d, shardId); break; - case 'READY': - if (!this.__handleGuilds) this.__handleGuilds = new Set(); - for (const g of packet.d.guilds) { - this.__handleGuilds?.add(g.id); - } + case 'READY': { + const ids = packet.d.guilds.map(x => x.id); + this.__handleGuilds = this.__handleGuilds?.concat(ids) ?? ids; this.botId = packet.d.user.id; this.applicationId = packet.d.application.id; this.me = Transformers.ClientUser(this, packet.d.user, packet.d.application) as never; if ( !( - this.__handleGuilds?.size && + this.__handleGuilds?.length && (this.gateway.options.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds ) ) { @@ -205,6 +220,7 @@ export class Client extends BaseClient { this.debugger?.debug(`#${shardId}[${packet.d.user.username}](${this.botId}) is online...`); await this.events?.execute(packet.t as never, packet, this as Client, shardId); break; + } default: await this.events?.execute(packet.t as never, packet, this as Client, shardId); break; @@ -232,4 +248,8 @@ export interface ClientOptions extends BaseClientOptions { reply?: (ctx: CommandContext) => boolean; }; handlePayload?: ShardManagerOptions['handlePayload']; + resharding?: { + interval: number; + percentage: number; + }; } diff --git a/src/websocket/constants/index.ts b/src/websocket/constants/index.ts index c254df7..8b9a2a4 100644 --- a/src/websocket/constants/index.ts +++ b/src/websocket/constants/index.ts @@ -20,11 +20,21 @@ const ShardManagerDefaults: Partial = { handlePayload: (shardId: number, packet: GatewayDispatchPayload): void => { console.info(`Packet ${packet.t} on shard ${shardId}`); }, + resharding: { + interval: 8 * 60 * 60 * 1e3, // 8h + percentage: 80, + reloadGuilds() { + throw new Error('Unexpected to run '); + }, + onGuild() { + throw new Error('Unexpected to run '); + }, + }, }; const WorkerManagerDefaults: Partial = { ...ShardManagerDefaults, - shardsPerWorker: 32, + shardsPerWorker: 16, handlePayload: (_shardId: number, _workerId: number, _packet: GatewayDispatchPayload): void => {}, }; diff --git a/src/websocket/discord/shard.ts b/src/websocket/discord/shard.ts index 505f80e..5f63d65 100644 --- a/src/websocket/discord/shard.ts +++ b/src/websocket/discord/shard.ts @@ -322,10 +322,11 @@ export class Shard { } async close(code: number, reason: string) { + clearInterval(this.heart.nodeInterval); if (!this.isOpen) { return this.debugger?.warn(`[Shard #${this.id}] Is not open`); } - this.debugger?.warn(`[Shard #${this.id}] Called close`); + this.debugger?.debug(`[Shard #${this.id}] Called close`); this.websocket?.close(code, reason); } diff --git a/src/websocket/discord/sharder.ts b/src/websocket/discord/sharder.ts index d0ee6c7..7611b42 100644 --- a/src/websocket/discord/sharder.ts +++ b/src/websocket/discord/sharder.ts @@ -12,6 +12,7 @@ import { type GatewayVoiceStateUpdate, type GatewaySendPayload, GatewayOpcodes, + type GatewayDispatchPayload, } from '../../types'; import { ShardManagerDefaults } from '../constants'; import { DynamicBucket } from '../structures'; @@ -51,6 +52,72 @@ export class ShardManager extends Map { workerData = worker_threads.workerData; if (worker_threads.parentPort) parentPort = worker_threads.parentPort; } + + if (this.options.resharding.interval <= 0) return; + setInterval(async () => { + this.debugger?.debug('Checking if reshard is needed'); + const info = await this.options.getInfo(); + if (info.shards <= this.totalShards) return this.debugger?.debug('Resharding not needed'); + //https://github.com/discordeno/discordeno/blob/6a5f446c0651b9fad9f1550ff1857fe7a026426b/packages/gateway/src/manager.ts#L106C8-L106C94 + const percentage = (info.shards / ((this.totalShards * 2500) / 1000)) * 100; + if (percentage < this.options.resharding.percentage) + return this.debugger?.debug( + `Percentage is not enough to reshard ${percentage}/${this.options.resharding.percentage}`, + ); + + this.debugger?.info('Starting resharding process'); + + this.connectQueue.concurrency = info.session_start_limit.max_concurrency; + this.options.totalShards = info.shards; + this.options.info.session_start_limit.max_concurrency = info.session_start_limit.max_concurrency; + + let shardsConnected = 0; + let handlePayload = async (sharder: ShardManager, _: number, packet: GatewayDispatchPayload) => { + if ( + (packet.t === 'GUILD_CREATE' || packet.t === 'GUILD_DELETE') && + this.options.resharding.onGuild(packet.d.id) + ) { + return; + } + + if (packet.t !== 'READY') return; + + this.options.resharding.reloadGuilds(packet.d.guilds.map(x => x.id)); + + if (++shardsConnected < info.shards) return; //waiting for last shard to connect + + // dont listen more events when all shards are ready + handlePayload = async () => {}; + await this.disconnectAll(); + this.clear(); + + for (const [id, shard] of sharder) { + shard.options.handlePayload = (shardId, packet) => { + return this.options.handlePayload(shardId, packet); + }; + this.set(id, shard); + } + + sharder.clear(); + }; + + const resharder = new ShardManager({ + ...this.options, + resharding: { + interval: 0, + percentage: 0, + reloadGuilds() {}, + onGuild() { + return true; + }, + }, + handlePayload: (shardId, packet): unknown => { + return handlePayload(resharder, shardId, packet); + }, + }); + + await resharder.spawnShards(); + }, this.options.resharding.interval); } get totalShards() { diff --git a/src/websocket/discord/shared.ts b/src/websocket/discord/shared.ts index a73c976..d4a9e72 100644 --- a/src/websocket/discord/shared.ts +++ b/src/websocket/discord/shared.ts @@ -8,6 +8,7 @@ import type { Awaitable, DeepPartial, Logger } from '../../common'; import type { IdentifyProperties } from '../constants'; export interface ShardManagerOptions extends ShardDetails { + getInfo(): Promise; /** Important data which is used by the manager to connect shards to the gateway. */ info: APIGatewayBotInfo; /** @@ -36,6 +37,22 @@ export interface ShardManagerOptions extends ShardDetails { presence?: (shardId: number, workerId: number) => GatewayPresenceUpdateData; compress?: boolean; + resharding?: { + interval: number; + percentage: number; + /** + * + * @param ids + * @returns + */ + reloadGuilds: (ids: string[]) => unknown; + /** + * + * @param id + * @returns true if deleted + */ + onGuild: (id: string) => boolean; + }; } export interface CustomManagerAdapter { @@ -51,7 +68,7 @@ export interface WorkerManagerOptions extends Omit