From 53e99d52170e925768cd0486edad369f9aeaf096 Mon Sep 17 00:00:00 2001 From: MARCROCK22 Date: Sat, 16 Nov 2024 13:13:30 -0400 Subject: [PATCH] fix(workerProxy): files support --- src/api/api.ts | 70 +++++++++++++++++--------- src/client/workerclient.ts | 1 + src/common/it/utils.ts | 18 +++++++ src/websocket/discord/workermanager.ts | 8 +++ 4 files changed, 74 insertions(+), 23 deletions(-) diff --git a/src/api/api.ts b/src/api/api.ts index a3ab6e7..44d5170 100644 --- a/src/api/api.ts +++ b/src/api/api.ts @@ -1,5 +1,6 @@ import { type UUID, randomUUID } from 'node:crypto'; import { type Awaitable, BASE_HOST, Logger, delay, lazyLoadPackage, snowflakeToTimestamp } from '../common'; +import { toArrayBuffer, toBuffer } from '../common/it/utils'; import type { WorkerData } from '../websocket'; import type { WorkerSendApiRequest } from '../websocket/discord/worker'; import { CDNRouter, Router } from './Router'; @@ -16,13 +17,12 @@ import { } from './shared'; import { isBufferLike } from './utils/utils'; -let parentPort: import('node:worker_threads').MessagePort; -let workerData: WorkerData; - export interface ApiHandler { /* @internal */ _proxy_?: APIRoutes; debugger?: Logger; + /* @internal */ + workerData?: WorkerData; } export type OnRatelimitCallback = (response: Response, request: ApiRequestOptions) => Awaitable; @@ -48,12 +48,34 @@ export class ApiHandler { const worker_threads = lazyLoadPackage('node:worker_threads'); - if (options.workerProxy && !worker_threads?.parentPort) throw new Error('Cannot use workerProxy without a parent.'); + if (options.workerProxy && !worker_threads?.parentPort && !process.send) + throw new Error('Cannot use workerProxy without a parent.'); if (options.workerProxy) this.workerPromises = new Map(); - if (worker_threads) { - workerData = worker_threads.workerData; - if (worker_threads.parentPort) parentPort = worker_threads.parentPort; + if (worker_threads?.parentPort) { + this.sendMessage = async body => { + console; + worker_threads.parentPort!.postMessage( + body, + body.requestOptions.files + ?.filter(x => !['string', 'boolean', 'number'].includes(typeof x.data)) + .map(x => (x.data instanceof Buffer ? toArrayBuffer(x.data) : (x.data as ArrayBuffer | Uint8Array))), + ); + }; + } else if (process.send) { + this.sendMessage = body => { + const data = { + ...body, + requestOptions: { + ...body.requestOptions, + files: body.requestOptions.files?.map(file => { + if (file.data instanceof ArrayBuffer) file.data = toBuffer(file.data); + return file; + }), + }, + }; + process.send!(data); + }; } } @@ -83,6 +105,17 @@ export class ApiHandler { return uuid; } + private sendMessage(_body: WorkerSendApiRequest) { + throw new Error('Function not implemented'); + } + + protected postMessage(body: WorkerSendApiRequest) { + this.sendMessage(body); + return new Promise((res, rej) => { + this.workerPromises!.set(body.nonce, { reject: rej, resolve: res }); + }); + } + async request( method: HttpMethods, url: `/${string}`, @@ -90,22 +123,13 @@ export class ApiHandler { ): Promise { if (this.options.workerProxy) { const nonce = this.#randomUUID(); - parentPort!.postMessage( - { - method, - url, - type: 'WORKER_API_REQUEST', - workerId: workerData.workerId, - nonce, - requestOptions: { auth, ...request }, - } satisfies WorkerSendApiRequest, - request.files - ?.filter(x => !['string', 'boolean', 'number'].includes(typeof x.data)) - .map(x => x.data as Buffer | Uint8Array), - ); - - return new Promise((res, rej) => { - this.workerPromises!.set(nonce, { reject: rej, resolve: res }); + return this.postMessage({ + method, + url, + type: 'WORKER_API_REQUEST', + workerId: this.workerData!.workerId, + nonce, + requestOptions: { auth, ...request }, }); } const route = request.route || this.routefy(url, method); diff --git a/src/client/workerclient.ts b/src/client/workerclient.ts index 31154ae..0968500 100644 --- a/src/client/workerclient.ts +++ b/src/client/workerclient.ts @@ -137,6 +137,7 @@ export class WorkerClient extends BaseClient { }); } this.cache.intents = workerData.intents; + this.rest.workerData = workerData; this.postMessage({ type: workerData.resharding ? 'WORKER_START_RESHARDING' : 'WORKER_START', workerId: workerData.workerId, diff --git a/src/common/it/utils.ts b/src/common/it/utils.ts index d243b67..e4f35f1 100644 --- a/src/common/it/utils.ts +++ b/src/common/it/utils.ts @@ -403,3 +403,21 @@ export function hasIntent(intents: number, target: keyof typeof GatewayIntentBit const intent = typeof target === 'string' ? GatewayIntentBits[target] : target; return (intents & intent) === intent; } + +export function toArrayBuffer(buffer: Buffer) { + const arrayBuffer = new ArrayBuffer(buffer.length); + const view = new Uint8Array(arrayBuffer); + for (let i = 0; i < buffer.length; ++i) { + view[i] = buffer[i]; + } + return arrayBuffer; +} + +export function toBuffer(arrayBuffer: ArrayBuffer) { + const buffer = Buffer.alloc(arrayBuffer.byteLength); + const view = new Uint8Array(arrayBuffer); + for (let i = 0; i < buffer.length; ++i) { + buffer[i] = view[i]; + } + return buffer; +} diff --git a/src/websocket/discord/workermanager.ts b/src/websocket/discord/workermanager.ts index 92b9c6c..fd249cb 100644 --- a/src/websocket/discord/workermanager.ts +++ b/src/websocket/discord/workermanager.ts @@ -403,6 +403,14 @@ export class WorkerManager extends Map< break; case 'WORKER_API_REQUEST': { + if (this.options.mode === 'clusters' && message.requestOptions.files?.length) { + message.requestOptions.files.forEach(file => { + //@ts-expect-error + if (file.data.type === 'Buffer' && Array.isArray(file.data?.data)) + //@ts-expect-error + file.data = new Uint8Array(file.data.data); + }); + } const response = await this.rest.request(message.method, message.url, message.requestOptions); this.postMessage(message.workerId, { nonce: message.nonce,