workerProxy implemented

This commit is contained in:
MARCROCK22 2024-03-13 16:30:03 -04:00
parent b059fb0d5d
commit 08188e3592
9 changed files with 128 additions and 63 deletions

View File

@ -1,7 +1,11 @@
import { filetypeinfo } from 'magic-bytes.js';
import { randomUUID } from 'node:crypto';
import { setTimeout as delay } from 'node:timers/promises';
import { parentPort, workerData } from 'node:worker_threads';
import { Logger } from '../common';
import { snowflakeToTimestamp } from '../structures/extra/functions';
import type { WorkerData } from '../websocket';
import type { WorkerSendApiRequest } from '../websocket/discord/worker';
import { CDN } from './CDN';
import type { ProxyRequestMethod } from './Router';
import { Bucket } from './bucket';
@ -24,6 +28,7 @@ export class ApiHandler {
readyQueue: (() => void)[] = [];
cdn = new CDN();
debugger?: Logger;
workerPromises?: Map<string, { resolve: (value: any) => any; reject: (error: any) => any }>;
constructor(options: ApiHandlerOptions) {
this.options = {
@ -37,6 +42,9 @@ export class ApiHandler {
name: '[API]',
});
}
if (options.workerProxy && !parentPort) throw new Error('Cannot use workerProxy without a parent.');
if (options.workerProxy) this.workerPromises = new Map();
}
globalUnblock() {
@ -47,11 +55,39 @@ export class ApiHandler {
}
}
#randomUUID(): string {
const uuid = randomUUID();
if (this.workerPromises!.has(uuid)) return this.#randomUUID();
return uuid;
}
async request<T = any>(
method: HttpMethods,
url: `/${string}`,
{ auth = true, ...request }: ApiRequestOptions = {},
): Promise<T> {
if (this.options.workerProxy) {
const nonce = this.#randomUUID();
parentPort!.postMessage({
method,
url,
type: 'WORKER_API_REQUEST',
workerId: (workerData as WorkerData).workerId,
nonce,
requestOptions: { auth, ...request },
} satisfies WorkerSendApiRequest);
let resolve = (_value: T) => {};
let reject = () => {};
const promise = new Promise<T>((res, rej) => {
resolve = res;
reject = rej;
});
this.workerPromises!.set(nonce, { reject, resolve });
return promise;
}
const route = request.route || this.routefy(url, method);
let attempts = 0;

View File

@ -12,6 +12,7 @@ export interface ApiHandlerOptions {
debug?: boolean;
agent?: string;
smartBucket?: boolean;
workerProxy?: boolean;
}
export interface ApiHandlerInternalOptions extends MakeRequired<ApiHandlerOptions, 'baseUrl' | 'domain'> {

View File

@ -149,11 +149,7 @@ export class Client<Ready extends boolean = boolean> extends BaseClient {
!this.__handleGuilds?.size ||
!((this.gateway.options.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds)
) {
if (
[...this.gateway.values()].every(shard => shard.data.session_id) &&
this.events.values.BOT_READY &&
(this.events.values.BOT_READY.fired ? !this.events.values.BOT_READY.data.once : true)
) {
if ([...this.gateway.values()].every(shard => shard.data.session_id)) {
await this.events.runEvent('BOT_READY', this, this.me, -1);
}
delete this.__handleGuilds;
@ -163,12 +159,7 @@ export class Client<Ready extends boolean = boolean> extends BaseClient {
case 'GUILD_CREATE': {
if (this.__handleGuilds?.has(packet.d.id)) {
this.__handleGuilds.delete(packet.d.id);
if (
!this.__handleGuilds.size &&
[...this.gateway.values()].every(shard => shard.data.session_id) &&
this.events.values.BOT_READY &&
(this.events.values.BOT_READY.fired ? !this.events.values.BOT_READY.data.once : true)
) {
if (!this.__handleGuilds.size && [...this.gateway.values()].every(shard => shard.data.session_id)) {
await this.events.runEvent('BOT_READY', this, this.me, -1);
}
if (!this.__handleGuilds.size) delete this.__handleGuilds;

View File

@ -209,8 +209,8 @@ export async function onInteractionCreate(
case InteractionType.MessageComponent:
{
const interaction = BaseInteraction.from(self, body, __reply) as ComponentInteraction;
if (self.components.hasComponent([body.message.id, body.id], interaction.customId)) {
await self.components.onComponent([body.message.id, body.id], interaction);
if (self.components.hasComponent(body.message.id, interaction.customId)) {
await self.components.onComponent(body.message.id, interaction);
} else {
await self.components.executeComponent(interaction);
}

View File

@ -1,4 +1,5 @@
import { workerData as __workerData__, parentPort as manager } from 'node:worker_threads';
import { ApiHandler } from '..';
import type { Cache } from '../cache';
import { WorkerAdapter } from '../cache';
import type { GatewayDispatchPayload, GatewaySendPayload, When } from '../common';
@ -53,6 +54,15 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
logLevel: LogLevels.Debug,
});
}
if (workerData.workerProxy) {
this.setServices({
rest: new ApiHandler({
token: workerData.token,
workerProxy: true,
debug: workerData.debug,
}),
});
}
}
get workerId() {
@ -106,6 +116,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
manager!.postMessage({
type: 'RESULT_PAYLOAD',
nonce: data.nonce,
workerId: this.workerId,
} satisfies WorkerSendResultPayload);
}
break;
@ -169,6 +180,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
...generateShardInfo(shard),
nonce: data.nonce,
type: 'SHARD_INFO',
workerId: this.workerId,
} satisfies WorkerSendShardInfo);
}
break;
@ -183,11 +195,15 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
}
break;
case 'BOT_READY':
if (
this.events.values.BOT_READY &&
(this.events.values.BOT_READY.fired ? !this.events.values.BOT_READY.data.once : true)
) {
await this.events.runEvent('BOT_READY', this, this.me, -1);
await this.events.runEvent('BOT_READY', this, this.me, -1);
break;
case 'API_RESPONSE':
{
const promise = this.rest.workerPromises!.get(data.nonce);
if (!promise) return;
this.rest.workerPromises!.delete(data.nonce);
if (data.error) return promise.reject(data.error);
promise.resolve(data.response);
}
break;
}
@ -219,11 +235,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
!this.__handleGuilds?.size ||
!((workerData.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds)
) {
if (
[...this.shards.values()].every(shard => shard.data.session_id) &&
this.events.values.WORKER_READY &&
(this.events.values.WORKER_READY.fired ? !this.events.values.WORKER_READY.data.once : true)
) {
if ([...this.shards.values()].every(shard => shard.data.session_id)) {
manager!.postMessage({
type: 'WORKER_READY',
workerId: this.workerId,
@ -243,12 +255,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
case 'GUILD_CREATE': {
if (this.__handleGuilds?.has(packet.d.id)) {
this.__handleGuilds.delete(packet.d.id);
if (
!this.__handleGuilds.size &&
[...this.shards.values()].every(shard => shard.data.session_id) &&
this.events.values.WORKER_READY &&
(this.events.values.WORKER_READY.fired ? !this.events.values.WORKER_READY.data.once : true)
) {
if (!this.__handleGuilds.size && [...this.shards.values()].every(shard => shard.data.session_id)) {
manager!.postMessage({
type: 'WORKER_READY',
workerId: this.workerId,

View File

@ -71,31 +71,28 @@ export class ComponentHandler extends BaseHandler {
};
}
async onComponent(ids: [string, string], interaction: ComponentInteraction) {
for (const id of ids) {
const row = this.values.get(id);
const component = row?.components?.[interaction.customId];
if (!component) continue;
if (row.options?.filter) {
if (!(await row.options.filter(interaction))) return;
}
row.idle?.refresh();
await component(
interaction,
reason => {
row.options?.onStop?.(reason ?? 'stop');
this.deleteValue(id);
},
() => {
this.resetTimeouts(id);
},
);
break;
async onComponent(id: string, interaction: ComponentInteraction) {
const row = this.values.get(id);
const component = row?.components?.[interaction.customId];
if (!component) return;
if (row.options?.filter) {
if (!(await row.options.filter(interaction))) return;
}
row.idle?.refresh();
await component(
interaction,
reason => {
row.options?.onStop?.(reason ?? 'stop');
this.deleteValue(id);
},
() => {
this.resetTimeouts(id);
},
);
}
hasComponent(ids: [string, string], customId: string) {
return ids.some(id => this.values.get(id)?.components?.[customId]);
hasComponent(id: string, customId: string) {
return this.values.get(id)?.components?.[customId];
}
resetTimeouts(id: string) {

View File

@ -46,6 +46,8 @@ export interface WorkerManagerOptions extends Omit<ShardManagerOptions, 'handleP
*/
shardsPerWorker?: number;
workerProxy?: boolean;
path: string;
handlePayload(shardId: number, workerId: number, packet: GatewayDispatchPayload): unknown;
@ -115,4 +117,5 @@ export interface WorkerData {
shards: number[];
workerId: number;
debug: boolean;
workerProxy: boolean;
}

View File

@ -1,3 +1,4 @@
import type { ApiRequestOptions, HttpMethods } from '../..';
import type { GatewayDispatchPayload } from '../../common';
export interface WorkerShardInfo {
@ -7,14 +8,17 @@ export interface WorkerShardInfo {
resumable: boolean;
}
export type WorkerInfo = { shards: WorkerShardInfo[]; workerId: number };
export type WorkerInfo = { shards: WorkerShardInfo[] };
type CreateWorkerMessage<T extends string, D extends object = {}> = { type: T } & D;
type CreateWorkerMessage<T extends string, D extends object = {}> = {
type: T;
workerId: number;
} & D;
export type WorkerRequestConnect = CreateWorkerMessage<'CONNECT_QUEUE', { shardId: number; workerId: number }>;
export type WorkerRequestConnect = CreateWorkerMessage<'CONNECT_QUEUE', { shardId: number }>;
export type WorkerReceivePayload = CreateWorkerMessage<
'RECEIVE_PAYLOAD',
{ shardId: number; workerId: number; payload: GatewayDispatchPayload }
{ shardId: number; payload: GatewayDispatchPayload }
>;
export type WorkerSendResultPayload = CreateWorkerMessage<'RESULT_PAYLOAD', { nonce: string }>;
export type WorkerSendCacheRequest = CreateWorkerMessage<
@ -37,15 +41,18 @@ export type WorkerSendCacheRequest = CreateWorkerMessage<
| 'removeRelationship'
| 'removeToRelationship';
args: any[];
workerId: number;
}
>;
export type WorkerSendShardInfo = CreateWorkerMessage<'SHARD_INFO', WorkerShardInfo & { nonce: string }>;
export type WorkerSendInfo = CreateWorkerMessage<'WORKER_INFO', WorkerInfo & { nonce: string }>;
export type WorkerReady = CreateWorkerMessage<
'WORKER_READY',
export type WorkerReady = CreateWorkerMessage<'WORKER_READY'>;
export type WorkerSendApiRequest = CreateWorkerMessage<
'WORKER_API_REQUEST',
{
workerId: number;
method: HttpMethods;
url: `/${string}`;
requestOptions: ApiRequestOptions;
nonce: string;
}
>;
@ -56,4 +63,5 @@ export type WorkerMessage =
| WorkerSendCacheRequest
| WorkerSendShardInfo
| WorkerSendInfo
| WorkerReady;
| WorkerReady
| WorkerSendApiRequest;

View File

@ -17,6 +17,7 @@ import { MemberUpdateHandler } from './events/memberUpdate';
import { PresenceUpdateHandler } from './events/presenceUpdate';
import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared';
import type { WorkerInfo, WorkerMessage, WorkerShardInfo } from './worker';
export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
options!: Required<WorkerManagerOptions>;
debugger?: Logger;
@ -28,7 +29,7 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
rest!: ApiHandler;
constructor(options: MakePartial<WorkerManagerOptions, 'token' | 'intents' | 'info' | 'handlePayload'>) {
super();
this.options = MergeOptions<Required<WorkerManagerOptions>>(WorkerManagerDefaults, options);
this.options = MergeOptions<WorkerManager['options']>(WorkerManagerDefaults, options);
this.cacheAdapter = new MemoryAdapter();
}
@ -126,6 +127,7 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
shards: shards[i],
intents: this.options.intents,
workerId: i,
workerProxy: this.options.workerProxy,
});
this.set(i, worker);
}
@ -246,6 +248,16 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
}
}
break;
case 'WORKER_API_REQUEST':
{
const response = await this.rest.request(message.method, message.url, message.requestOptions);
this.get(message.workerId)!.postMessage({
nonce: message.nonce,
response,
type: 'API_RESPONSE',
} satisfies ManagerSendApiResponse);
}
break;
}
}
@ -334,7 +346,8 @@ export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
token: this.options.token,
baseUrl: 'api/v10',
domain: 'https://discord.com',
}); //TODO: share ratelimits with all workers
debug: this.options.debug,
});
this.options.info ??= await new Router(this.rest).createProxy().gateway.bot.get();
this.options.totalShards ??= this.options.info.shards;
this.options = MergeOptions<Required<WorkerManagerOptions>>(WorkerManagerDefaults, this.options);
@ -380,6 +393,14 @@ export type ManagerRequestShardInfo = CreateManagerMessage<'SHARD_INFO', { nonce
export type ManagerRequestWorkerInfo = CreateManagerMessage<'WORKER_INFO', { nonce: string }>;
export type ManagerSendCacheResult = CreateManagerMessage<'CACHE_RESULT', { nonce: string; result: any }>;
export type ManagerSendBotReady = CreateManagerMessage<'BOT_READY'>;
export type ManagerSendApiResponse = CreateManagerMessage<
'API_RESPONSE',
{
response: any;
error?: any;
nonce: string;
}
>;
export type ManagerMessages =
| ManagerAllowConnect
@ -388,4 +409,5 @@ export type ManagerMessages =
| ManagerRequestShardInfo
| ManagerRequestWorkerInfo
| ManagerSendCacheResult
| ManagerSendBotReady;
| ManagerSendBotReady
| ManagerSendApiResponse;