diff --git a/src/client/workerclient.ts b/src/client/workerclient.ts index c7f7146..497b27a 100644 --- a/src/client/workerclient.ts +++ b/src/client/workerclient.ts @@ -16,6 +16,7 @@ import type { WorkerSendResultPayload, WorkerSendShardInfo, WorkerShardInfo, + WorkerShardsConnected, WorkerStart, } from '../websocket/discord/worker'; import type { ManagerMessages } from '../websocket/discord/workermanager'; @@ -225,6 +226,7 @@ export class WorkerClient extends BaseClient { const onPacket = this.onPacket.bind(this); const handlePayload = this.options?.handlePayload?.bind(this); const self = this; + const { sendPayloadToParent } = this.options; for (const id of workerData.shards) { let shard = this.shards.get(id); if (!shard) { @@ -241,12 +243,13 @@ export class WorkerClient extends BaseClient { async handlePayload(shardId, payload) { await handlePayload?.(shardId, payload); await onPacket?.(payload, shardId); - self.postMessage({ - workerId: workerData.workerId, - shardId, - type: 'RECEIVE_PAYLOAD', - payload, - } satisfies WorkerReceivePayload); + if (sendPayloadToParent) + self.postMessage({ + workerId: workerData.workerId, + shardId, + type: 'RECEIVE_PAYLOAD', + payload, + } satisfies WorkerReceivePayload); }, }); this.shards.set(id, shard); @@ -418,6 +421,13 @@ export class WorkerClient extends BaseClient { this.applicationId = packet.d.application.id; this.me = Transformers.ClientUser(this, packet.d.user, packet.d.application) as never; await this.events?.execute(packet.t as never, packet, this, shardId); + if ([...this.shards.values()].every(shard => shard.data.session_id)) { + this.postMessage({ + type: 'WORKER_SHARDS_CONNECTED', + workerId: this.workerId, + } as WorkerShardsConnected); + await this.events?.runEvent('WORKER_SHARDS_CONNECTED', this, this.me, -1); + } if ( !( this.__handleGuilds?.size && @@ -460,4 +470,6 @@ interface WorkerClientOptions extends BaseClientOptions { handlePayload?: ShardManagerOptions['handlePayload']; gateway?: ClientOptions['gateway']; postMessage?: (body: unknown) => unknown; + /** can have perfomance issues in big bots if the client sends every event, specially in startup (false by default) */ + sendPayloadToParent?: boolean; } diff --git a/src/events/hooks/custom.ts b/src/events/hooks/custom.ts index c97199a..7a7e88e 100644 --- a/src/events/hooks/custom.ts +++ b/src/events/hooks/custom.ts @@ -8,3 +8,7 @@ export const BOT_READY = (_self: UsingClient, me: ClientUserStructure) => { export const WORKER_READY = (_self: UsingClient, me: ClientUserStructure) => { return me; }; + +export const WORKER_SHARDS_CONNECTED = (_self: UsingClient, me: ClientUserStructure) => { + return me; +}; diff --git a/src/websocket/discord/worker.ts b/src/websocket/discord/worker.ts index 89e07d8..fbadd8a 100644 --- a/src/websocket/discord/worker.ts +++ b/src/websocket/discord/worker.ts @@ -51,6 +51,7 @@ export type WorkerSendCacheRequest = CreateWorkerMessage< export type WorkerSendShardInfo = CreateWorkerMessage<'SHARD_INFO', WorkerShardInfo & { nonce: string }>; export type WorkerSendInfo = CreateWorkerMessage<'WORKER_INFO', WorkerInfo & { nonce: string }>; export type WorkerReady = CreateWorkerMessage<'WORKER_READY'>; +export type WorkerShardsConnected = CreateWorkerMessage<'WORKER_SHARDS_CONNECTED'>; export type WorkerStart = CreateWorkerMessage<'WORKER_START'>; export type WorkerSendApiRequest = CreateWorkerMessage< 'WORKER_API_REQUEST', @@ -87,6 +88,7 @@ export type WorkerMessage = | WorkerSendShardInfo | WorkerSendInfo | WorkerReady + | WorkerShardsConnected | WorkerSendApiRequest | WorkerSendEvalResponse | WorkerSendEval diff --git a/src/websocket/discord/workermanager.ts b/src/websocket/discord/workermanager.ts index 4ad4e7d..b3b88e2 100644 --- a/src/websocket/discord/workermanager.ts +++ b/src/websocket/discord/workermanager.ts @@ -18,6 +18,7 @@ export class WorkerManager extends Map< options!: MakePartial, 'adapter'>; debugger?: Logger; connectQueue!: ConnectQueue; + workerQueue: (() => void)[] = []; cacheAdapter: Adapter; promises = new Map void; timeout: NodeJS.Timeout }>(); rest!: ApiHandler; @@ -135,20 +136,22 @@ export class WorkerManager extends Map< if (!worker_threads) throw new Error('Cannot prepare workers without worker_threads.'); for (let i = 0; i < shards.length; i++) { - let worker = this.get(i); - if (!worker) { - worker = this.createWorker({ - path: this.options.path, - debug: this.options.debug, - token: this.options.token, - shards: shards[i], - intents: this.options.intents, - workerId: i, - workerProxy: this.options.workerProxy, - totalShards: this.totalShards, - mode: this.options.mode, + const workerExists = this.has(i); + if (!workerExists) { + this.workerQueue.push(() => { + const worker = this.createWorker({ + path: this.options.path, + debug: this.options.debug, + token: this.options.token, + shards: shards[i], + intents: this.options.intents, + workerId: i, + workerProxy: this.options.workerProxy, + totalShards: this.totalShards, + mode: this.options.mode, + }); + this.set(i, worker); }); - this.set(i, worker); } } } @@ -288,6 +291,17 @@ export class WorkerManager extends Map< } } break; + case 'WORKER_SHARDS_CONNECTED': + { + const nextWorker = this.workerQueue.shift(); + if (nextWorker) { + this.debugger?.info('Spawning next worker'); + nextWorker(); + } else { + this.debugger?.info('No more workers to spawn left'); + } + } + break; case 'WORKER_API_REQUEST': { const response = await this.rest.request(message.method, message.url, message.requestOptions); @@ -431,6 +445,8 @@ export class WorkerManager extends Map< const spaces = this.prepareSpaces(); await this.prepareWorkers(spaces); + // Start workers queue + return this.workerQueue.shift()?.(); } }