fix(workerProxy): files support

This commit is contained in:
MARCROCK22 2024-11-16 13:13:30 -04:00
parent c92b1ab19f
commit 53e99d5217
4 changed files with 74 additions and 23 deletions

View File

@ -1,5 +1,6 @@
import { type UUID, randomUUID } from 'node:crypto'; import { type UUID, randomUUID } from 'node:crypto';
import { type Awaitable, BASE_HOST, Logger, delay, lazyLoadPackage, snowflakeToTimestamp } from '../common'; import { type Awaitable, BASE_HOST, Logger, delay, lazyLoadPackage, snowflakeToTimestamp } from '../common';
import { toArrayBuffer, toBuffer } from '../common/it/utils';
import type { WorkerData } from '../websocket'; import type { WorkerData } from '../websocket';
import type { WorkerSendApiRequest } from '../websocket/discord/worker'; import type { WorkerSendApiRequest } from '../websocket/discord/worker';
import { CDNRouter, Router } from './Router'; import { CDNRouter, Router } from './Router';
@ -16,13 +17,12 @@ import {
} from './shared'; } from './shared';
import { isBufferLike } from './utils/utils'; import { isBufferLike } from './utils/utils';
let parentPort: import('node:worker_threads').MessagePort;
let workerData: WorkerData;
export interface ApiHandler { export interface ApiHandler {
/* @internal */ /* @internal */
_proxy_?: APIRoutes; _proxy_?: APIRoutes;
debugger?: Logger; debugger?: Logger;
/* @internal */
workerData?: WorkerData;
} }
export type OnRatelimitCallback = (response: Response, request: ApiRequestOptions) => Awaitable<any>; export type OnRatelimitCallback = (response: Response, request: ApiRequestOptions) => Awaitable<any>;
@ -48,12 +48,34 @@ export class ApiHandler {
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads'); const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
if (options.workerProxy && !worker_threads?.parentPort) throw new Error('Cannot use workerProxy without a parent.'); if (options.workerProxy && !worker_threads?.parentPort && !process.send)
throw new Error('Cannot use workerProxy without a parent.');
if (options.workerProxy) this.workerPromises = new Map(); if (options.workerProxy) this.workerPromises = new Map();
if (worker_threads) { if (worker_threads?.parentPort) {
workerData = worker_threads.workerData; this.sendMessage = async body => {
if (worker_threads.parentPort) parentPort = worker_threads.parentPort; console;
worker_threads.parentPort!.postMessage(
body,
body.requestOptions.files
?.filter(x => !['string', 'boolean', 'number'].includes(typeof x.data))
.map(x => (x.data instanceof Buffer ? toArrayBuffer(x.data) : (x.data as ArrayBuffer | Uint8Array))),
);
};
} else if (process.send) {
this.sendMessage = body => {
const data = {
...body,
requestOptions: {
...body.requestOptions,
files: body.requestOptions.files?.map(file => {
if (file.data instanceof ArrayBuffer) file.data = toBuffer(file.data);
return file;
}),
},
};
process.send!(data);
};
} }
} }
@ -83,6 +105,17 @@ export class ApiHandler {
return uuid; return uuid;
} }
private sendMessage(_body: WorkerSendApiRequest) {
throw new Error('Function not implemented');
}
protected postMessage<T = unknown>(body: WorkerSendApiRequest) {
this.sendMessage(body);
return new Promise<T>((res, rej) => {
this.workerPromises!.set(body.nonce, { reject: rej, resolve: res });
});
}
async request<T = unknown>( async request<T = unknown>(
method: HttpMethods, method: HttpMethods,
url: `/${string}`, url: `/${string}`,
@ -90,22 +123,13 @@ export class ApiHandler {
): Promise<T> { ): Promise<T> {
if (this.options.workerProxy) { if (this.options.workerProxy) {
const nonce = this.#randomUUID(); const nonce = this.#randomUUID();
parentPort!.postMessage( return this.postMessage<T>({
{ method,
method, url,
url, type: 'WORKER_API_REQUEST',
type: 'WORKER_API_REQUEST', workerId: this.workerData!.workerId,
workerId: workerData.workerId, nonce,
nonce, requestOptions: { auth, ...request },
requestOptions: { auth, ...request },
} satisfies WorkerSendApiRequest,
request.files
?.filter(x => !['string', 'boolean', 'number'].includes(typeof x.data))
.map(x => x.data as Buffer | Uint8Array),
);
return new Promise<T>((res, rej) => {
this.workerPromises!.set(nonce, { reject: rej, resolve: res });
}); });
} }
const route = request.route || this.routefy(url, method); const route = request.route || this.routefy(url, method);

View File

@ -137,6 +137,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
}); });
} }
this.cache.intents = workerData.intents; this.cache.intents = workerData.intents;
this.rest.workerData = workerData;
this.postMessage({ this.postMessage({
type: workerData.resharding ? 'WORKER_START_RESHARDING' : 'WORKER_START', type: workerData.resharding ? 'WORKER_START_RESHARDING' : 'WORKER_START',
workerId: workerData.workerId, workerId: workerData.workerId,

View File

@ -403,3 +403,21 @@ export function hasIntent(intents: number, target: keyof typeof GatewayIntentBit
const intent = typeof target === 'string' ? GatewayIntentBits[target] : target; const intent = typeof target === 'string' ? GatewayIntentBits[target] : target;
return (intents & intent) === intent; return (intents & intent) === intent;
} }
export function toArrayBuffer(buffer: Buffer) {
const arrayBuffer = new ArrayBuffer(buffer.length);
const view = new Uint8Array(arrayBuffer);
for (let i = 0; i < buffer.length; ++i) {
view[i] = buffer[i];
}
return arrayBuffer;
}
export function toBuffer(arrayBuffer: ArrayBuffer) {
const buffer = Buffer.alloc(arrayBuffer.byteLength);
const view = new Uint8Array(arrayBuffer);
for (let i = 0; i < buffer.length; ++i) {
buffer[i] = view[i];
}
return buffer;
}

View File

@ -403,6 +403,14 @@ export class WorkerManager extends Map<
break; break;
case 'WORKER_API_REQUEST': case 'WORKER_API_REQUEST':
{ {
if (this.options.mode === 'clusters' && message.requestOptions.files?.length) {
message.requestOptions.files.forEach(file => {
//@ts-expect-error
if (file.data.type === 'Buffer' && Array.isArray(file.data?.data))
//@ts-expect-error
file.data = new Uint8Array(file.data.data);
});
}
const response = await this.rest.request(message.method, message.url, message.requestOptions); const response = await this.rest.request(message.method, message.url, message.requestOptions);
this.postMessage(message.workerId, { this.postMessage(message.workerId, {
nonce: message.nonce, nonce: message.nonce,