feat: queue to spawn workers (#255)

* feat: queue to spawn workers

* fix: using optional chaining
This commit is contained in:
MARCROCK22 2024-08-25 14:14:41 -04:00 committed by GitHub
parent 498c66efdb
commit 633b19c382
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 53 additions and 19 deletions

View File

@ -16,6 +16,7 @@ import type {
WorkerSendResultPayload,
WorkerSendShardInfo,
WorkerShardInfo,
WorkerShardsConnected,
WorkerStart,
} from '../websocket/discord/worker';
import type { ManagerMessages } from '../websocket/discord/workermanager';
@ -225,6 +226,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
const onPacket = this.onPacket.bind(this);
const handlePayload = this.options?.handlePayload?.bind(this);
const self = this;
const { sendPayloadToParent } = this.options;
for (const id of workerData.shards) {
let shard = this.shards.get(id);
if (!shard) {
@ -241,12 +243,13 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
async handlePayload(shardId, payload) {
await handlePayload?.(shardId, payload);
await onPacket?.(payload, shardId);
self.postMessage({
workerId: workerData.workerId,
shardId,
type: 'RECEIVE_PAYLOAD',
payload,
} satisfies WorkerReceivePayload);
if (sendPayloadToParent)
self.postMessage({
workerId: workerData.workerId,
shardId,
type: 'RECEIVE_PAYLOAD',
payload,
} satisfies WorkerReceivePayload);
},
});
this.shards.set(id, shard);
@ -418,6 +421,13 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
this.applicationId = packet.d.application.id;
this.me = Transformers.ClientUser(this, packet.d.user, packet.d.application) as never;
await this.events?.execute(packet.t as never, packet, this, shardId);
if ([...this.shards.values()].every(shard => shard.data.session_id)) {
this.postMessage({
type: 'WORKER_SHARDS_CONNECTED',
workerId: this.workerId,
} as WorkerShardsConnected);
await this.events?.runEvent('WORKER_SHARDS_CONNECTED', this, this.me, -1);
}
if (
!(
this.__handleGuilds?.size &&
@ -460,4 +470,6 @@ interface WorkerClientOptions extends BaseClientOptions {
handlePayload?: ShardManagerOptions['handlePayload'];
gateway?: ClientOptions['gateway'];
postMessage?: (body: unknown) => unknown;
/** can have perfomance issues in big bots if the client sends every event, specially in startup (false by default) */
sendPayloadToParent?: boolean;
}

View File

@ -8,3 +8,7 @@ export const BOT_READY = (_self: UsingClient, me: ClientUserStructure) => {
export const WORKER_READY = (_self: UsingClient, me: ClientUserStructure) => {
return me;
};
export const WORKER_SHARDS_CONNECTED = (_self: UsingClient, me: ClientUserStructure) => {
return me;
};

View File

@ -51,6 +51,7 @@ export type WorkerSendCacheRequest = CreateWorkerMessage<
export type WorkerSendShardInfo = CreateWorkerMessage<'SHARD_INFO', WorkerShardInfo & { nonce: string }>;
export type WorkerSendInfo = CreateWorkerMessage<'WORKER_INFO', WorkerInfo & { nonce: string }>;
export type WorkerReady = CreateWorkerMessage<'WORKER_READY'>;
export type WorkerShardsConnected = CreateWorkerMessage<'WORKER_SHARDS_CONNECTED'>;
export type WorkerStart = CreateWorkerMessage<'WORKER_START'>;
export type WorkerSendApiRequest = CreateWorkerMessage<
'WORKER_API_REQUEST',
@ -87,6 +88,7 @@ export type WorkerMessage =
| WorkerSendShardInfo
| WorkerSendInfo
| WorkerReady
| WorkerShardsConnected
| WorkerSendApiRequest
| WorkerSendEvalResponse
| WorkerSendEval

View File

@ -18,6 +18,7 @@ export class WorkerManager extends Map<
options!: MakePartial<Required<WorkerManagerOptions>, 'adapter'>;
debugger?: Logger;
connectQueue!: ConnectQueue;
workerQueue: (() => void)[] = [];
cacheAdapter: Adapter;
promises = new Map<string, { resolve: (value: any) => void; timeout: NodeJS.Timeout }>();
rest!: ApiHandler;
@ -135,20 +136,22 @@ export class WorkerManager extends Map<
if (!worker_threads) throw new Error('Cannot prepare workers without worker_threads.');
for (let i = 0; i < shards.length; i++) {
let worker = this.get(i);
if (!worker) {
worker = this.createWorker({
path: this.options.path,
debug: this.options.debug,
token: this.options.token,
shards: shards[i],
intents: this.options.intents,
workerId: i,
workerProxy: this.options.workerProxy,
totalShards: this.totalShards,
mode: this.options.mode,
const workerExists = this.has(i);
if (!workerExists) {
this.workerQueue.push(() => {
const worker = this.createWorker({
path: this.options.path,
debug: this.options.debug,
token: this.options.token,
shards: shards[i],
intents: this.options.intents,
workerId: i,
workerProxy: this.options.workerProxy,
totalShards: this.totalShards,
mode: this.options.mode,
});
this.set(i, worker);
});
this.set(i, worker);
}
}
}
@ -288,6 +291,17 @@ export class WorkerManager extends Map<
}
}
break;
case 'WORKER_SHARDS_CONNECTED':
{
const nextWorker = this.workerQueue.shift();
if (nextWorker) {
this.debugger?.info('Spawning next worker');
nextWorker();
} else {
this.debugger?.info('No more workers to spawn left');
}
}
break;
case 'WORKER_API_REQUEST':
{
const response = await this.rest.request(message.method, message.url, message.requestOptions);
@ -431,6 +445,8 @@ export class WorkerManager extends Map<
const spaces = this.prepareSpaces();
await this.prepareWorkers(spaces);
// Start workers queue
return this.workerQueue.shift()?.();
}
}