diff --git a/src/cache/adapters/workeradapter.ts b/src/cache/adapters/workeradapter.ts index 8ee1745..5ba42b2 100644 --- a/src/cache/adapters/workeradapter.ts +++ b/src/cache/adapters/workeradapter.ts @@ -1,5 +1,5 @@ import { randomUUID } from 'node:crypto'; -import { parentPort, workerData, type MessagePort } from 'node:worker_threads'; +import { parentPort, type MessagePort } from 'node:worker_threads'; import type { WorkerData } from '../../websocket'; import type { WorkerSendCacheRequest } from '../../websocket/discord/worker'; import type { Adapter } from './types'; @@ -8,18 +8,26 @@ export class WorkerAdapter implements Adapter { isAsync = true; promises = new Map void; timeout: NodeJS.Timeout }>(); - constructor(readonly parent: MessagePort) {} + constructor( + readonly parent: MessagePort | NodeJS.Process, + public workerData: WorkerData, + ) {} + + postMessage(body: any) { + if (parentPort) return parentPort.postMessage(body); + return process.send!(body); + } protected send(method: WorkerSendCacheRequest['method'], ...args: any[]): Promise { const nonce = randomUUID(); if (this.promises.has(nonce)) return this.send(method, ...args); - parentPort!.postMessage({ + this.postMessage({ type: 'CACHE_REQUEST', args, nonce, method, - workerId: (workerData as WorkerData).workerId, + workerId: this.workerData.workerId, } satisfies WorkerSendCacheRequest); let resolve = (_: any) => { diff --git a/src/client/workerclient.ts b/src/client/workerclient.ts index 6b48996..d6242f8 100644 --- a/src/client/workerclient.ts +++ b/src/client/workerclient.ts @@ -1,5 +1,5 @@ import { randomUUID } from 'node:crypto'; -import { workerData as __workerData__, parentPort as manager } from 'node:worker_threads'; +import { parentPort as manager } from 'node:worker_threads'; import { ApiHandler } from '..'; import type { Cache } from '../cache'; import { WorkerAdapter } from '../cache'; @@ -18,6 +18,7 @@ import type { WorkerSendResultPayload, WorkerSendShardInfo, WorkerShardInfo, + WorkerStart, } from '../websocket/discord/worker'; import type { ManagerMessages } from '../websocket/discord/workermanager'; import type { BaseClientOptions, StartOptions } from './base'; @@ -26,7 +27,18 @@ import type { Client } from './client'; import { onInteractionCreate } from './oninteractioncreate'; import { onMessageCreate } from './onmessagecreate'; -const workerData = __workerData__ as WorkerData; +let workerData: WorkerData; +try { + workerData = { + debug: process.env.SEYFERT_WORKER_DEBUG === 'true', + intents: Number.parseInt(process.env.SEYFERT_WORKER_INTENTS!), + path: process.env.SEYFERT_WORKER_PATH!, + shards: process.env.SEYFERT_WORKER_SHARDS!.split(',').map(id => Number.parseInt(id)), + token: process.env.SEYFERT_WORKER_TOKEN!, + workerId: Number.parseInt(process.env.SEYFERT_WORKER_WORKERID!), + workerProxy: process.env.SEYFERT_WORKER_WORKERPROXY === 'true', + } as WorkerData; +} catch {} export class WorkerClient extends BaseClient { private __handleGuilds?: Set = new Set(); @@ -43,13 +55,17 @@ export class WorkerClient extends BaseClient { constructor(options?: WorkerClientOptions) { super(options); - if (!manager) { + if (!process.env.SEYFERT_SPAWNING) { throw new Error('WorkerClient cannot spawn without manager'); } - manager.on('message', data => this.handleManagerMessages(data)); + this.postMessage({ + type: 'WORKER_START', + workerId: workerData.workerId, + } satisfies WorkerStart); + (manager ?? process).on('message', (data: ManagerMessages) => this.handleManagerMessages(data)); this.setServices({ cache: { - adapter: new WorkerAdapter(manager), + adapter: new WorkerAdapter(manager ?? process, workerData), disabledCache: options?.disabledCache, }, }); @@ -96,6 +112,11 @@ export class WorkerClient extends BaseClient { } } + postMessage(body: any) { + if (manager) return manager.postMessage(body); + return process.send!(body); + } + protected async handleManagerMessages(data: ManagerMessages) { switch (data.type) { case 'CACHE_RESULT': @@ -118,7 +139,7 @@ export class WorkerClient extends BaseClient { ...data, } satisfies GatewaySendPayload); - manager!.postMessage({ + this.postMessage({ type: 'RESULT_PAYLOAD', nonce: data.nonce, workerId: this.workerId, @@ -138,9 +159,9 @@ export class WorkerClient extends BaseClient { break; case 'SPAWN_SHARDS': { - const cache = this.cache; const onPacket = this.onPacket.bind(this); const handlePayload = this.options?.handlePayload?.bind(this); + const self = this; for (const id of workerData.shards) { let shard = this.shards.get(id); if (!shard) { @@ -152,9 +173,9 @@ export class WorkerClient extends BaseClient { debugger: this.debugger, async handlePayload(shardId, payload) { await handlePayload?.(shardId, payload); - await cache.onPacket(payload); + await self.cache.onPacket(payload); await onPacket?.(payload, shardId); - manager!.postMessage({ + self.postMessage({ workerId: workerData.workerId, shardId, type: 'RECEIVE_PAYLOAD', @@ -165,7 +186,7 @@ export class WorkerClient extends BaseClient { this.shards.set(id, shard); } - manager!.postMessage({ + this.postMessage({ type: 'CONNECT_QUEUE', shardId: id, workerId: workerData.workerId, @@ -181,7 +202,7 @@ export class WorkerClient extends BaseClient { return; } - manager!.postMessage({ + this.postMessage({ ...generateShardInfo(shard), nonce: data.nonce, type: 'SHARD_INFO', @@ -191,7 +212,7 @@ export class WorkerClient extends BaseClient { break; case 'WORKER_INFO': { - manager!.postMessage({ + this.postMessage({ shards: [...this.shards.values()].map(generateShardInfo), workerId: workerData.workerId, type: 'WORKER_INFO', @@ -222,7 +243,7 @@ export class WorkerClient extends BaseClient { } catch (e) { result = e; } - manager!.postMessage({ + this.postMessage({ type: 'EVAL_RESPONSE', response: result, workerId: workerData.workerId, @@ -270,7 +291,7 @@ export class WorkerClient extends BaseClient { tellWorker(workerId: number, func: (_: this) => {}) { const nonce = this.generateNonce(); - manager!.postMessage({ + this.postMessage({ type: 'EVAL', func: func.toString(), toWorkerId: workerId, @@ -307,7 +328,7 @@ export class WorkerClient extends BaseClient { !((workerData.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds) ) { if ([...this.shards.values()].every(shard => shard.data.session_id)) { - manager!.postMessage({ + this.postMessage({ type: 'WORKER_READY', workerId: this.workerId, } as WorkerReady); @@ -327,7 +348,7 @@ export class WorkerClient extends BaseClient { if (this.__handleGuilds?.has(packet.d.id)) { this.__handleGuilds.delete(packet.d.id); if (!this.__handleGuilds.size && [...this.shards.values()].every(shard => shard.data.session_id)) { - manager!.postMessage({ + this.postMessage({ type: 'WORKER_READY', workerId: this.workerId, } as WorkerReady); diff --git a/src/collection.ts b/src/collection.ts index 42ecb05..576aa68 100644 --- a/src/collection.ts +++ b/src/collection.ts @@ -216,7 +216,7 @@ export interface LimitedCollectionOptions { export class LimitedCollection { static readonly default: LimitedCollectionOptions = { resetOnDemand: false, - limit: Infinity, + limit: Number.POSITIVE_INFINITY, expire: 0, }; @@ -262,7 +262,7 @@ export class LimitedCollection { } } - if (this.closer!.expireOn === expireOn) { + if (this.closer?.expireOn === expireOn) { this.resetTimeout(); } } diff --git a/src/websocket/discord/shared.ts b/src/websocket/discord/shared.ts index 9da52e1..7036b37 100644 --- a/src/websocket/discord/shared.ts +++ b/src/websocket/discord/shared.ts @@ -39,6 +39,8 @@ export interface ShardManagerOptions extends ShardDetails { } export interface WorkerManagerOptions extends Omit { + mode: 'threads' | 'clusters'; + workers?: number; /** diff --git a/src/websocket/discord/worker.ts b/src/websocket/discord/worker.ts index 88c5546..7811855 100644 --- a/src/websocket/discord/worker.ts +++ b/src/websocket/discord/worker.ts @@ -46,6 +46,7 @@ export type WorkerSendCacheRequest = CreateWorkerMessage< export type WorkerSendShardInfo = CreateWorkerMessage<'SHARD_INFO', WorkerShardInfo & { nonce: string }>; export type WorkerSendInfo = CreateWorkerMessage<'WORKER_INFO', WorkerInfo & { nonce: string }>; export type WorkerReady = CreateWorkerMessage<'WORKER_READY'>; +export type WorkerStart = CreateWorkerMessage<'WORKER_START'>; export type WorkerSendApiRequest = CreateWorkerMessage< 'WORKER_API_REQUEST', { @@ -90,4 +91,5 @@ export type WorkerMessage = | WorkerSendApiRequest | WorkerExecuteEval | WorkerSendEvalResponse - | WorkerSendEval; + | WorkerSendEval + | WorkerStart; diff --git a/src/websocket/discord/workermanager.ts b/src/websocket/discord/workermanager.ts index b1c6a6e..23b9efe 100644 --- a/src/websocket/discord/workermanager.ts +++ b/src/websocket/discord/workermanager.ts @@ -1,5 +1,6 @@ +import cluster, { type Worker as ClusterWorker } from 'node:cluster'; import { randomUUID } from 'node:crypto'; -import { Worker } from 'node:worker_threads'; +import { Worker as ThreadWorker } from 'node:worker_threads'; import { ApiHandler, Router } from '../..'; import { MemoryAdapter, type Adapter } from '../../cache'; import { BaseClient, type InternalRuntimeConfig } from '../../client/base'; @@ -16,9 +17,9 @@ import { ConnectQueue } from '../structures/timeout'; import { MemberUpdateHandler } from './events/memberUpdate'; import { PresenceUpdateHandler } from './events/presenceUpdate'; import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared'; -import type { WorkerInfo, WorkerMessage, WorkerShardInfo } from './worker'; +import type { WorkerInfo, WorkerMessage, WorkerShardInfo, WorkerStart } from './worker'; -export class WorkerManager extends Map { +export class WorkerManager extends Map { options!: Required; debugger?: Logger; connectQueue!: ConnectQueue; @@ -120,6 +121,19 @@ export class WorkerManager extends Map { return chunks; } + postMessage(id: number, body: any) { + const worker = this.get(id); + if (!worker) return this.debugger?.error(`Worker ${id} doesnt exists.`); + switch (this.options.mode) { + case 'clusters': + (worker as ClusterWorker).send(body); + break; + case 'threads': + (worker as ThreadWorker).postMessage(body); + break; + } + } + async prepareWorkers(shards: number[][]) { for (let i = 0; i < shards.length; i++) { let worker = this.get(i); @@ -135,23 +149,47 @@ export class WorkerManager extends Map { }); this.set(i, worker); } - worker.postMessage({ - type: 'SPAWN_SHARDS', - compress: this.options.compress ?? false, - info: { - ...this.options.info, - shards: this.totalShards, - }, - properties: this.options.properties, - } satisfies ManagerSpawnShards); + const listener = (message: WorkerStart) => { + if (message.type !== 'WORKER_START') return; + worker!.removeListener('message', listener); + this.postMessage(i, { + type: 'SPAWN_SHARDS', + compress: this.options.compress ?? false, + info: { + ...this.options.info, + shards: this.totalShards, + }, + properties: this.options.properties, + } satisfies ManagerSpawnShards); + }; + worker.on('message', listener); } } createWorker(workerData: WorkerData) { - const worker = new Worker(workerData.path, { workerData }); - worker.on('message', data => this.handleWorkerMessage(data)); - - return worker; + const env: Record = { + SEYFERT_SPAWNING: 'true', + }; + for (const i in workerData) { + env[`SEYFERT_WORKER_${i.toUpperCase()}`] = workerData[i as keyof WorkerData]; + } + switch (this.options.mode) { + case 'threads': { + const worker = new ThreadWorker(workerData.path, { + env, + }); + worker.on('message', data => this.handleWorkerMessage(data)); + return worker; + } + case 'clusters': { + cluster.setupPrimary({ + exec: workerData.path, + }); + const worker = cluster.fork(env); + worker.on('message', data => this.handleWorkerMessage(data)); + return worker; + } + } } spawn(workerId: number, shardId: number) { @@ -161,8 +199,7 @@ export class WorkerManager extends Map { this.debugger?.fatal("Trying spawn with worker doesn't exist"); return; } - - worker.postMessage({ + this.postMessage(workerId, { type: 'ALLOW_CONNECT', shardId, presence: this.options.presence?.(shardId, workerId), @@ -183,7 +220,7 @@ export class WorkerManager extends Map { } // @ts-expect-error const result = await this.cacheAdapter[message.method](...message.args); - worker.postMessage({ + this.postMessage(message.workerId, { type: 'CACHE_RESULT', nonce: message.nonce, result, @@ -246,7 +283,7 @@ export class WorkerManager extends Map { { this.get(message.workerId)!.ready = true; if ([...this.values()].every(w => w.ready)) { - this.get(this.keys().next().value)?.postMessage({ + this.postMessage(this.keys().next().value, { type: 'BOT_READY', } satisfies ManagerSendBotReady); this.forEach(w => { @@ -258,7 +295,7 @@ export class WorkerManager extends Map { case 'WORKER_API_REQUEST': { const response = await this.rest.request(message.method, message.url, message.requestOptions); - this.get(message.workerId)!.postMessage({ + this.postMessage(message.workerId, { nonce: message.nonce, response, type: 'API_RESPONSE', @@ -280,14 +317,14 @@ export class WorkerManager extends Map { case 'EVAL': { const nonce = this.generateNonce(); - this.get(message.toWorkerId)!.postMessage({ + this.postMessage(message.toWorkerId, { nonce, func: message.func, type: 'EXECUTE_EVAL', toWorkerId: message.toWorkerId, } satisfies ManagerExecuteEval); this.generateSendPromise(nonce, 'Eval timeout').then(val => - this.get(message.workerId)!.postMessage({ + this.postMessage(message.workerId, { nonce: message.nonce, response: val, type: 'EVAL_RESPONSE', @@ -334,7 +371,7 @@ export class WorkerManager extends Map { const nonce = this.generateNonce(); - worker.postMessage({ + this.postMessage(workerId, { type: 'SEND_PAYLOAD', shardId, nonce, @@ -354,7 +391,7 @@ export class WorkerManager extends Map { const nonce = this.generateNonce(false); - worker.postMessage({ shardId, nonce, type: 'SHARD_INFO' } satisfies ManagerRequestShardInfo); + this.postMessage(workerId, { shardId, nonce, type: 'SHARD_INFO' } satisfies ManagerRequestShardInfo); return this.generateSendPromise(nonce, 'Get shard info timeout'); } @@ -368,7 +405,7 @@ export class WorkerManager extends Map { const nonce = this.generateNonce(); - worker.postMessage({ nonce, type: 'WORKER_INFO' } satisfies ManagerRequestWorkerInfo); + this.postMessage(workerId, { nonce, type: 'WORKER_INFO' } satisfies ManagerRequestWorkerInfo); return this.generateSendPromise(nonce, 'Get worker info timeout'); } @@ -386,7 +423,7 @@ export class WorkerManager extends Map { debug: this.options.debug, }); this.options.info ??= await new Router(this.rest).createProxy().gateway.bot.get(); - this.options.shardEnd ??= this.options.info.shards; + this.options.shardEnd ??= this.options.totalShards ?? this.options.info.shards; this.options.totalShards ??= this.options.shardEnd; this.options = MergeOptions>(WorkerManagerDefaults, this.options); this.options.workers ??= Math.ceil(this.options.totalShards / this.options.shardsPerWorker);