feat: custom worker events (#298)

* chore: apply formatting

* feat: custom worker events

* fix: xd

---------

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This commit is contained in:
MARCROCK22 2024-11-20 12:06:23 -04:00 committed by GitHub
parent 35bd23b160
commit 442752319c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 109 additions and 54 deletions

View File

@ -7,7 +7,7 @@ import type { GatewayDispatchPayload, GatewaySendPayload } from '../types';
import { Shard, type ShardManagerOptions, type WorkerData, properties } from '../websocket';
import type {
WorkerDisconnectedAllShardsResharding,
WorkerMessage,
WorkerMessages,
WorkerReady,
WorkerReadyResharding,
WorkerReceivePayload,
@ -78,6 +78,14 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
if (options?.postMessage) {
this.postMessage = options.postMessage;
}
if (this.options.handleManagerMessages) {
const oldFn = this.handleManagerMessages.bind(this);
this.handleManagerMessages = async message => {
await this.options.handleManagerMessages!(message);
return oldFn(message);
};
}
}
get workerId() {
@ -157,7 +165,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
}
}
postMessage(body: WorkerMessage): unknown {
postMessage(body: WorkerMessages): unknown {
if (manager) return manager.postMessage(body);
return process.send!(body);
}
@ -593,4 +601,5 @@ interface WorkerClientOptions extends BaseClientOptions {
postMessage?: (body: unknown) => unknown;
/** can have perfomance issues in big bots if the client sends every event, specially in startup (false by default) */
sendPayloadToParent?: boolean;
handleManagerMessages?(message: ManagerMessages): any;
}

View File

@ -27,6 +27,8 @@ export interface DefaultLocale {}
export interface ExtendContext {}
export interface ExtraProps {}
export interface UsingClient extends BaseClient {}
export interface CustomWorkerClientEvents {}
export interface CustomWorkerManagerEvents {}
export interface ExtendedRC {}
export interface ExtendedRCLocations {}
export type ParseClient<T extends BaseClient> = T;

View File

@ -6,6 +6,7 @@ import type {
GatewayPresenceUpdateData,
} from '../../types';
import type { IdentifyProperties } from '../constants';
import type { WorkerMessages } from './worker';
export interface ShardManagerOptions extends ShardDetails {
/** Important data which is used by the manager to connect shards to the gateway. */
@ -69,6 +70,8 @@ export interface WorkerManagerOptions extends Omit<ShardManagerOptions, 'handleP
handlePayload?(shardId: number, workerId: number, packet: GatewayDispatchPayload): any;
handleWorkerMessage?(message: WorkerMessages): any;
properties?: DeepPartial<NonNullable<ShardManagerOptions['properties']>>;
}

View File

@ -1,4 +1,5 @@
import type { ApiRequestOptions, HttpMethods } from '../..';
import type { ApiRequestOptions, CustomWorkerClientEvents, HttpMethods } from '../..';
import type { Identify } from '../../common';
import type { GatewayDispatchPayload } from '../../types';
export interface WorkerShardInfo {
@ -86,7 +87,7 @@ export type WorkerSendToWorkerEval = CreateWorkerMessage<
}
>;
export type WorkerMessage =
export type BaseWorkerMessage =
| WorkerRequestConnect
| WorkerReceivePayload
| WorkerSendResultPayload
@ -103,3 +104,18 @@ export type WorkerMessage =
| WorkerRequestConnectResharding
| WorkerReadyResharding
| WorkerDisconnectedAllShardsResharding;
export type CustomWorkerClientMessages = {
[K in keyof CustomWorkerClientEvents]: Identify<
{
type: K;
workerId: number;
} & Identify<CustomWorkerClientEvents[K] extends never ? {} : CustomWorkerClientEvents[K]>
>;
};
export type WorkerMessages =
| {
[K in BaseWorkerMessage['type']]: Identify<Extract<BaseWorkerMessage, { type: K }>>;
}[BaseWorkerMessage['type']]
| CustomWorkerClientMessages[keyof CustomWorkerClientMessages];

View File

@ -1,16 +1,16 @@
import cluster, { type Worker as ClusterWorker } from 'node:cluster';
import { type UUID, randomUUID } from 'node:crypto';
import type { Worker as WorkerThreadsWorker } from 'node:worker_threads';
import { ApiHandler, Logger, type UsingClient, type WorkerClient } from '../..';
import { ApiHandler, type CustomWorkerManagerEvents, Logger, type UsingClient, type WorkerClient } from '../..';
import { type Adapter, MemoryAdapter } from '../../cache';
import { BaseClient, type InternalRuntimeConfig } from '../../client/base';
import { BASE_HOST, type MakePartial, MergeOptions, lazyLoadPackage } from '../../common';
import { BASE_HOST, type Identify, type MakePartial, MergeOptions, lazyLoadPackage } from '../../common';
import type { GatewayPresenceUpdateData, GatewaySendPayload, RESTGetAPIGatewayBotResult } from '../../types';
import { WorkerManagerDefaults, properties } from '../constants';
import { DynamicBucket } from '../structures';
import { ConnectQueue } from '../structures/timeout';
import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared';
import type { WorkerInfo, WorkerMessage, WorkerShardInfo } from './worker';
import type { WorkerInfo, WorkerMessages, WorkerShardInfo } from './worker';
export class WorkerManager extends Map<
number,
@ -46,7 +46,7 @@ export class WorkerManager extends Map<
return chunks;
}
options: MakePartial<Required<WorkerManagerOptions>, 'adapter'>;
options: MakePartial<Required<WorkerManagerOptions>, 'adapter' | 'handleWorkerMessage' | 'handlePayload'>;
debugger?: Logger;
connectQueue!: ConnectQueue;
workerQueue: (() => void)[] = [];
@ -57,13 +57,24 @@ export class WorkerManager extends Map<
private _info?: RESTGetAPIGatewayBotResult;
constructor(
options: Omit<MakePartial<WorkerManagerOptions, 'token' | 'intents' | 'info' | 'handlePayload'>, 'resharding'> & {
options: Omit<
MakePartial<WorkerManagerOptions, 'token' | 'intents' | 'info' | 'handlePayload' | 'handleWorkerMessage'>,
'resharding'
> & {
resharding?: MakePartial<NonNullable<WorkerManagerOptions['resharding']>, 'getInfo'>;
},
) {
super();
this.options = options as WorkerManager['options'];
this.cacheAdapter = new MemoryAdapter();
if (this.options.handleWorkerMessage) {
const oldFn = this.handleWorkerMessage.bind(this);
this.handleWorkerMessage = async message => {
await this.options.handleWorkerMessage!(message);
return oldFn(message);
};
}
}
setCache(adapter: Adapter) {
@ -241,7 +252,7 @@ export class WorkerManager extends Map<
});
}
async handleWorkerMessage(message: WorkerMessage) {
async handleWorkerMessage(message: WorkerMessages) {
switch (message.type) {
case 'WORKER_READY_RESHARDING':
{
@ -340,7 +351,7 @@ export class WorkerManager extends Map<
}
break;
case 'RECEIVE_PAYLOAD':
await this.options.handlePayload(message.shardId, message.workerId, message.payload);
await this.options.handlePayload?.(message.shardId, message.workerId, message.payload);
break;
case 'RESULT_PAYLOAD':
{
@ -687,7 +698,7 @@ export type ManagerSendEvalResponse = CreateManagerMessage<
}
>;
export type ManagerMessages =
export type BaseManagerMessages =
| ManagerAllowConnect
| ManagerSpawnShards
| ManagerSendPayload
@ -704,3 +715,17 @@ export type ManagerMessages =
| DisconnectAllShardsResharding
| ConnnectAllShardsResharding
| ManagerExecuteEval;
export type CustomManagerMessages = {
[K in keyof CustomWorkerManagerEvents]: Identify<
{
type: K;
} & Identify<CustomWorkerManagerEvents[K] extends never ? {} : CustomWorkerManagerEvents[K]>
>;
};
export type ManagerMessages =
| {
[K in BaseManagerMessages['type']]: Identify<Extract<BaseManagerMessages, { type: K }>>;
}[BaseManagerMessages['type']]
| CustomManagerMessages[keyof CustomManagerMessages];