diff --git a/src/api/api.ts b/src/api/api.ts index 63d7fc6..c5a9a03 100644 --- a/src/api/api.ts +++ b/src/api/api.ts @@ -1,7 +1,11 @@ import { filetypeinfo } from 'magic-bytes.js'; +import { randomUUID } from 'node:crypto'; import { setTimeout as delay } from 'node:timers/promises'; +import { parentPort, workerData } from 'node:worker_threads'; import { Logger } from '../common'; import { snowflakeToTimestamp } from '../structures/extra/functions'; +import type { WorkerData } from '../websocket'; +import type { WorkerSendApiRequest } from '../websocket/discord/worker'; import { CDN } from './CDN'; import type { ProxyRequestMethod } from './Router'; import { Bucket } from './bucket'; @@ -24,6 +28,7 @@ export class ApiHandler { readyQueue: (() => void)[] = []; cdn = new CDN(); debugger?: Logger; + workerPromises?: Map any; reject: (error: any) => any }>; constructor(options: ApiHandlerOptions) { this.options = { @@ -37,6 +42,9 @@ export class ApiHandler { name: '[API]', }); } + + if (options.workerProxy && !parentPort) throw new Error('Cannot use workerProxy without a parent.'); + if (options.workerProxy) this.workerPromises = new Map(); } globalUnblock() { @@ -47,11 +55,39 @@ export class ApiHandler { } } + #randomUUID(): string { + const uuid = randomUUID(); + if (this.workerPromises!.has(uuid)) return this.#randomUUID(); + return uuid; + } + async request( method: HttpMethods, url: `/${string}`, { auth = true, ...request }: ApiRequestOptions = {}, ): Promise { + if (this.options.workerProxy) { + const nonce = this.#randomUUID(); + parentPort!.postMessage({ + method, + url, + type: 'WORKER_API_REQUEST', + workerId: (workerData as WorkerData).workerId, + nonce, + requestOptions: { auth, ...request }, + } satisfies WorkerSendApiRequest); + let resolve = (_value: T) => {}; + let reject = () => {}; + + const promise = new Promise((res, rej) => { + resolve = res; + reject = rej; + }); + + this.workerPromises!.set(nonce, { reject, resolve }); + + return promise; + } const route = request.route || this.routefy(url, method); let attempts = 0; diff --git a/src/api/shared.ts b/src/api/shared.ts index e56309a..7712f66 100644 --- a/src/api/shared.ts +++ b/src/api/shared.ts @@ -12,6 +12,7 @@ export interface ApiHandlerOptions { debug?: boolean; agent?: string; smartBucket?: boolean; + workerProxy?: boolean; } export interface ApiHandlerInternalOptions extends MakeRequired { diff --git a/src/client/client.ts b/src/client/client.ts index 316fb35..730a8d9 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -149,11 +149,7 @@ export class Client extends BaseClient { !this.__handleGuilds?.size || !((this.gateway.options.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds) ) { - if ( - [...this.gateway.values()].every(shard => shard.data.session_id) && - this.events.values.BOT_READY && - (this.events.values.BOT_READY.fired ? !this.events.values.BOT_READY.data.once : true) - ) { + if ([...this.gateway.values()].every(shard => shard.data.session_id)) { await this.events.runEvent('BOT_READY', this, this.me, -1); } delete this.__handleGuilds; @@ -163,12 +159,7 @@ export class Client extends BaseClient { case 'GUILD_CREATE': { if (this.__handleGuilds?.has(packet.d.id)) { this.__handleGuilds.delete(packet.d.id); - if ( - !this.__handleGuilds.size && - [...this.gateway.values()].every(shard => shard.data.session_id) && - this.events.values.BOT_READY && - (this.events.values.BOT_READY.fired ? !this.events.values.BOT_READY.data.once : true) - ) { + if (!this.__handleGuilds.size && [...this.gateway.values()].every(shard => shard.data.session_id)) { await this.events.runEvent('BOT_READY', this, this.me, -1); } if (!this.__handleGuilds.size) delete this.__handleGuilds; diff --git a/src/client/oninteractioncreate.ts b/src/client/oninteractioncreate.ts index e5f6078..2d5fe99 100644 --- a/src/client/oninteractioncreate.ts +++ b/src/client/oninteractioncreate.ts @@ -209,8 +209,8 @@ export async function onInteractionCreate( case InteractionType.MessageComponent: { const interaction = BaseInteraction.from(self, body, __reply) as ComponentInteraction; - if (self.components.hasComponent([body.message.id, body.id], interaction.customId)) { - await self.components.onComponent([body.message.id, body.id], interaction); + if (self.components.hasComponent(body.message.id, interaction.customId)) { + await self.components.onComponent(body.message.id, interaction); } else { await self.components.executeComponent(interaction); } diff --git a/src/client/workerclient.ts b/src/client/workerclient.ts index d959e0f..697b2ef 100644 --- a/src/client/workerclient.ts +++ b/src/client/workerclient.ts @@ -1,4 +1,5 @@ import { workerData as __workerData__, parentPort as manager } from 'node:worker_threads'; +import { ApiHandler } from '..'; import type { Cache } from '../cache'; import { WorkerAdapter } from '../cache'; import type { GatewayDispatchPayload, GatewaySendPayload, When } from '../common'; @@ -53,6 +54,15 @@ export class WorkerClient extends BaseClient { logLevel: LogLevels.Debug, }); } + if (workerData.workerProxy) { + this.setServices({ + rest: new ApiHandler({ + token: workerData.token, + workerProxy: true, + debug: workerData.debug, + }), + }); + } } get workerId() { @@ -106,6 +116,7 @@ export class WorkerClient extends BaseClient { manager!.postMessage({ type: 'RESULT_PAYLOAD', nonce: data.nonce, + workerId: this.workerId, } satisfies WorkerSendResultPayload); } break; @@ -169,6 +180,7 @@ export class WorkerClient extends BaseClient { ...generateShardInfo(shard), nonce: data.nonce, type: 'SHARD_INFO', + workerId: this.workerId, } satisfies WorkerSendShardInfo); } break; @@ -183,11 +195,15 @@ export class WorkerClient extends BaseClient { } break; case 'BOT_READY': - if ( - this.events.values.BOT_READY && - (this.events.values.BOT_READY.fired ? !this.events.values.BOT_READY.data.once : true) - ) { - await this.events.runEvent('BOT_READY', this, this.me, -1); + await this.events.runEvent('BOT_READY', this, this.me, -1); + break; + case 'API_RESPONSE': + { + const promise = this.rest.workerPromises!.get(data.nonce); + if (!promise) return; + this.rest.workerPromises!.delete(data.nonce); + if (data.error) return promise.reject(data.error); + promise.resolve(data.response); } break; } @@ -219,11 +235,7 @@ export class WorkerClient extends BaseClient { !this.__handleGuilds?.size || !((workerData.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds) ) { - if ( - [...this.shards.values()].every(shard => shard.data.session_id) && - this.events.values.WORKER_READY && - (this.events.values.WORKER_READY.fired ? !this.events.values.WORKER_READY.data.once : true) - ) { + if ([...this.shards.values()].every(shard => shard.data.session_id)) { manager!.postMessage({ type: 'WORKER_READY', workerId: this.workerId, @@ -243,12 +255,7 @@ export class WorkerClient extends BaseClient { case 'GUILD_CREATE': { if (this.__handleGuilds?.has(packet.d.id)) { this.__handleGuilds.delete(packet.d.id); - if ( - !this.__handleGuilds.size && - [...this.shards.values()].every(shard => shard.data.session_id) && - this.events.values.WORKER_READY && - (this.events.values.WORKER_READY.fired ? !this.events.values.WORKER_READY.data.once : true) - ) { + if (!this.__handleGuilds.size && [...this.shards.values()].every(shard => shard.data.session_id)) { manager!.postMessage({ type: 'WORKER_READY', workerId: this.workerId, diff --git a/src/components/handler.ts b/src/components/handler.ts index 2b93169..4a5aaf5 100644 --- a/src/components/handler.ts +++ b/src/components/handler.ts @@ -71,31 +71,28 @@ export class ComponentHandler extends BaseHandler { }; } - async onComponent(ids: [string, string], interaction: ComponentInteraction) { - for (const id of ids) { - const row = this.values.get(id); - const component = row?.components?.[interaction.customId]; - if (!component) continue; - if (row.options?.filter) { - if (!(await row.options.filter(interaction))) return; - } - row.idle?.refresh(); - await component( - interaction, - reason => { - row.options?.onStop?.(reason ?? 'stop'); - this.deleteValue(id); - }, - () => { - this.resetTimeouts(id); - }, - ); - break; + async onComponent(id: string, interaction: ComponentInteraction) { + const row = this.values.get(id); + const component = row?.components?.[interaction.customId]; + if (!component) return; + if (row.options?.filter) { + if (!(await row.options.filter(interaction))) return; } + row.idle?.refresh(); + await component( + interaction, + reason => { + row.options?.onStop?.(reason ?? 'stop'); + this.deleteValue(id); + }, + () => { + this.resetTimeouts(id); + }, + ); } - hasComponent(ids: [string, string], customId: string) { - return ids.some(id => this.values.get(id)?.components?.[customId]); + hasComponent(id: string, customId: string) { + return this.values.get(id)?.components?.[customId]; } resetTimeouts(id: string) { diff --git a/src/websocket/discord/shared.ts b/src/websocket/discord/shared.ts index 498ebf0..9da52e1 100644 --- a/src/websocket/discord/shared.ts +++ b/src/websocket/discord/shared.ts @@ -46,6 +46,8 @@ export interface WorkerManagerOptions extends Omit = { type: T } & D; +type CreateWorkerMessage = { + type: T; + workerId: number; +} & D; -export type WorkerRequestConnect = CreateWorkerMessage<'CONNECT_QUEUE', { shardId: number; workerId: number }>; +export type WorkerRequestConnect = CreateWorkerMessage<'CONNECT_QUEUE', { shardId: number }>; export type WorkerReceivePayload = CreateWorkerMessage< 'RECEIVE_PAYLOAD', - { shardId: number; workerId: number; payload: GatewayDispatchPayload } + { shardId: number; payload: GatewayDispatchPayload } >; export type WorkerSendResultPayload = CreateWorkerMessage<'RESULT_PAYLOAD', { nonce: string }>; export type WorkerSendCacheRequest = CreateWorkerMessage< @@ -37,15 +41,18 @@ export type WorkerSendCacheRequest = CreateWorkerMessage< | 'removeRelationship' | 'removeToRelationship'; args: any[]; - workerId: number; } >; export type WorkerSendShardInfo = CreateWorkerMessage<'SHARD_INFO', WorkerShardInfo & { nonce: string }>; export type WorkerSendInfo = CreateWorkerMessage<'WORKER_INFO', WorkerInfo & { nonce: string }>; -export type WorkerReady = CreateWorkerMessage< - 'WORKER_READY', +export type WorkerReady = CreateWorkerMessage<'WORKER_READY'>; +export type WorkerSendApiRequest = CreateWorkerMessage< + 'WORKER_API_REQUEST', { - workerId: number; + method: HttpMethods; + url: `/${string}`; + requestOptions: ApiRequestOptions; + nonce: string; } >; @@ -56,4 +63,5 @@ export type WorkerMessage = | WorkerSendCacheRequest | WorkerSendShardInfo | WorkerSendInfo - | WorkerReady; + | WorkerReady + | WorkerSendApiRequest; diff --git a/src/websocket/discord/workermanager.ts b/src/websocket/discord/workermanager.ts index 5926dda..b06fd23 100644 --- a/src/websocket/discord/workermanager.ts +++ b/src/websocket/discord/workermanager.ts @@ -17,6 +17,7 @@ import { MemberUpdateHandler } from './events/memberUpdate'; import { PresenceUpdateHandler } from './events/presenceUpdate'; import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared'; import type { WorkerInfo, WorkerMessage, WorkerShardInfo } from './worker'; + export class WorkerManager extends Map { options!: Required; debugger?: Logger; @@ -28,7 +29,7 @@ export class WorkerManager extends Map { rest!: ApiHandler; constructor(options: MakePartial) { super(); - this.options = MergeOptions>(WorkerManagerDefaults, options); + this.options = MergeOptions(WorkerManagerDefaults, options); this.cacheAdapter = new MemoryAdapter(); } @@ -126,6 +127,7 @@ export class WorkerManager extends Map { shards: shards[i], intents: this.options.intents, workerId: i, + workerProxy: this.options.workerProxy, }); this.set(i, worker); } @@ -246,6 +248,16 @@ export class WorkerManager extends Map { } } break; + case 'WORKER_API_REQUEST': + { + const response = await this.rest.request(message.method, message.url, message.requestOptions); + this.get(message.workerId)!.postMessage({ + nonce: message.nonce, + response, + type: 'API_RESPONSE', + } satisfies ManagerSendApiResponse); + } + break; } } @@ -334,7 +346,8 @@ export class WorkerManager extends Map { token: this.options.token, baseUrl: 'api/v10', domain: 'https://discord.com', - }); //TODO: share ratelimits with all workers + debug: this.options.debug, + }); this.options.info ??= await new Router(this.rest).createProxy().gateway.bot.get(); this.options.totalShards ??= this.options.info.shards; this.options = MergeOptions>(WorkerManagerDefaults, this.options); @@ -380,6 +393,14 @@ export type ManagerRequestShardInfo = CreateManagerMessage<'SHARD_INFO', { nonce export type ManagerRequestWorkerInfo = CreateManagerMessage<'WORKER_INFO', { nonce: string }>; export type ManagerSendCacheResult = CreateManagerMessage<'CACHE_RESULT', { nonce: string; result: any }>; export type ManagerSendBotReady = CreateManagerMessage<'BOT_READY'>; +export type ManagerSendApiResponse = CreateManagerMessage< + 'API_RESPONSE', + { + response: any; + error?: any; + nonce: string; + } +>; export type ManagerMessages = | ManagerAllowConnect @@ -388,4 +409,5 @@ export type ManagerMessages = | ManagerRequestShardInfo | ManagerRequestWorkerInfo | ManagerSendCacheResult - | ManagerSendBotReady; + | ManagerSendBotReady + | ManagerSendApiResponse;