feat: implement heartbeater for managing worker heartbeat messages

This commit is contained in:
MARCROCK22 2025-05-17 12:58:39 -04:00
parent 0d8ad177b7
commit c20f2fd0a3
5 changed files with 104 additions and 27 deletions

View File

@ -2,6 +2,7 @@ import { type UUID, randomUUID } from 'node:crypto';
import { ApiHandler, Logger } from '..'; import { ApiHandler, Logger } from '..';
import { WorkerAdapter } from '../cache'; import { WorkerAdapter } from '../cache';
import { import {
type Awaitable,
type DeepPartial, type DeepPartial,
LogLevels, LogLevels,
type MakeRequired, type MakeRequired,
@ -13,6 +14,7 @@ import { EventHandler } from '../events';
import type { GatewayDispatchPayload, GatewaySendPayload } from '../types'; import type { GatewayDispatchPayload, GatewaySendPayload } from '../types';
import { Shard, type ShardManagerOptions, ShardSocketCloseCodes, type WorkerData, properties } from '../websocket'; import { Shard, type ShardManagerOptions, ShardSocketCloseCodes, type WorkerData, properties } from '../websocket';
import type { import type {
ClientHeartbeaterMessages,
WorkerDisconnectedAllShardsResharding, WorkerDisconnectedAllShardsResharding,
WorkerMessages, WorkerMessages,
WorkerReady, WorkerReady,
@ -37,6 +39,7 @@ import type { Client, ClientOptions } from './client';
import { MemberUpdateHandler } from '../websocket/discord/events/memberUpdate'; import { MemberUpdateHandler } from '../websocket/discord/events/memberUpdate';
import { PresenceUpdateHandler } from '../websocket/discord/events/presenceUpdate'; import { PresenceUpdateHandler } from '../websocket/discord/events/presenceUpdate';
import type { WorkerHeartbeaterMessages } from '../websocket/discord/heartbeater';
import type { ShardData } from '../websocket/discord/shared'; import type { ShardData } from '../websocket/discord/shared';
import { Collectors } from './collectors'; import { Collectors } from './collectors';
import { type ClientUserStructure, Transformers } from './transformers'; import { type ClientUserStructure, Transformers } from './transformers';
@ -173,13 +176,19 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
} }
} }
postMessage(body: WorkerMessages): unknown { postMessage(body: WorkerMessages | ClientHeartbeaterMessages): unknown {
if (manager) return manager.postMessage(body); if (manager) return manager.postMessage(body);
return process.send!(body); return process.send!(body);
} }
async handleManagerMessages(data: ManagerMessages) { async handleManagerMessages(data: ManagerMessages | WorkerHeartbeaterMessages) {
switch (data.type) { switch (data.type) {
case 'HEARTBEAT':
this.postMessage({
type: 'ACK_HEARTBEAT',
workerId: workerData.workerId,
});
break;
case 'CACHE_RESULT': case 'CACHE_RESULT':
if (this.cache.adapter instanceof WorkerAdapter && this.cache.adapter.promises.has(data.nonce)) { if (this.cache.adapter instanceof WorkerAdapter && this.cache.adapter.promises.has(data.nonce)) {
const cacheData = this.cache.adapter.promises.get(data.nonce)!; const cacheData = this.cache.adapter.promises.get(data.nonce)!;
@ -570,8 +579,8 @@ export interface WorkerClientOptions extends BaseClientOptions {
commands?: NonNullable<Client['options']>['commands']; commands?: NonNullable<Client['options']>['commands'];
handlePayload?: ShardManagerOptions['handlePayload']; handlePayload?: ShardManagerOptions['handlePayload'];
gateway?: ClientOptions['gateway']; gateway?: ClientOptions['gateway'];
postMessage?: (body: unknown) => unknown; postMessage?: (body: unknown) => Awaitable<unknown>;
/** can have perfomance issues in big bots if the client sends every event, specially in startup (false by default) */ /** can have perfomance issues in big bots if the client sends every event, specially in startup (false by default) */
sendPayloadToParent?: boolean; sendPayloadToParent?: boolean;
handleManagerMessages?(message: ManagerMessages): any; handleManagerMessages?(message: ManagerMessages | WorkerHeartbeaterMessages): Awaitable<unknown>;
} }

View File

@ -0,0 +1,43 @@
import type { Awaitable } from '../../common';
export type WorkerHeartbeaterMessages = SendHeartbeat;
export type CreateHeartbeaterMessage<T extends string, D extends object = object> = { type: T } & D;
export type SendHeartbeat = CreateHeartbeaterMessage<'HEARTBEAT'>;
export class Heartbeater {
store = new Map<
number,
{
ack: boolean;
interval: NodeJS.Timeout;
}
>();
constructor(
public sendMethod: (workerId: number, data: WorkerHeartbeaterMessages) => Awaitable<void>,
public interval: number,
) {}
register(workerId: number, recreate: (workerId: number) => Awaitable<void>) {
if (this.interval <= 0) return;
this.store.set(workerId, {
ack: true,
interval: setInterval(() => {
const heartbeat = this.store.get(workerId)!;
if (!heartbeat.ack) {
heartbeat.ack = true;
return recreate(workerId);
}
heartbeat.ack = false;
this.sendMethod(workerId, { type: 'HEARTBEAT' });
}, this.interval),
});
}
acknowledge(workerId: number) {
const heartbeat = this.store.get(workerId);
if (!heartbeat) return;
heartbeat.ack = true;
}
}

View File

@ -69,6 +69,9 @@ export interface WorkerManagerOptions extends Omit<ShardManagerOptions, 'handleP
workerProxy?: boolean; workerProxy?: boolean;
/** @default 15000 */
heartbeaterInterval?: number;
path: string; path: string;
handlePayload?(shardId: number, workerId: number, packet: GatewayDispatchPayload): any; handlePayload?(shardId: number, workerId: number, packet: GatewayDispatchPayload): any;

View File

@ -114,7 +114,12 @@ export type CustomWorkerClientMessages = {
>; >;
}; };
export type ClientHeartbeaterMessages = ACKHeartbeat;
export type ACKHeartbeat = CreateWorkerMessage<'ACK_HEARTBEAT'>;
export type WorkerMessages = export type WorkerMessages =
| ClientHeartbeaterMessages
| { | {
[K in BaseWorkerMessage['type']]: Identify<Extract<BaseWorkerMessage, { type: K }>>; [K in BaseWorkerMessage['type']]: Identify<Extract<BaseWorkerMessage, { type: K }>>;
}[BaseWorkerMessage['type']] }[BaseWorkerMessage['type']]

View File

@ -9,6 +9,7 @@ import type { GatewayPresenceUpdateData, GatewaySendPayload, RESTGetAPIGatewayBo
import { WorkerManagerDefaults, properties } from '../constants'; import { WorkerManagerDefaults, properties } from '../constants';
import { DynamicBucket } from '../structures'; import { DynamicBucket } from '../structures';
import { ConnectQueue } from '../structures/timeout'; import { ConnectQueue } from '../structures/timeout';
import { Heartbeater, type WorkerHeartbeaterMessages } from './heartbeater';
import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared'; import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared';
import type { WorkerInfo, WorkerMessages, WorkerShardInfo } from './worker'; import type { WorkerInfo, WorkerMessages, WorkerShardInfo } from './worker';
@ -55,6 +56,7 @@ export class WorkerManager extends Map<
rest!: ApiHandler; rest!: ApiHandler;
reshardingWorkerQueue: (() => void)[] = []; reshardingWorkerQueue: (() => void)[] = [];
private _info?: RESTGetAPIGatewayBotResult; private _info?: RESTGetAPIGatewayBotResult;
heartbeater: Heartbeater;
constructor( constructor(
options: Omit< options: Omit<
@ -75,6 +77,8 @@ export class WorkerManager extends Map<
return oldFn(message); return oldFn(message);
}; };
} }
this.heartbeater = new Heartbeater(this.postMessage.bind(this), options.heartbeaterInterval ?? 15e3);
} }
setCache(adapter: Adapter) { setCache(adapter: Adapter) {
@ -144,12 +148,12 @@ export class WorkerManager extends Map<
return workerId; return workerId;
} }
postMessage(id: number, body: ManagerMessages) { postMessage(id: number, body: ManagerMessages | WorkerHeartbeaterMessages) {
const worker = this.get(id); const worker = this.get(id);
if (!worker) return this.debugger?.error(`Worker ${id} does not exists.`); if (!worker) return this.debugger?.error(`Worker ${id} does not exists.`);
switch (this.options.mode) { switch (this.options.mode) {
case 'clusters': case 'clusters':
(worker as ClusterWorker).send(body); if ((worker as ClusterWorker).isConnected()) (worker as ClusterWorker).send(body);
break; break;
case 'threads': case 'threads':
(worker as import('worker_threads').Worker).postMessage(body); (worker as import('worker_threads').Worker).postMessage(body);
@ -160,33 +164,40 @@ export class WorkerManager extends Map<
} }
} }
prepareWorkers(shards: number[][], resharding = false) { prepareWorkers(shards: number[][], rawResharding = false) {
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads'); const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
if (!worker_threads) throw new Error('Cannot prepare workers without worker_threads.'); if (!worker_threads) throw new Error('Cannot prepare workers without worker_threads.');
for (let i = 0; i < shards.length; i++) { for (let i = 0; i < shards.length; i++) {
const registerWorker = (resharding: boolean) => {
const worker = this.createWorker({
path: this.options.path,
debug: this.options.debug,
token: this.options.token,
shards: shards[i],
intents: this.options.intents,
workerId: i,
workerProxy: this.options.workerProxy,
totalShards: resharding ? this._info!.shards : this.totalShards,
mode: this.options.mode,
resharding,
totalWorkers: shards.length,
info: {
...this.options.info,
shards: this.totalShards,
},
compress: this.options.compress,
});
this.set(i, worker);
};
const workerExists = this.has(i); const workerExists = this.has(i);
if (resharding || !workerExists) { if (rawResharding || !workerExists) {
this[resharding ? 'reshardingWorkerQueue' : 'workerQueue'].push(() => { this[rawResharding ? 'reshardingWorkerQueue' : 'workerQueue'].push(() => {
const worker = this.createWorker({ registerWorker(rawResharding);
path: this.options.path, this.heartbeater.register(i, () => {
debug: this.options.debug, this.delete(i);
token: this.options.token, registerWorker(false);
shards: shards[i],
intents: this.options.intents,
workerId: i,
workerProxy: this.options.workerProxy,
totalShards: resharding ? this._info!.shards : this.totalShards,
mode: this.options.mode,
resharding,
totalWorkers: shards.length,
info: {
...this.options.info,
shards: this.totalShards,
},
compress: this.options.compress,
}); });
this.set(i, worker);
}); });
} }
} }
@ -218,6 +229,9 @@ export class WorkerManager extends Map<
env, env,
}); });
worker.on('message', data => this.handleWorkerMessage(data)); worker.on('message', data => this.handleWorkerMessage(data));
worker.on('error', err => {
this.debugger?.error(`[Worker #${workerData.workerId}]`, err);
});
return worker; return worker;
} }
case 'clusters': { case 'clusters': {
@ -254,6 +268,9 @@ export class WorkerManager extends Map<
async handleWorkerMessage(message: WorkerMessages) { async handleWorkerMessage(message: WorkerMessages) {
switch (message.type) { switch (message.type) {
case 'ACK_HEARTBEAT':
this.heartbeater.acknowledge(message.workerId);
break;
case 'WORKER_READY_RESHARDING': case 'WORKER_READY_RESHARDING':
{ {
this.get(message.workerId)!.resharded = true; this.get(message.workerId)!.resharded = true;