From d34d45604f4ac2e7964e917a218008630afc65c9 Mon Sep 17 00:00:00 2001 From: MARCROCK22 <57925328+MARCROCK22@users.noreply.github.com> Date: Wed, 13 Mar 2024 17:55:47 -0400 Subject: [PATCH] tellWorker --- src/api/api.ts | 21 +++++--- src/client/workerclient.ts | 71 ++++++++++++++++++++++++ src/websocket/discord/worker.ts | 28 +++++++++- src/websocket/discord/workermanager.ts | 75 +++++++++++++++++++++----- 4 files changed, 172 insertions(+), 23 deletions(-) diff --git a/src/api/api.ts b/src/api/api.ts index c5a9a03..f14ac14 100644 --- a/src/api/api.ts +++ b/src/api/api.ts @@ -68,14 +68,19 @@ export class ApiHandler { ): Promise { if (this.options.workerProxy) { const nonce = this.#randomUUID(); - parentPort!.postMessage({ - method, - url, - type: 'WORKER_API_REQUEST', - workerId: (workerData as WorkerData).workerId, - nonce, - requestOptions: { auth, ...request }, - } satisfies WorkerSendApiRequest); + parentPort!.postMessage( + { + method, + url, + type: 'WORKER_API_REQUEST', + workerId: (workerData as WorkerData).workerId, + nonce, + requestOptions: { auth, ...request }, + } satisfies WorkerSendApiRequest, + request.files + ?.filter(x => !['string', 'boolean', 'number'].includes(typeof x.data)) + .map(x => x.data as Buffer | Uint8Array), + ); let resolve = (_value: T) => {}; let reject = () => {}; diff --git a/src/client/workerclient.ts b/src/client/workerclient.ts index 697b2ef..6b48996 100644 --- a/src/client/workerclient.ts +++ b/src/client/workerclient.ts @@ -1,3 +1,4 @@ +import { randomUUID } from 'node:crypto'; import { workerData as __workerData__, parentPort as manager } from 'node:worker_threads'; import { ApiHandler } from '..'; import type { Cache } from '../cache'; @@ -11,6 +12,8 @@ import type { WorkerReady, WorkerReceivePayload, WorkerRequestConnect, + WorkerSendEval, + WorkerSendEvalResponse, WorkerSendInfo, WorkerSendResultPayload, WorkerSendShardInfo, @@ -33,6 +36,8 @@ export class WorkerClient extends BaseClient { events = new EventHandler(this.logger); me!: When; + promises = new Map void; timeout: NodeJS.Timeout }>(); + shards = new Map(); declare options: WorkerClientOptions | undefined; @@ -206,9 +211,75 @@ export class WorkerClient extends BaseClient { promise.resolve(data.response); } break; + case 'EXECUTE_EVAL': + { + let result; + try { + // biome-ignore lint/security/noGlobalEval: yes + result = await eval(` + (${data.func})(this) + `); + } catch (e) { + result = e; + } + manager!.postMessage({ + type: 'EVAL_RESPONSE', + response: result, + workerId: workerData.workerId, + nonce: data.nonce, + } satisfies WorkerSendEvalResponse); + } + break; + case 'EVAL_RESPONSE': + { + const evalResponse = this.promises.get(data.nonce); + if (!evalResponse) return; + this.promises.delete(data.nonce); + clearTimeout(evalResponse.timeout); + evalResponse.resolve(data.response); + } + break; } } + private generateNonce(large = true): string { + const uuid = randomUUID(); + const nonce = large ? uuid : uuid.split('-')[0]; + if (this.promises.has(nonce)) return this.generateNonce(large); + return nonce; + } + + private generateSendPromise(nonce: string, message = 'Timeout'): Promise { + let resolve = (_: T) => { + /**/ + }; + let timeout = -1 as unknown as NodeJS.Timeout; + + const promise = new Promise((res, rej) => { + resolve = res; + timeout = setTimeout(() => { + this.promises.delete(nonce); + rej(new Error(message)); + }, 60e3); + }); + + this.promises.set(nonce, { resolve, timeout }); + + return promise; + } + + tellWorker(workerId: number, func: (_: this) => {}) { + const nonce = this.generateNonce(); + manager!.postMessage({ + type: 'EVAL', + func: func.toString(), + toWorkerId: workerId, + workerId: workerData.workerId, + nonce, + } satisfies WorkerSendEval); + return this.generateSendPromise(nonce); + } + protected async onPacket(packet: GatewayDispatchPayload, shardId: number) { await this.events.execute('RAW', packet, this as WorkerClient, shardId); switch (packet.t) { diff --git a/src/websocket/discord/worker.ts b/src/websocket/discord/worker.ts index 841dd69..88c5546 100644 --- a/src/websocket/discord/worker.ts +++ b/src/websocket/discord/worker.ts @@ -55,6 +55,29 @@ export type WorkerSendApiRequest = CreateWorkerMessage< nonce: string; } >; +export type WorkerExecuteEval = CreateWorkerMessage< + 'EXECUTE_EVAL', + { + func: string; + nonce: string; + toWorkerId: number; + } +>; +export type WorkerSendEvalResponse = CreateWorkerMessage< + 'EVAL_RESPONSE', + { + response: any; + nonce: string; + } +>; +export type WorkerSendEval = CreateWorkerMessage< + 'EVAL', + { + func: string; + nonce: string; + toWorkerId: number; + } +>; export type WorkerMessage = | WorkerRequestConnect @@ -64,4 +87,7 @@ export type WorkerMessage = | WorkerSendShardInfo | WorkerSendInfo | WorkerReady - | WorkerSendApiRequest; + | WorkerSendApiRequest + | WorkerExecuteEval + | WorkerSendEvalResponse + | WorkerSendEval; diff --git a/src/websocket/discord/workermanager.ts b/src/websocket/discord/workermanager.ts index b06fd23..978ada5 100644 --- a/src/websocket/discord/workermanager.ts +++ b/src/websocket/discord/workermanager.ts @@ -202,37 +202,37 @@ export class WorkerManager extends Map { break; case 'RESULT_PAYLOAD': { - const cacheData = this.promises.get(message.nonce); - if (!cacheData) { + const resultPayload = this.promises.get(message.nonce); + if (!resultPayload) { return; } this.promises.delete(message.nonce); - clearTimeout(cacheData.timeout); - cacheData.resolve(true); + clearTimeout(resultPayload.timeout); + resultPayload.resolve(true); } break; case 'SHARD_INFO': { const { nonce, type, ...data } = message; - const cacheData = this.promises.get(nonce); - if (!cacheData) { + const shardInfo = this.promises.get(nonce); + if (!shardInfo) { return; } this.promises.delete(nonce); - clearTimeout(cacheData.timeout); - cacheData.resolve(data); + clearTimeout(shardInfo.timeout); + shardInfo.resolve(data); } break; case 'WORKER_INFO': { const { nonce, type, ...data } = message; - const cacheData = this.promises.get(nonce); - if (!cacheData) { + const workerInfo = this.promises.get(nonce); + if (!workerInfo) { return; } this.promises.delete(nonce); - clearTimeout(cacheData.timeout); - cacheData.resolve(data); + clearTimeout(workerInfo.timeout); + workerInfo.resolve(data); } break; case 'WORKER_READY': @@ -258,6 +258,36 @@ export class WorkerManager extends Map { } satisfies ManagerSendApiResponse); } break; + case 'EVAL_RESPONSE': + { + const { nonce, type, ...data } = message; + const evalResponse = this.promises.get(nonce); + if (!evalResponse) { + return; + } + this.promises.delete(nonce); + clearTimeout(evalResponse.timeout); + evalResponse.resolve(data.response); + } + break; + case 'EVAL': + { + const nonce = this.generateNonce(); + this.get(message.toWorkerId)!.postMessage({ + nonce, + func: message.func, + type: 'EXECUTE_EVAL', + toWorkerId: message.toWorkerId, + } satisfies ManagerExecuteEval); + this.generateSendPromise(nonce, 'Eval timeout').then(val => + this.get(message.workerId)!.postMessage({ + nonce: message.nonce, + response: val, + type: 'EVAL_RESPONSE', + } satisfies ManagerSendEvalResponse), + ); + } + break; } } @@ -279,7 +309,7 @@ export class WorkerManager extends Map { timeout = setTimeout(() => { this.promises.delete(nonce); rej(new Error(message)); - }, 3e3); + }, 60e3); }); this.promises.set(nonce, { resolve, timeout }); @@ -401,6 +431,21 @@ export type ManagerSendApiResponse = CreateManagerMessage< nonce: string; } >; +export type ManagerExecuteEval = CreateManagerMessage< + 'EXECUTE_EVAL', + { + func: string; + nonce: string; + toWorkerId: number; + } +>; +export type ManagerSendEvalResponse = CreateManagerMessage< + 'EVAL_RESPONSE', + { + response: any; + nonce: string; + } +>; export type ManagerMessages = | ManagerAllowConnect @@ -410,4 +455,6 @@ export type ManagerMessages = | ManagerRequestWorkerInfo | ManagerSendCacheResult | ManagerSendBotReady - | ManagerSendApiResponse; + | ManagerSendApiResponse + | ManagerSendEvalResponse + | ManagerExecuteEval;