diff --git a/src/cache/adapters/workeradapter.ts b/src/cache/adapters/workeradapter.ts index 3b9e47d..a722709 100644 --- a/src/cache/adapters/workeradapter.ts +++ b/src/cache/adapters/workeradapter.ts @@ -15,7 +15,7 @@ export class WorkerAdapter implements Adapter { if (worker_threads?.parentPort) parentPort = worker_threads.parentPort; } - postMessage(body: any) { + postMessage(body: any): unknown { if (parentPort) return parentPort.postMessage(body); return process.send!(body); } diff --git a/src/client/workerclient.ts b/src/client/workerclient.ts index 62bf94c..35e8de6 100644 --- a/src/client/workerclient.ts +++ b/src/client/workerclient.ts @@ -25,6 +25,8 @@ import type { Client, ClientOptions } from './client'; import { Collectors } from './collectors'; import { type ClientUserStructure, Transformers } from './transformers'; +import { MemberUpdateHandler } from '../websocket/discord/events/memberUpdate'; +import { PresenceUpdateHandler } from '../websocket/discord/events/presenceUpdate'; let workerData: WorkerData; let manager: import('node:worker_threads').MessagePort; @@ -37,15 +39,16 @@ try { token: process.env.SEYFERT_WORKER_TOKEN!, workerId: Number.parseInt(process.env.SEYFERT_WORKER_WORKERID!), workerProxy: process.env.SEYFERT_WORKER_WORKERPROXY === 'true', + totalShards: Number(process.env.SEYFERT_WORKER_TOTALSHARDS), + mode: process.env.SEYFERT_WORKER_MODE, } as WorkerData; } catch {} export class WorkerClient extends BaseClient { private __handleGuilds?: Set = new Set(); - logger = new Logger({ - name: `[Worker #${workerData.workerId}]`, - }); + memberUpdateHandler = new MemberUpdateHandler(); + presenceUpdateHandler = new PresenceUpdateHandler(); collectors = new Collectors(); events? = new EventHandler(this); me!: When; @@ -57,41 +60,8 @@ export class WorkerClient extends BaseClient { constructor(options?: WorkerClientOptions) { super(options); - - if (!process.env.SEYFERT_SPAWNING) { - throw new Error('WorkerClient cannot spawn without manager'); - } - this.postMessage({ - type: 'WORKER_START', - workerId: workerData.workerId, - } satisfies WorkerStart); - - const worker_threads = lazyLoadPackage('node:worker_threads'); - if (worker_threads?.parentPort) { - manager = worker_threads?.parentPort; - } - (manager ?? process).on('message', (data: ManagerMessages) => this.handleManagerMessages(data)); - - this.setServices({ - cache: { - adapter: new WorkerAdapter(workerData), - disabledCache: options?.disabledCache, - }, - }); - if (workerData.debug) { - this.debugger = new Logger({ - name: `[Worker #${workerData.workerId}]`, - logLevel: LogLevels.Debug, - }); - } - if (workerData.workerProxy) { - this.setServices({ - rest: new ApiHandler({ - token: workerData.token, - workerProxy: true, - debug: workerData.debug, - }), - }); + if (options?.postMessage) { + this.postMessage = options.postMessage; } } @@ -126,7 +96,54 @@ export class WorkerClient extends BaseClient { } } + setWorkerData(data: WorkerData) { + workerData = data; + } + async start(options: Omit, 'httpConnection' | 'token' | 'connection'> = {}) { + const worker_threads = lazyLoadPackage('node:worker_threads'); + + if (worker_threads?.parentPort) { + manager = worker_threads?.parentPort; + } + + if (workerData.mode !== 'custom') + (manager ?? process).on('message', (data: ManagerMessages) => this.handleManagerMessages(data)); + + this.logger = new Logger({ + name: `[Worker #${workerData.workerId}]`, + }); + + const adapter = new WorkerAdapter(workerData); + if (this.options.postMessage) { + adapter.postMessage = this.options.postMessage; + } + this.setServices({ + cache: { + adapter, + disabledCache: this.options.disabledCache, + }, + }); + + if (workerData.debug) { + this.debugger = new Logger({ + name: `[Worker #${workerData.workerId}]`, + logLevel: LogLevels.Debug, + }); + } + if (workerData.workerProxy) { + this.setServices({ + rest: new ApiHandler({ + token: workerData.token, + workerProxy: true, + debug: workerData.debug, + }), + }); + } + this.postMessage({ + type: 'WORKER_START', + workerId: workerData.workerId, + } satisfies WorkerStart); await super.start(options); await this.loadEvents(options.eventsDir); this.cache.intents = workerData.intents; @@ -140,12 +157,12 @@ export class WorkerClient extends BaseClient { } } - postMessage(body: any) { + postMessage(body: unknown): unknown { if (manager) return manager.postMessage(body); return process.send!(body); } - protected async handleManagerMessages(data: ManagerMessages) { + async handleManagerMessages(data: ManagerMessages) { switch (data.type) { case 'CACHE_RESULT': if (this.cache.adapter instanceof WorkerAdapter && this.cache.adapter.promises.has(data.nonce)) { @@ -329,6 +346,23 @@ export class WorkerClient extends BaseClient { this.collectors.run('RAW', packet), ]); //ignore promise switch (packet.t) { + //// Cases where we must obtain the old data before updating + case 'GUILD_MEMBER_UPDATE': + { + if (!this.memberUpdateHandler.check(packet.d)) { + return; + } + await this.events?.execute(packet.t, packet, this as WorkerClient, shardId); + } + break; + case 'PRESENCE_UPDATE': + { + if (!this.presenceUpdateHandler.check(packet.d)) { + return; + } + await this.events?.execute(packet.t, packet, this as WorkerClient, shardId); + } + break; case 'GUILD_CREATE': { if (this.__handleGuilds?.has(packet.d.id)) { this.__handleGuilds.delete(packet.d.id); @@ -348,6 +382,12 @@ export class WorkerClient extends BaseClient { default: { await this.events?.execute(packet.t as never, packet, this, shardId); switch (packet.t) { + case 'INTERACTION_CREATE': + await this.handleCommand.interaction(packet.d, shardId); + break; + case 'MESSAGE_CREATE': + await this.handleCommand.message(packet.d, shardId); + break; case 'READY': if (!this.__handleGuilds) this.__handleGuilds = new Set(); for (const g of packet.d.guilds) { @@ -370,13 +410,7 @@ export class WorkerClient extends BaseClient { } delete this.__handleGuilds; } - this.debugger?.debug(`#${shardId} [${packet.d.user.username}](${this.botId}) is online...`); - break; - case 'INTERACTION_CREATE': - await this.handleCommand.interaction(packet.d, shardId); - break; - case 'MESSAGE_CREATE': - await this.handleCommand.message(packet.d, shardId); + this.debugger?.debug(`#${shardId}[${packet.d.user.username}](${this.botId}) is online...`); break; } break; @@ -395,8 +429,9 @@ export function generateShardInfo(shard: Shard): WorkerShardInfo { } interface WorkerClientOptions extends BaseClientOptions { - disabledCache: Cache['disabledCache']; + disabledCache?: Cache['disabledCache']; commands?: NonNullable['commands']; handlePayload?: ShardManagerOptions['handlePayload']; gateway?: ClientOptions['gateway']; + postMessage?: (body: unknown) => unknown; } diff --git a/src/websocket/discord/events/memberUpdate.ts b/src/websocket/discord/events/memberUpdate.ts index 52dcfa5..cbb38b1 100644 --- a/src/websocket/discord/events/memberUpdate.ts +++ b/src/websocket/discord/events/memberUpdate.ts @@ -33,6 +33,7 @@ export class MemberUpdateHandler { membersEquals(old: GatewayGuildMemberUpdateDispatchData, member: GatewayGuildMemberUpdateDispatchData) { return ( + old.guild_id === member.guild_id && old.joined_at === member.joined_at && old.nick === member.nick && old.avatar === member.avatar && diff --git a/src/websocket/discord/events/presenceUpdate.ts b/src/websocket/discord/events/presenceUpdate.ts index c3a7ae2..7cd9fc9 100644 --- a/src/websocket/discord/events/presenceUpdate.ts +++ b/src/websocket/discord/events/presenceUpdate.ts @@ -33,7 +33,6 @@ export class PresenceUpdateHandler { presenceEquals(oldPresence: GatewayPresenceUpdateDispatchData, newPresence: GatewayPresenceUpdateDispatchData) { return ( - newPresence && oldPresence.status === newPresence.status && oldPresence.activities?.length === newPresence.activities?.length && oldPresence.activities?.every((activity, index) => diff --git a/src/websocket/discord/shared.ts b/src/websocket/discord/shared.ts index 1acd13b..905153e 100644 --- a/src/websocket/discord/shared.ts +++ b/src/websocket/discord/shared.ts @@ -4,7 +4,7 @@ import type { GatewayIntentBits, GatewayPresenceUpdateData, } from 'discord-api-types/v10'; -import type { Logger } from '../../common'; +import type { Awaitable, DeepPartial, Logger } from '../../common'; import type { IdentifyProperties } from '../constants'; export interface ShardManagerOptions extends ShardDetails { @@ -38,8 +38,15 @@ export interface ShardManagerOptions extends ShardDetails { compress?: boolean; } -export interface WorkerManagerOptions extends Omit { - mode: 'threads' | 'clusters'; +export interface CustomManagerAdapter { + postMessage(workerId: number, body: unknown): Awaitable; + spawn(workerData: WorkerData, env: Record): Awaitable; +} + +export interface WorkerManagerOptions extends Omit { + mode: 'threads' | 'clusters' | 'custom'; + + adapter?: CustomManagerAdapter; workers?: number; @@ -53,6 +60,8 @@ export interface WorkerManagerOptions extends Omit>; } export interface ShardData { @@ -117,6 +126,8 @@ export interface WorkerData { token: string; path: string; shards: number[]; + totalShards: number; + mode: 'custom' | 'clusters' | 'threads'; workerId: number; debug: boolean; workerProxy: boolean; diff --git a/src/websocket/discord/workermanager.ts b/src/websocket/discord/workermanager.ts index ca392c2..3cc2d05 100644 --- a/src/websocket/discord/workermanager.ts +++ b/src/websocket/discord/workermanager.ts @@ -5,25 +5,21 @@ import { ApiHandler, Logger, Router } from '../..'; import { MemoryAdapter, type Adapter } from '../../cache'; import { BaseClient, type InternalRuntimeConfig } from '../../client/base'; import { MergeOptions, lazyLoadPackage, type MakePartial } from '../../common'; -import { WorkerManagerDefaults } from '../constants'; +import { WorkerManagerDefaults, properties } from '../constants'; import { DynamicBucket } from '../structures'; import { ConnectQueue } from '../structures/timeout'; -import { MemberUpdateHandler } from './events/memberUpdate'; -import { PresenceUpdateHandler } from './events/presenceUpdate'; import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared'; -import type { WorkerInfo, WorkerMessage, WorkerShardInfo, WorkerStart } from './worker'; +import type { WorkerInfo, WorkerMessage, WorkerShardInfo } from './worker'; export class WorkerManager extends Map< number, - (ClusterWorker | import('node:worker_threads').Worker) & { ready?: boolean } + (ClusterWorker | import('node:worker_threads').Worker | { ready: boolean }) & { ready?: boolean } > { - options!: Required; + options!: MakePartial, 'adapter'>; debugger?: Logger; connectQueue!: ConnectQueue; cacheAdapter: Adapter; promises = new Map void; timeout: NodeJS.Timeout }>(); - memberUpdateHandler = new MemberUpdateHandler(); - presenceUpdateHandler = new PresenceUpdateHandler(); rest!: ApiHandler; constructor(options: MakePartial) { super(); @@ -128,10 +124,16 @@ export class WorkerManager extends Map< case 'threads': (worker as import('worker_threads').Worker).postMessage(body); break; + case 'custom': + this.options.adapter!.postMessage(id, body); + break; } } async prepareWorkers(shards: number[][]) { + const worker_threads = lazyLoadPackage('node:worker_threads'); + if (!worker_threads) throw new Error('Cannot prepare workers without worker_threads.'); + for (let i = 0; i < shards.length; i++) { let worker = this.get(i); if (!worker) { @@ -143,23 +145,11 @@ export class WorkerManager extends Map< intents: this.options.intents, workerId: i, workerProxy: this.options.workerProxy, + totalShards: this.totalShards, + mode: this.options.mode, }); this.set(i, worker); } - const listener = (message: WorkerStart) => { - if (message.type !== 'WORKER_START') return; - worker!.removeListener('message', listener); - this.postMessage(i, { - type: 'SPAWN_SHARDS', - compress: this.options.compress ?? false, - info: { - ...this.options.info, - shards: this.totalShards, - }, - properties: this.options.properties, - } satisfies ManagerSpawnShards); - }; - worker.on('message', listener); } } @@ -188,12 +178,17 @@ export class WorkerManager extends Map< worker.on('message', data => this.handleWorkerMessage(data)); return worker; } + case 'custom': + this.options.adapter!.spawn(workerData, env); + return { + ready: false, + }; } } spawn(workerId: number, shardId: number) { this.connectQueue.push(() => { - const worker = this.get(workerId); + const worker = this.has(workerId); if (!worker) { this.debugger?.fatal("Trying spawn with worker doesn't exist"); return; @@ -208,12 +203,28 @@ export class WorkerManager extends Map< async handleWorkerMessage(message: WorkerMessage) { switch (message.type) { + case 'WORKER_START': + { + this.postMessage(message.workerId, { + type: 'SPAWN_SHARDS', + compress: this.options.compress ?? false, + info: { + ...this.options.info, + shards: this.totalShards, + }, + properties: { + ...properties, + ...this.options.properties, + }, + } satisfies ManagerSpawnShards); + } + break; case 'CONNECT_QUEUE': this.spawn(message.workerId, message.shardId); break; case 'CACHE_REQUEST': { - const worker = this.get(message.workerId); + const worker = this.has(message.workerId); if (!worker) { throw new Error('Invalid request from unavailable worker'); } @@ -227,21 +238,7 @@ export class WorkerManager extends Map< } break; case 'RECEIVE_PAYLOAD': - { - switch (message.payload.t) { - case 'GUILD_MEMBER_UPDATE': - if (!this.memberUpdateHandler.check(message.payload.d)) { - return; - } - break; - case 'PRESENCE_UPDATE': - if (!this.presenceUpdateHandler.check(message.payload.d)) { - return; - } - break; - } - this.options.handlePayload(message.shardId, message.workerId, message.payload); - } + await this.options.handlePayload(message.shardId, message.workerId, message.payload); break; case 'RESULT_PAYLOAD': { @@ -353,7 +350,7 @@ export class WorkerManager extends Map< async send(data: GatewaySendPayload, shardId: number) { const workerId = this.calculateWorkerId(shardId); - const worker = this.get(workerId); + const worker = this.has(workerId); if (!worker) { throw new Error(`Worker #${workerId} doesnt exist`); @@ -373,7 +370,7 @@ export class WorkerManager extends Map< async getShardInfo(shardId: number) { const workerId = this.calculateWorkerId(shardId); - const worker = this.get(workerId); + const worker = this.has(workerId); if (!worker) { throw new Error(`Worker #${workerId} doesnt exist`); @@ -387,7 +384,7 @@ export class WorkerManager extends Map< } async getWorkerInfo(workerId: number) { - const worker = this.get(workerId); + const worker = this.has(workerId); if (!worker) { throw new Error(`Worker #${workerId} doesnt exist`);