From c20f2fd0a3fd24bd2b00170c9833b9add7626f6a Mon Sep 17 00:00:00 2001 From: MARCROCK22 Date: Sat, 17 May 2025 12:58:39 -0400 Subject: [PATCH] feat: implement heartbeater for managing worker heartbeat messages --- src/client/workerclient.ts | 17 +++++-- src/websocket/discord/heartbeater.ts | 43 ++++++++++++++++++ src/websocket/discord/shared.ts | 3 ++ src/websocket/discord/worker.ts | 5 ++ src/websocket/discord/workermanager.ts | 63 ++++++++++++++++---------- 5 files changed, 104 insertions(+), 27 deletions(-) create mode 100644 src/websocket/discord/heartbeater.ts diff --git a/src/client/workerclient.ts b/src/client/workerclient.ts index 4b54e0a..8b0ee64 100644 --- a/src/client/workerclient.ts +++ b/src/client/workerclient.ts @@ -2,6 +2,7 @@ import { type UUID, randomUUID } from 'node:crypto'; import { ApiHandler, Logger } from '..'; import { WorkerAdapter } from '../cache'; import { + type Awaitable, type DeepPartial, LogLevels, type MakeRequired, @@ -13,6 +14,7 @@ import { EventHandler } from '../events'; import type { GatewayDispatchPayload, GatewaySendPayload } from '../types'; import { Shard, type ShardManagerOptions, ShardSocketCloseCodes, type WorkerData, properties } from '../websocket'; import type { + ClientHeartbeaterMessages, WorkerDisconnectedAllShardsResharding, WorkerMessages, WorkerReady, @@ -37,6 +39,7 @@ import type { Client, ClientOptions } from './client'; import { MemberUpdateHandler } from '../websocket/discord/events/memberUpdate'; import { PresenceUpdateHandler } from '../websocket/discord/events/presenceUpdate'; +import type { WorkerHeartbeaterMessages } from '../websocket/discord/heartbeater'; import type { ShardData } from '../websocket/discord/shared'; import { Collectors } from './collectors'; import { type ClientUserStructure, Transformers } from './transformers'; @@ -173,13 +176,19 @@ export class WorkerClient extends BaseClient { } } - postMessage(body: WorkerMessages): unknown { + postMessage(body: WorkerMessages | ClientHeartbeaterMessages): unknown { if (manager) return manager.postMessage(body); return process.send!(body); } - async handleManagerMessages(data: ManagerMessages) { + async handleManagerMessages(data: ManagerMessages | WorkerHeartbeaterMessages) { switch (data.type) { + case 'HEARTBEAT': + this.postMessage({ + type: 'ACK_HEARTBEAT', + workerId: workerData.workerId, + }); + break; case 'CACHE_RESULT': if (this.cache.adapter instanceof WorkerAdapter && this.cache.adapter.promises.has(data.nonce)) { const cacheData = this.cache.adapter.promises.get(data.nonce)!; @@ -570,8 +579,8 @@ export interface WorkerClientOptions extends BaseClientOptions { commands?: NonNullable['commands']; handlePayload?: ShardManagerOptions['handlePayload']; gateway?: ClientOptions['gateway']; - postMessage?: (body: unknown) => unknown; + postMessage?: (body: unknown) => Awaitable; /** can have perfomance issues in big bots if the client sends every event, specially in startup (false by default) */ sendPayloadToParent?: boolean; - handleManagerMessages?(message: ManagerMessages): any; + handleManagerMessages?(message: ManagerMessages | WorkerHeartbeaterMessages): Awaitable; } diff --git a/src/websocket/discord/heartbeater.ts b/src/websocket/discord/heartbeater.ts new file mode 100644 index 0000000..9ba0257 --- /dev/null +++ b/src/websocket/discord/heartbeater.ts @@ -0,0 +1,43 @@ +import type { Awaitable } from '../../common'; + +export type WorkerHeartbeaterMessages = SendHeartbeat; + +export type CreateHeartbeaterMessage = { type: T } & D; + +export type SendHeartbeat = CreateHeartbeaterMessage<'HEARTBEAT'>; + +export class Heartbeater { + store = new Map< + number, + { + ack: boolean; + interval: NodeJS.Timeout; + } + >(); + constructor( + public sendMethod: (workerId: number, data: WorkerHeartbeaterMessages) => Awaitable, + public interval: number, + ) {} + + register(workerId: number, recreate: (workerId: number) => Awaitable) { + if (this.interval <= 0) return; + this.store.set(workerId, { + ack: true, + interval: setInterval(() => { + const heartbeat = this.store.get(workerId)!; + if (!heartbeat.ack) { + heartbeat.ack = true; + return recreate(workerId); + } + heartbeat.ack = false; + this.sendMethod(workerId, { type: 'HEARTBEAT' }); + }, this.interval), + }); + } + + acknowledge(workerId: number) { + const heartbeat = this.store.get(workerId); + if (!heartbeat) return; + heartbeat.ack = true; + } +} diff --git a/src/websocket/discord/shared.ts b/src/websocket/discord/shared.ts index 223e4c7..9961d78 100644 --- a/src/websocket/discord/shared.ts +++ b/src/websocket/discord/shared.ts @@ -69,6 +69,9 @@ export interface WorkerManagerOptions extends Omit; }; +export type ClientHeartbeaterMessages = ACKHeartbeat; + +export type ACKHeartbeat = CreateWorkerMessage<'ACK_HEARTBEAT'>; + export type WorkerMessages = + | ClientHeartbeaterMessages | { [K in BaseWorkerMessage['type']]: Identify>; }[BaseWorkerMessage['type']] diff --git a/src/websocket/discord/workermanager.ts b/src/websocket/discord/workermanager.ts index 456589f..ed8dfb1 100644 --- a/src/websocket/discord/workermanager.ts +++ b/src/websocket/discord/workermanager.ts @@ -9,6 +9,7 @@ import type { GatewayPresenceUpdateData, GatewaySendPayload, RESTGetAPIGatewayBo import { WorkerManagerDefaults, properties } from '../constants'; import { DynamicBucket } from '../structures'; import { ConnectQueue } from '../structures/timeout'; +import { Heartbeater, type WorkerHeartbeaterMessages } from './heartbeater'; import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared'; import type { WorkerInfo, WorkerMessages, WorkerShardInfo } from './worker'; @@ -55,6 +56,7 @@ export class WorkerManager extends Map< rest!: ApiHandler; reshardingWorkerQueue: (() => void)[] = []; private _info?: RESTGetAPIGatewayBotResult; + heartbeater: Heartbeater; constructor( options: Omit< @@ -75,6 +77,8 @@ export class WorkerManager extends Map< return oldFn(message); }; } + + this.heartbeater = new Heartbeater(this.postMessage.bind(this), options.heartbeaterInterval ?? 15e3); } setCache(adapter: Adapter) { @@ -144,12 +148,12 @@ export class WorkerManager extends Map< return workerId; } - postMessage(id: number, body: ManagerMessages) { + postMessage(id: number, body: ManagerMessages | WorkerHeartbeaterMessages) { const worker = this.get(id); if (!worker) return this.debugger?.error(`Worker ${id} does not exists.`); switch (this.options.mode) { case 'clusters': - (worker as ClusterWorker).send(body); + if ((worker as ClusterWorker).isConnected()) (worker as ClusterWorker).send(body); break; case 'threads': (worker as import('worker_threads').Worker).postMessage(body); @@ -160,33 +164,40 @@ export class WorkerManager extends Map< } } - prepareWorkers(shards: number[][], resharding = false) { + prepareWorkers(shards: number[][], rawResharding = false) { const worker_threads = lazyLoadPackage('node:worker_threads'); if (!worker_threads) throw new Error('Cannot prepare workers without worker_threads.'); for (let i = 0; i < shards.length; i++) { + const registerWorker = (resharding: boolean) => { + const worker = this.createWorker({ + path: this.options.path, + debug: this.options.debug, + token: this.options.token, + shards: shards[i], + intents: this.options.intents, + workerId: i, + workerProxy: this.options.workerProxy, + totalShards: resharding ? this._info!.shards : this.totalShards, + mode: this.options.mode, + resharding, + totalWorkers: shards.length, + info: { + ...this.options.info, + shards: this.totalShards, + }, + compress: this.options.compress, + }); + this.set(i, worker); + }; const workerExists = this.has(i); - if (resharding || !workerExists) { - this[resharding ? 'reshardingWorkerQueue' : 'workerQueue'].push(() => { - const worker = this.createWorker({ - path: this.options.path, - debug: this.options.debug, - token: this.options.token, - shards: shards[i], - intents: this.options.intents, - workerId: i, - workerProxy: this.options.workerProxy, - totalShards: resharding ? this._info!.shards : this.totalShards, - mode: this.options.mode, - resharding, - totalWorkers: shards.length, - info: { - ...this.options.info, - shards: this.totalShards, - }, - compress: this.options.compress, + if (rawResharding || !workerExists) { + this[rawResharding ? 'reshardingWorkerQueue' : 'workerQueue'].push(() => { + registerWorker(rawResharding); + this.heartbeater.register(i, () => { + this.delete(i); + registerWorker(false); }); - this.set(i, worker); }); } } @@ -218,6 +229,9 @@ export class WorkerManager extends Map< env, }); worker.on('message', data => this.handleWorkerMessage(data)); + worker.on('error', err => { + this.debugger?.error(`[Worker #${workerData.workerId}]`, err); + }); return worker; } case 'clusters': { @@ -254,6 +268,9 @@ export class WorkerManager extends Map< async handleWorkerMessage(message: WorkerMessages) { switch (message.type) { + case 'ACK_HEARTBEAT': + this.heartbeater.acknowledge(message.workerId); + break; case 'WORKER_READY_RESHARDING': { this.get(message.workerId)!.resharded = true;