This commit is contained in:
MARCROCK22 2024-03-16 15:14:57 -04:00
parent f2d611028a
commit e63264bdf1
6 changed files with 120 additions and 50 deletions

View File

@ -1,5 +1,5 @@
import { randomUUID } from 'node:crypto';
import { parentPort, workerData, type MessagePort } from 'node:worker_threads';
import { parentPort, type MessagePort } from 'node:worker_threads';
import type { WorkerData } from '../../websocket';
import type { WorkerSendCacheRequest } from '../../websocket/discord/worker';
import type { Adapter } from './types';
@ -8,18 +8,26 @@ export class WorkerAdapter implements Adapter {
isAsync = true;
promises = new Map<string, { resolve: (value: unknown) => void; timeout: NodeJS.Timeout }>();
constructor(readonly parent: MessagePort) {}
constructor(
readonly parent: MessagePort | NodeJS.Process,
public workerData: WorkerData,
) {}
postMessage(body: any) {
if (parentPort) return parentPort.postMessage(body);
return process.send!(body);
}
protected send(method: WorkerSendCacheRequest['method'], ...args: any[]): Promise<any> {
const nonce = randomUUID();
if (this.promises.has(nonce)) return this.send(method, ...args);
parentPort!.postMessage({
this.postMessage({
type: 'CACHE_REQUEST',
args,
nonce,
method,
workerId: (workerData as WorkerData).workerId,
workerId: this.workerData.workerId,
} satisfies WorkerSendCacheRequest);
let resolve = (_: any) => {

View File

@ -1,5 +1,5 @@
import { randomUUID } from 'node:crypto';
import { workerData as __workerData__, parentPort as manager } from 'node:worker_threads';
import { parentPort as manager } from 'node:worker_threads';
import { ApiHandler } from '..';
import type { Cache } from '../cache';
import { WorkerAdapter } from '../cache';
@ -18,6 +18,7 @@ import type {
WorkerSendResultPayload,
WorkerSendShardInfo,
WorkerShardInfo,
WorkerStart,
} from '../websocket/discord/worker';
import type { ManagerMessages } from '../websocket/discord/workermanager';
import type { BaseClientOptions, StartOptions } from './base';
@ -26,7 +27,18 @@ import type { Client } from './client';
import { onInteractionCreate } from './oninteractioncreate';
import { onMessageCreate } from './onmessagecreate';
const workerData = __workerData__ as WorkerData;
let workerData: WorkerData;
try {
workerData = {
debug: process.env.SEYFERT_WORKER_DEBUG === 'true',
intents: Number.parseInt(process.env.SEYFERT_WORKER_INTENTS!),
path: process.env.SEYFERT_WORKER_PATH!,
shards: process.env.SEYFERT_WORKER_SHARDS!.split(',').map(id => Number.parseInt(id)),
token: process.env.SEYFERT_WORKER_TOKEN!,
workerId: Number.parseInt(process.env.SEYFERT_WORKER_WORKERID!),
workerProxy: process.env.SEYFERT_WORKER_WORKERPROXY === 'true',
} as WorkerData;
} catch {}
export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
private __handleGuilds?: Set<string> = new Set();
@ -43,13 +55,17 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
constructor(options?: WorkerClientOptions) {
super(options);
if (!manager) {
if (!process.env.SEYFERT_SPAWNING) {
throw new Error('WorkerClient cannot spawn without manager');
}
manager.on('message', data => this.handleManagerMessages(data));
this.postMessage({
type: 'WORKER_START',
workerId: workerData.workerId,
} satisfies WorkerStart);
(manager ?? process).on('message', (data: ManagerMessages) => this.handleManagerMessages(data));
this.setServices({
cache: {
adapter: new WorkerAdapter(manager),
adapter: new WorkerAdapter(manager ?? process, workerData),
disabledCache: options?.disabledCache,
},
});
@ -96,6 +112,11 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
}
}
postMessage(body: any) {
if (manager) return manager.postMessage(body);
return process.send!(body);
}
protected async handleManagerMessages(data: ManagerMessages) {
switch (data.type) {
case 'CACHE_RESULT':
@ -118,7 +139,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
...data,
} satisfies GatewaySendPayload);
manager!.postMessage({
this.postMessage({
type: 'RESULT_PAYLOAD',
nonce: data.nonce,
workerId: this.workerId,
@ -138,9 +159,9 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
break;
case 'SPAWN_SHARDS':
{
const cache = this.cache;
const onPacket = this.onPacket.bind(this);
const handlePayload = this.options?.handlePayload?.bind(this);
const self = this;
for (const id of workerData.shards) {
let shard = this.shards.get(id);
if (!shard) {
@ -152,9 +173,9 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
debugger: this.debugger,
async handlePayload(shardId, payload) {
await handlePayload?.(shardId, payload);
await cache.onPacket(payload);
await self.cache.onPacket(payload);
await onPacket?.(payload, shardId);
manager!.postMessage({
self.postMessage({
workerId: workerData.workerId,
shardId,
type: 'RECEIVE_PAYLOAD',
@ -165,7 +186,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
this.shards.set(id, shard);
}
manager!.postMessage({
this.postMessage({
type: 'CONNECT_QUEUE',
shardId: id,
workerId: workerData.workerId,
@ -181,7 +202,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
return;
}
manager!.postMessage({
this.postMessage({
...generateShardInfo(shard),
nonce: data.nonce,
type: 'SHARD_INFO',
@ -191,7 +212,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
break;
case 'WORKER_INFO':
{
manager!.postMessage({
this.postMessage({
shards: [...this.shards.values()].map(generateShardInfo),
workerId: workerData.workerId,
type: 'WORKER_INFO',
@ -222,7 +243,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
} catch (e) {
result = e;
}
manager!.postMessage({
this.postMessage({
type: 'EVAL_RESPONSE',
response: result,
workerId: workerData.workerId,
@ -270,7 +291,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
tellWorker(workerId: number, func: (_: this) => {}) {
const nonce = this.generateNonce();
manager!.postMessage({
this.postMessage({
type: 'EVAL',
func: func.toString(),
toWorkerId: workerId,
@ -307,7 +328,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
!((workerData.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds)
) {
if ([...this.shards.values()].every(shard => shard.data.session_id)) {
manager!.postMessage({
this.postMessage({
type: 'WORKER_READY',
workerId: this.workerId,
} as WorkerReady);
@ -327,7 +348,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
if (this.__handleGuilds?.has(packet.d.id)) {
this.__handleGuilds.delete(packet.d.id);
if (!this.__handleGuilds.size && [...this.shards.values()].every(shard => shard.data.session_id)) {
manager!.postMessage({
this.postMessage({
type: 'WORKER_READY',
workerId: this.workerId,
} as WorkerReady);

View File

@ -216,7 +216,7 @@ export interface LimitedCollectionOptions {
export class LimitedCollection<K, V> {
static readonly default: LimitedCollectionOptions = {
resetOnDemand: false,
limit: Infinity,
limit: Number.POSITIVE_INFINITY,
expire: 0,
};
@ -262,7 +262,7 @@ export class LimitedCollection<K, V> {
}
}
if (this.closer!.expireOn === expireOn) {
if (this.closer?.expireOn === expireOn) {
this.resetTimeout();
}
}

View File

@ -39,6 +39,8 @@ export interface ShardManagerOptions extends ShardDetails {
}
export interface WorkerManagerOptions extends Omit<ShardManagerOptions, 'handlePayload'> {
mode: 'threads' | 'clusters';
workers?: number;
/**

View File

@ -46,6 +46,7 @@ export type WorkerSendCacheRequest = CreateWorkerMessage<
export type WorkerSendShardInfo = CreateWorkerMessage<'SHARD_INFO', WorkerShardInfo & { nonce: string }>;
export type WorkerSendInfo = CreateWorkerMessage<'WORKER_INFO', WorkerInfo & { nonce: string }>;
export type WorkerReady = CreateWorkerMessage<'WORKER_READY'>;
export type WorkerStart = CreateWorkerMessage<'WORKER_START'>;
export type WorkerSendApiRequest = CreateWorkerMessage<
'WORKER_API_REQUEST',
{
@ -90,4 +91,5 @@ export type WorkerMessage =
| WorkerSendApiRequest
| WorkerExecuteEval
| WorkerSendEvalResponse
| WorkerSendEval;
| WorkerSendEval
| WorkerStart;

View File

@ -1,5 +1,6 @@
import cluster, { type Worker as ClusterWorker } from 'node:cluster';
import { randomUUID } from 'node:crypto';
import { Worker } from 'node:worker_threads';
import { Worker as ThreadWorker } from 'node:worker_threads';
import { ApiHandler, Router } from '../..';
import { MemoryAdapter, type Adapter } from '../../cache';
import { BaseClient, type InternalRuntimeConfig } from '../../client/base';
@ -16,9 +17,9 @@ import { ConnectQueue } from '../structures/timeout';
import { MemberUpdateHandler } from './events/memberUpdate';
import { PresenceUpdateHandler } from './events/presenceUpdate';
import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared';
import type { WorkerInfo, WorkerMessage, WorkerShardInfo } from './worker';
import type { WorkerInfo, WorkerMessage, WorkerShardInfo, WorkerStart } from './worker';
export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
export class WorkerManager extends Map<number, (ClusterWorker | ThreadWorker) & { ready?: boolean }> {
options!: Required<WorkerManagerOptions>;
debugger?: Logger;
connectQueue!: ConnectQueue;
@ -120,6 +121,19 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
return chunks;
}
postMessage(id: number, body: any) {
const worker = this.get(id);
if (!worker) return this.debugger?.error(`Worker ${id} doesnt exists.`);
switch (this.options.mode) {
case 'clusters':
(worker as ClusterWorker).send(body);
break;
case 'threads':
(worker as ThreadWorker).postMessage(body);
break;
}
}
async prepareWorkers(shards: number[][]) {
for (let i = 0; i < shards.length; i++) {
let worker = this.get(i);
@ -135,23 +149,47 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
});
this.set(i, worker);
}
worker.postMessage({
type: 'SPAWN_SHARDS',
compress: this.options.compress ?? false,
info: {
...this.options.info,
shards: this.totalShards,
},
properties: this.options.properties,
} satisfies ManagerSpawnShards);
const listener = (message: WorkerStart) => {
if (message.type !== 'WORKER_START') return;
worker!.removeListener('message', listener);
this.postMessage(i, {
type: 'SPAWN_SHARDS',
compress: this.options.compress ?? false,
info: {
...this.options.info,
shards: this.totalShards,
},
properties: this.options.properties,
} satisfies ManagerSpawnShards);
};
worker.on('message', listener);
}
}
createWorker(workerData: WorkerData) {
const worker = new Worker(workerData.path, { workerData });
worker.on('message', data => this.handleWorkerMessage(data));
return worker;
const env: Record<string, any> = {
SEYFERT_SPAWNING: 'true',
};
for (const i in workerData) {
env[`SEYFERT_WORKER_${i.toUpperCase()}`] = workerData[i as keyof WorkerData];
}
switch (this.options.mode) {
case 'threads': {
const worker = new ThreadWorker(workerData.path, {
env,
});
worker.on('message', data => this.handleWorkerMessage(data));
return worker;
}
case 'clusters': {
cluster.setupPrimary({
exec: workerData.path,
});
const worker = cluster.fork(env);
worker.on('message', data => this.handleWorkerMessage(data));
return worker;
}
}
}
spawn(workerId: number, shardId: number) {
@ -161,8 +199,7 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
this.debugger?.fatal("Trying spawn with worker doesn't exist");
return;
}
worker.postMessage({
this.postMessage(workerId, {
type: 'ALLOW_CONNECT',
shardId,
presence: this.options.presence?.(shardId, workerId),
@ -183,7 +220,7 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
}
// @ts-expect-error
const result = await this.cacheAdapter[message.method](...message.args);
worker.postMessage({
this.postMessage(message.workerId, {
type: 'CACHE_RESULT',
nonce: message.nonce,
result,
@ -246,7 +283,7 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
{
this.get(message.workerId)!.ready = true;
if ([...this.values()].every(w => w.ready)) {
this.get(this.keys().next().value)?.postMessage({
this.postMessage(this.keys().next().value, {
type: 'BOT_READY',
} satisfies ManagerSendBotReady);
this.forEach(w => {
@ -258,7 +295,7 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
case 'WORKER_API_REQUEST':
{
const response = await this.rest.request(message.method, message.url, message.requestOptions);
this.get(message.workerId)!.postMessage({
this.postMessage(message.workerId, {
nonce: message.nonce,
response,
type: 'API_RESPONSE',
@ -280,14 +317,14 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
case 'EVAL':
{
const nonce = this.generateNonce();
this.get(message.toWorkerId)!.postMessage({
this.postMessage(message.toWorkerId, {
nonce,
func: message.func,
type: 'EXECUTE_EVAL',
toWorkerId: message.toWorkerId,
} satisfies ManagerExecuteEval);
this.generateSendPromise(nonce, 'Eval timeout').then(val =>
this.get(message.workerId)!.postMessage({
this.postMessage(message.workerId, {
nonce: message.nonce,
response: val,
type: 'EVAL_RESPONSE',
@ -334,7 +371,7 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
const nonce = this.generateNonce();
worker.postMessage({
this.postMessage(workerId, {
type: 'SEND_PAYLOAD',
shardId,
nonce,
@ -354,7 +391,7 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
const nonce = this.generateNonce(false);
worker.postMessage({ shardId, nonce, type: 'SHARD_INFO' } satisfies ManagerRequestShardInfo);
this.postMessage(workerId, { shardId, nonce, type: 'SHARD_INFO' } satisfies ManagerRequestShardInfo);
return this.generateSendPromise<WorkerShardInfo>(nonce, 'Get shard info timeout');
}
@ -368,7 +405,7 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
const nonce = this.generateNonce();
worker.postMessage({ nonce, type: 'WORKER_INFO' } satisfies ManagerRequestWorkerInfo);
this.postMessage(workerId, { nonce, type: 'WORKER_INFO' } satisfies ManagerRequestWorkerInfo);
return this.generateSendPromise<WorkerInfo>(nonce, 'Get worker info timeout');
}
@ -386,7 +423,7 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
debug: this.options.debug,
});
this.options.info ??= await new Router(this.rest).createProxy().gateway.bot.get();
this.options.shardEnd ??= this.options.info.shards;
this.options.shardEnd ??= this.options.totalShards ?? this.options.info.shards;
this.options.totalShards ??= this.options.shardEnd;
this.options = MergeOptions<Required<WorkerManagerOptions>>(WorkerManagerDefaults, this.options);
this.options.workers ??= Math.ceil(this.options.totalShards / this.options.shardsPerWorker);