tellWorker

This commit is contained in:
MARCROCK22 2024-03-13 17:55:47 -04:00
parent 08188e3592
commit d34d45604f
4 changed files with 172 additions and 23 deletions

View File

@ -68,14 +68,19 @@ export class ApiHandler {
): Promise<T> { ): Promise<T> {
if (this.options.workerProxy) { if (this.options.workerProxy) {
const nonce = this.#randomUUID(); const nonce = this.#randomUUID();
parentPort!.postMessage({ parentPort!.postMessage(
method, {
url, method,
type: 'WORKER_API_REQUEST', url,
workerId: (workerData as WorkerData).workerId, type: 'WORKER_API_REQUEST',
nonce, workerId: (workerData as WorkerData).workerId,
requestOptions: { auth, ...request }, nonce,
} satisfies WorkerSendApiRequest); requestOptions: { auth, ...request },
} satisfies WorkerSendApiRequest,
request.files
?.filter(x => !['string', 'boolean', 'number'].includes(typeof x.data))
.map(x => x.data as Buffer | Uint8Array),
);
let resolve = (_value: T) => {}; let resolve = (_value: T) => {};
let reject = () => {}; let reject = () => {};

View File

@ -1,3 +1,4 @@
import { randomUUID } from 'node:crypto';
import { workerData as __workerData__, parentPort as manager } from 'node:worker_threads'; import { workerData as __workerData__, parentPort as manager } from 'node:worker_threads';
import { ApiHandler } from '..'; import { ApiHandler } from '..';
import type { Cache } from '../cache'; import type { Cache } from '../cache';
@ -11,6 +12,8 @@ import type {
WorkerReady, WorkerReady,
WorkerReceivePayload, WorkerReceivePayload,
WorkerRequestConnect, WorkerRequestConnect,
WorkerSendEval,
WorkerSendEvalResponse,
WorkerSendInfo, WorkerSendInfo,
WorkerSendResultPayload, WorkerSendResultPayload,
WorkerSendShardInfo, WorkerSendShardInfo,
@ -33,6 +36,8 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
events = new EventHandler(this.logger); events = new EventHandler(this.logger);
me!: When<Ready, ClientUser>; me!: When<Ready, ClientUser>;
promises = new Map<string, { resolve: (value: any) => void; timeout: NodeJS.Timeout }>();
shards = new Map<number, Shard>(); shards = new Map<number, Shard>();
declare options: WorkerClientOptions | undefined; declare options: WorkerClientOptions | undefined;
@ -206,9 +211,75 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
promise.resolve(data.response); promise.resolve(data.response);
} }
break; break;
case 'EXECUTE_EVAL':
{
let result;
try {
// biome-ignore lint/security/noGlobalEval: yes
result = await eval(`
(${data.func})(this)
`);
} catch (e) {
result = e;
}
manager!.postMessage({
type: 'EVAL_RESPONSE',
response: result,
workerId: workerData.workerId,
nonce: data.nonce,
} satisfies WorkerSendEvalResponse);
}
break;
case 'EVAL_RESPONSE':
{
const evalResponse = this.promises.get(data.nonce);
if (!evalResponse) return;
this.promises.delete(data.nonce);
clearTimeout(evalResponse.timeout);
evalResponse.resolve(data.response);
}
break;
} }
} }
private generateNonce(large = true): string {
const uuid = randomUUID();
const nonce = large ? uuid : uuid.split('-')[0];
if (this.promises.has(nonce)) return this.generateNonce(large);
return nonce;
}
private generateSendPromise<T = unknown>(nonce: string, message = 'Timeout'): Promise<T> {
let resolve = (_: T) => {
/**/
};
let timeout = -1 as unknown as NodeJS.Timeout;
const promise = new Promise<T>((res, rej) => {
resolve = res;
timeout = setTimeout(() => {
this.promises.delete(nonce);
rej(new Error(message));
}, 60e3);
});
this.promises.set(nonce, { resolve, timeout });
return promise;
}
tellWorker(workerId: number, func: (_: this) => {}) {
const nonce = this.generateNonce();
manager!.postMessage({
type: 'EVAL',
func: func.toString(),
toWorkerId: workerId,
workerId: workerData.workerId,
nonce,
} satisfies WorkerSendEval);
return this.generateSendPromise(nonce);
}
protected async onPacket(packet: GatewayDispatchPayload, shardId: number) { protected async onPacket(packet: GatewayDispatchPayload, shardId: number) {
await this.events.execute('RAW', packet, this as WorkerClient<true>, shardId); await this.events.execute('RAW', packet, this as WorkerClient<true>, shardId);
switch (packet.t) { switch (packet.t) {

View File

@ -55,6 +55,29 @@ export type WorkerSendApiRequest = CreateWorkerMessage<
nonce: string; nonce: string;
} }
>; >;
export type WorkerExecuteEval = CreateWorkerMessage<
'EXECUTE_EVAL',
{
func: string;
nonce: string;
toWorkerId: number;
}
>;
export type WorkerSendEvalResponse = CreateWorkerMessage<
'EVAL_RESPONSE',
{
response: any;
nonce: string;
}
>;
export type WorkerSendEval = CreateWorkerMessage<
'EVAL',
{
func: string;
nonce: string;
toWorkerId: number;
}
>;
export type WorkerMessage = export type WorkerMessage =
| WorkerRequestConnect | WorkerRequestConnect
@ -64,4 +87,7 @@ export type WorkerMessage =
| WorkerSendShardInfo | WorkerSendShardInfo
| WorkerSendInfo | WorkerSendInfo
| WorkerReady | WorkerReady
| WorkerSendApiRequest; | WorkerSendApiRequest
| WorkerExecuteEval
| WorkerSendEvalResponse
| WorkerSendEval;

View File

@ -202,37 +202,37 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
break; break;
case 'RESULT_PAYLOAD': case 'RESULT_PAYLOAD':
{ {
const cacheData = this.promises.get(message.nonce); const resultPayload = this.promises.get(message.nonce);
if (!cacheData) { if (!resultPayload) {
return; return;
} }
this.promises.delete(message.nonce); this.promises.delete(message.nonce);
clearTimeout(cacheData.timeout); clearTimeout(resultPayload.timeout);
cacheData.resolve(true); resultPayload.resolve(true);
} }
break; break;
case 'SHARD_INFO': case 'SHARD_INFO':
{ {
const { nonce, type, ...data } = message; const { nonce, type, ...data } = message;
const cacheData = this.promises.get(nonce); const shardInfo = this.promises.get(nonce);
if (!cacheData) { if (!shardInfo) {
return; return;
} }
this.promises.delete(nonce); this.promises.delete(nonce);
clearTimeout(cacheData.timeout); clearTimeout(shardInfo.timeout);
cacheData.resolve(data); shardInfo.resolve(data);
} }
break; break;
case 'WORKER_INFO': case 'WORKER_INFO':
{ {
const { nonce, type, ...data } = message; const { nonce, type, ...data } = message;
const cacheData = this.promises.get(nonce); const workerInfo = this.promises.get(nonce);
if (!cacheData) { if (!workerInfo) {
return; return;
} }
this.promises.delete(nonce); this.promises.delete(nonce);
clearTimeout(cacheData.timeout); clearTimeout(workerInfo.timeout);
cacheData.resolve(data); workerInfo.resolve(data);
} }
break; break;
case 'WORKER_READY': case 'WORKER_READY':
@ -258,6 +258,36 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
} satisfies ManagerSendApiResponse); } satisfies ManagerSendApiResponse);
} }
break; break;
case 'EVAL_RESPONSE':
{
const { nonce, type, ...data } = message;
const evalResponse = this.promises.get(nonce);
if (!evalResponse) {
return;
}
this.promises.delete(nonce);
clearTimeout(evalResponse.timeout);
evalResponse.resolve(data.response);
}
break;
case 'EVAL':
{
const nonce = this.generateNonce();
this.get(message.toWorkerId)!.postMessage({
nonce,
func: message.func,
type: 'EXECUTE_EVAL',
toWorkerId: message.toWorkerId,
} satisfies ManagerExecuteEval);
this.generateSendPromise(nonce, 'Eval timeout').then(val =>
this.get(message.workerId)!.postMessage({
nonce: message.nonce,
response: val,
type: 'EVAL_RESPONSE',
} satisfies ManagerSendEvalResponse),
);
}
break;
} }
} }
@ -279,7 +309,7 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
timeout = setTimeout(() => { timeout = setTimeout(() => {
this.promises.delete(nonce); this.promises.delete(nonce);
rej(new Error(message)); rej(new Error(message));
}, 3e3); }, 60e3);
}); });
this.promises.set(nonce, { resolve, timeout }); this.promises.set(nonce, { resolve, timeout });
@ -401,6 +431,21 @@ export type ManagerSendApiResponse = CreateManagerMessage<
nonce: string; nonce: string;
} }
>; >;
export type ManagerExecuteEval = CreateManagerMessage<
'EXECUTE_EVAL',
{
func: string;
nonce: string;
toWorkerId: number;
}
>;
export type ManagerSendEvalResponse = CreateManagerMessage<
'EVAL_RESPONSE',
{
response: any;
nonce: string;
}
>;
export type ManagerMessages = export type ManagerMessages =
| ManagerAllowConnect | ManagerAllowConnect
@ -410,4 +455,6 @@ export type ManagerMessages =
| ManagerRequestWorkerInfo | ManagerRequestWorkerInfo
| ManagerSendCacheResult | ManagerSendCacheResult
| ManagerSendBotReady | ManagerSendBotReady
| ManagerSendApiResponse; | ManagerSendApiResponse
| ManagerSendEvalResponse
| ManagerExecuteEval;