From e4233e6a4019734abc4824964acff1a49a69691d Mon Sep 17 00:00:00 2001 From: MARCROCK22 Date: Sun, 10 Nov 2024 21:51:28 +0000 Subject: [PATCH] fix: types & workerClient#resumeShard --- src/api/api.ts | 18 +++-- src/client/base.ts | 33 +++------ src/client/client.ts | 4 +- src/client/workerclient.ts | 93 ++++++++++++++++---------- src/commands/handler.ts | 22 +++--- src/components/modalcontext.ts | 3 +- src/events/handler.ts | 8 +-- src/websocket/discord/shard.ts | 4 +- src/websocket/discord/shared.ts | 2 + src/websocket/discord/worker.ts | 1 + src/websocket/discord/workermanager.ts | 28 ++++++-- 11 files changed, 124 insertions(+), 92 deletions(-) diff --git a/src/api/api.ts b/src/api/api.ts index de88f4a..a3ab6e7 100644 --- a/src/api/api.ts +++ b/src/api/api.ts @@ -1,5 +1,5 @@ import { type UUID, randomUUID } from 'node:crypto'; -import { type Awaitable, Logger, delay, lazyLoadPackage, snowflakeToTimestamp } from '../common'; +import { type Awaitable, BASE_HOST, Logger, delay, lazyLoadPackage, snowflakeToTimestamp } from '../common'; import type { WorkerData } from '../websocket'; import type { WorkerSendApiRequest } from '../websocket/discord/worker'; import { CDNRouter, Router } from './Router'; @@ -39,16 +39,12 @@ export class ApiHandler { constructor(options: ApiHandlerOptions) { this.options = { baseUrl: 'api/v10', - domain: 'https://discord.com', + domain: BASE_HOST, type: 'Bot', ...options, userAgent: DefaultUserAgent, }; - if (options.debug) { - this.debugger = new Logger({ - name: '[API]', - }); - } + if (options.debug) this.debug = true; const worker_threads = lazyLoadPackage('node:worker_threads'); @@ -61,6 +57,14 @@ export class ApiHandler { } } + set debug(active: boolean) { + this.debugger = active + ? new Logger({ + name: '[API]', + }) + : undefined; + } + get proxy() { return (this._proxy_ ??= new Router(this).createProxy()); } diff --git a/src/client/base.ts b/src/client/base.ts index 983b99a..5afe4a8 100644 --- a/src/client/base.ts +++ b/src/client/base.ts @@ -59,8 +59,8 @@ import type { LocaleString, RESTPostAPIChannelMessageJSONBody } from '../types'; import type { MessageStructure } from './transformers'; export class BaseClient { - rest!: ApiHandler; - cache!: Cache; + rest = new ApiHandler({ token: 'INVALID' }); + cache = new Cache(0, new MemoryAdapter()); applications = new ApplicationShorter(this); users = new UsersShorter(this); @@ -194,6 +194,7 @@ export class BaseClient { setServices({ rest, cache, langs, middlewares, handleCommand }: ServicesOptions) { if (rest) { + rest.onRatelimit ??= this.rest.onRatelimit?.bind(rest); this.rest = rest; } if (cache) { @@ -213,7 +214,7 @@ export class BaseClient { 'users', 'voiceStates', ]; - let disabledCache: Partial> = this.cache?.disabledCache ?? {}; + let disabledCache: Partial> = this.cache.disabledCache; if (typeof cache.disabledCache === 'boolean') { for (const i of caches) { @@ -227,12 +228,7 @@ export class BaseClient { disabledCache = cache.disabledCache; } - this.cache = new Cache( - this.cache?.intents ?? 0, - cache?.adapter ?? this.cache?.adapter ?? new MemoryAdapter(), - disabledCache, - this, - ); + this.cache = new Cache(this.cache.intents, cache.adapter ?? this.cache.adapter, disabledCache, this); } if (middlewares) { this.middlewares = middlewares; @@ -270,22 +266,12 @@ export class BaseClient { const { token: tokenRC, debug } = await this.getRC(); const token = options?.token ?? tokenRC; + BaseClient.assertString(token, 'token is not a string'); - if (!this.rest) { - BaseClient.assertString(token, 'token is not a string'); - this.rest = new ApiHandler({ - token, - baseUrl: 'api/v10', - domain: 'https://discord.com', - debug, - }); - } + if (this.rest.options.token === 'INVALID') this.rest.options.token = token; + this.rest.debug = debug; - if (this.cache) { - this.cache.__setClient(this); - } else { - this.cache = new Cache(0, new MemoryAdapter(), {}, this); - } + this.cache.__setClient(this); if (!this.handleCommand) this.handleCommand = new HandleCommand(this); @@ -310,7 +296,6 @@ export class BaseClient { } private syncCachePath(cachePath: string) { - this.logger.debug('Syncing commands cache'); return promises.writeFile( cachePath, JSON.stringify( diff --git a/src/client/client.ts b/src/client/client.ts index eb74f6b..ece7aec 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -94,6 +94,7 @@ export class Client extends BaseClient { const { token: tokenRC, intents: intentsRC, debug: debugRC } = await this.getRC(); const token = options?.token ?? tokenRC; const intents = options?.connection?.intents ?? intentsRC; + this.cache.intents = intents; if (!this.gateway) { BaseClient.assertString(token, 'token is not a string'); @@ -123,8 +124,6 @@ export class Client extends BaseClient { }); } - this.cache.intents = this.gateway.options.intents; - if (execute) { await this.execute(options.connection); } else { @@ -138,7 +137,6 @@ export class Client extends BaseClient { this.collectors.run('RAW', packet, this), ]); //ignore promise switch (packet.t) { - // Cases where we must obtain the old data before updating case 'GUILD_MEMBER_UPDATE': { if (!this.memberUpdateHandler.check(packet.d)) { diff --git a/src/client/workerclient.ts b/src/client/workerclient.ts index 26ac2ca..e85b1a8 100644 --- a/src/client/workerclient.ts +++ b/src/client/workerclient.ts @@ -1,7 +1,7 @@ import { type UUID, randomUUID } from 'node:crypto'; import { ApiHandler, Logger } from '..'; import { WorkerAdapter } from '../cache'; -import { type DeepPartial, LogLevels, type When, hasIntent, lazyLoadPackage } from '../common'; +import { type DeepPartial, LogLevels, type MakeRequired, type When, hasIntent, lazyLoadPackage } from '../common'; import { EventHandler } from '../events'; import type { GatewayDispatchPayload, GatewaySendPayload } from '../types'; import { Shard, type ShardManagerOptions, type WorkerData, properties } from '../websocket'; @@ -23,13 +23,14 @@ import type { WorkerStart, WorkerStartResharding, } from '../websocket/discord/worker'; -import type { ManagerMessages } from '../websocket/discord/workermanager'; +import type { ManagerMessages, ManagerSpawnShards } from '../websocket/discord/workermanager'; import type { BaseClientOptions, ServicesOptions, StartOptions } from './base'; import { BaseClient } from './base'; import type { Client, ClientOptions } from './client'; import { MemberUpdateHandler } from '../websocket/discord/events/memberUpdate'; import { PresenceUpdateHandler } from '../websocket/discord/events/presenceUpdate'; +import type { ShardData } from '../websocket/discord/shared'; import { Collectors } from './collectors'; import { type ClientUserStructure, Transformers } from './transformers'; @@ -40,7 +41,7 @@ try { debug: process.env.SEYFERT_WORKER_DEBUG === 'true', intents: Number(process.env.SEYFERT_WORKER_INTENTS), path: process.env.SEYFERT_WORKER_PATH!, - shards: process.env.SEYFERT_WORKER_SHARDS!.split(',').map(id => Number(id)), + shards: JSON.parse(process.env.SEYFERT_WORKER_SHARDS!), token: process.env.SEYFERT_WORKER_TOKEN!, workerId: Number(process.env.SEYFERT_WORKER_WORKERID), workerProxy: process.env.SEYFERT_WORKER_WORKERPROXY === 'true', @@ -48,6 +49,8 @@ try { mode: process.env.SEYFERT_WORKER_MODE as 'custom' | 'threads' | 'clusters', resharding: process.env.SEYFERT_WORKER_RESHARDING === 'true', totalWorkers: Number(process.env.SEYFERT_WORKER_TOTALWORKERS), + info: JSON.parse(process.env.SEYFERT_WORKER_INFO!), + compress: process.env.SEYFERT_WORKER_COMPRESS === 'true', } satisfies WorkerData; } catch { // @@ -155,6 +158,7 @@ export class WorkerClient extends BaseClient { }), }); } + this.cache.intents = workerData.intents; this.postMessage({ type: workerData.resharding ? 'WORKER_START_RESHARDING' : 'WORKER_START', workerId: workerData.workerId, @@ -164,7 +168,6 @@ export class WorkerClient extends BaseClient { } await super.start(options); await this.loadEvents(options.eventsDir); - this.cache.intents = workerData.intents; } async loadEvents(dir?: string) { @@ -295,9 +298,6 @@ export class WorkerClient extends BaseClient { break; case 'SPAWN_SHARDS': { - const onPacket = this.onPacket.bind(this); - const handlePayload = this.options?.handlePayload?.bind(this); - const self = this; for (const id of workerData.shards) { const existsShard = this.shards.has(id); if (existsShard) { @@ -305,28 +305,7 @@ export class WorkerClient extends BaseClient { continue; } - const shard = new Shard(id, { - token: workerData.token, - intents: workerData.intents, - info: data.info, - compress: data.compress, - debugger: this.debugger, - properties: { - ...properties, - ...this.options.gateway?.properties, - }, - async handlePayload(shardId, payload) { - await handlePayload?.(shardId, payload); - await onPacket(payload, shardId); - if (self.options.sendPayloadToParent) - self.postMessage({ - workerId: workerData.workerId, - shardId, - type: 'RECEIVE_PAYLOAD', - payload, - } satisfies WorkerReceivePayload); - }, - }); + const shard = this.createShard(id, data); this.shards.set(id, shard); this.postMessage({ type: 'CONNECT_QUEUE', @@ -380,7 +359,7 @@ export class WorkerClient extends BaseClient { let result: unknown; try { result = await eval(` - (${data.func})(this) + (${data.func})(this, ${data.vars}) `); } catch (e) { result = e; @@ -455,7 +434,7 @@ export class WorkerClient extends BaseClient { }); } - tellWorker(workerId: number, func: (_: this) => R) { + tellWorker>(workerId: number, func: (_: this, vars: V) => R, vars: V) { const nonce = this.generateNonce(); this.postMessage({ type: 'EVAL_TO_WORKER', @@ -463,25 +442,71 @@ export class WorkerClient extends BaseClient { toWorkerId: workerId, workerId: workerData.workerId, nonce, + vars: JSON.stringify(vars), } satisfies WorkerSendToWorkerEval); return this.generateSendPromise(nonce); } - tellWorkers(func: (_: this) => R) { + tellWorkers>(func: (_: this, vars: V) => R, vars: V) { const promises: Promise[] = []; for (let i = 0; i < workerData.totalWorkers; i++) { - promises.push(this.tellWorker(i, func)); + promises.push(this.tellWorker(i, func, vars)); } return Promise.all(promises); } + createShard(id: number, data: Pick) { + const onPacket = this.onPacket.bind(this); + const handlePayload = this.options?.handlePayload?.bind(this); + const self = this; + const shard = new Shard(id, { + token: workerData.token, + intents: workerData.intents, + info: data.info, + compress: data.compress, + debugger: this.debugger, + properties: { + ...properties, + ...this.options.gateway?.properties, + }, + async handlePayload(shardId, payload) { + await handlePayload?.(shardId, payload); + await onPacket(payload, shardId); + if (self.options.sendPayloadToParent) + self.postMessage({ + workerId: workerData.workerId, + shardId, + type: 'RECEIVE_PAYLOAD', + payload, + } satisfies WorkerReceivePayload); + }, + }); + + return shard; + } + + async resumeShard(shardId: number, shardData: MakeRequired) { + const exists = (await this.tellWorkers((r, vars) => r.shards.has(vars.shardId), { shardId })).some(x => x); + if (exists) throw new Error('Cannot override existing shard'); + const shard = this.createShard(shardId, { + info: this.workerData.info, + compress: this.workerData.compress, + }); + shard.data = shardData; + this.shards.set(shardId, shard); + return this.postMessage({ + workerId: this.workerId, + shardId, + type: 'CONNECT_QUEUE', + }); + } + protected async onPacket(packet: GatewayDispatchPayload, shardId: number) { Promise.allSettled([ this.events?.runEvent('RAW', this, packet, shardId, false), this.collectors.run('RAW', packet, this), ]); //ignore promise switch (packet.t) { - //// Cases where we must obtain the old data before updating case 'GUILD_MEMBER_UPDATE': { if (!this.memberUpdateHandler.check(packet.d)) { diff --git a/src/commands/handler.ts b/src/commands/handler.ts index 2bb4e43..2cf0072 100644 --- a/src/commands/handler.ts +++ b/src/commands/handler.ts @@ -466,16 +466,18 @@ export class CommandHandler extends BaseHandler { stablishContextCommandDefaults(commandInstance: InstanceType): ContextMenuCommand | false { if (!(commandInstance instanceof ContextMenuCommand)) return false; commandInstance.onAfterRun ??= this.client.options.commands?.defaults?.onAfterRun; - //@ts-expect-error magic. - commandInstance.onBotPermissionsFail ??= this.client.options.commands?.defaults?.onBotPermissionsFail; - //@ts-expect-error magic. - commandInstance.onInternalError ??= this.client.options.commands?.defaults?.onInternalError; - //@ts-expect-error magic. - commandInstance.onMiddlewaresError ??= this.client.options.commands?.defaults?.onMiddlewaresError; - //@ts-expect-error magic. - commandInstance.onPermissionsFail ??= this.client.options.commands?.defaults?.onPermissionsFail; - //@ts-expect-error magic. - commandInstance.onRunError ??= this.client.options.commands?.defaults?.onRunError; + + if (this.client.options.commands?.defaults?.onBotPermissionsFail) + commandInstance.onBotPermissionsFail ??= this.client.options.commands?.defaults?.onBotPermissionsFail; + + if (this.client.options.commands?.defaults?.onInternalError) + commandInstance.onInternalError ??= this.client.options.commands.defaults.onInternalError; + + if (this.client.options.commands?.defaults?.onMiddlewaresError) + commandInstance.onMiddlewaresError ??= this.client.options.commands.defaults.onMiddlewaresError; + + if (this.client.options.commands?.defaults?.onRunError) + commandInstance.onRunError ??= this.client.options.commands.defaults.onRunError; return commandInstance; } diff --git a/src/components/modalcontext.ts b/src/components/modalcontext.ts index c4e1c31..1eff411 100644 --- a/src/components/modalcontext.ts +++ b/src/components/modalcontext.ts @@ -9,6 +9,7 @@ import type { ModalCreateBodyRequest, UnionToTuple, } from '../common'; +import type { Interaction } from '../structures/Interaction'; import { MessageFlags } from '../types'; export interface ModalContext extends BaseContext, ExtendContext {} @@ -101,7 +102,7 @@ export class ModalContext extends return this.interaction.deleteResponse(); } - modal(body: ModalCreateBodyRequest) { + modal(body: ModalCreateBodyRequest): ReturnType { //@ts-expect-error return this.interaction.modal(body); } diff --git a/src/events/handler.ts b/src/events/handler.ts index 6abb746..cb7961b 100644 --- a/src/events/handler.ts +++ b/src/events/handler.ts @@ -224,21 +224,21 @@ export class EventHandler extends BaseHandler { async runCustom(name: T, ...args: ResolveEventRunParams) { const Event = this.values[name]; if (!Event) { - // @ts-expect-error working with non-existent types is hard + // @ts-expect-error return this.client.collectors.run(name, args, this.client); } try { if (Event.data.once && Event.fired) { - // @ts-expect-error working with non-existent types is hard + // @ts-expect-error return this.client.collectors.run(name, args, this.client); } Event.fired = true; this.logger.debug(`executed a custom event [${name}]`, Event.data.once ? 'once' : ''); await Promise.all([ - // @ts-expect-error working with non-existent types is hard + // @ts-expect-error Event.run(...args, this.client), - // @ts-expect-error working with non-existent types is hard + // @ts-expect-error this.client.collectors.run(name, args, this.client), ]); } catch (e) { diff --git a/src/websocket/discord/shard.ts b/src/websocket/discord/shard.ts index db37c65..847dbdb 100644 --- a/src/websocket/discord/shard.ts +++ b/src/websocket/discord/shard.ts @@ -105,8 +105,8 @@ export class Shard { this.debugger?.debug(`[Shard #${this.id}] Connecting to ${this.currentGatewayURL}`); - // @ts-expect-error @types/bun cause erros in compile - // biome-ignore lint/correctness/noUndeclaredVariables: /\ bun lol + // @ts-expect-error Use native websocket when using Bun + // biome-ignore lint/correctness/noUndeclaredVariables: /\ this.websocket = new BaseSocket(typeof Bun === 'undefined' ? 'ws' : 'bun', this.currentGatewayURL); this.websocket.onmessage = ({ data }: { data: string | Buffer }) => { diff --git a/src/websocket/discord/shared.ts b/src/websocket/discord/shared.ts index 61e2582..1324dbd 100644 --- a/src/websocket/discord/shared.ts +++ b/src/websocket/discord/shared.ts @@ -140,6 +140,8 @@ export interface WorkerData { workerId: number; debug: boolean; workerProxy: boolean; + info: APIGatewayBotInfo; + compress: boolean; __USING_WATCHER__?: boolean; resharding: boolean; } diff --git a/src/websocket/discord/worker.ts b/src/websocket/discord/worker.ts index 5f79906..08a3ef7 100644 --- a/src/websocket/discord/worker.ts +++ b/src/websocket/discord/worker.ts @@ -81,6 +81,7 @@ export type WorkerSendToWorkerEval = CreateWorkerMessage< { func: string; nonce: string; + vars: string; toWorkerId: number; } >; diff --git a/src/websocket/discord/workermanager.ts b/src/websocket/discord/workermanager.ts index 6e25d69..92b9c6c 100644 --- a/src/websocket/discord/workermanager.ts +++ b/src/websocket/discord/workermanager.ts @@ -4,7 +4,7 @@ import type { Worker as WorkerThreadsWorker } from 'node:worker_threads'; import { ApiHandler, Logger, type UsingClient, type WorkerClient } from '../..'; import { type Adapter, MemoryAdapter } from '../../cache'; import { BaseClient, type InternalRuntimeConfig } from '../../client/base'; -import { type MakePartial, MergeOptions, lazyLoadPackage } from '../../common'; +import { BASE_HOST, type MakePartial, MergeOptions, lazyLoadPackage } from '../../common'; import type { GatewayPresenceUpdateData, GatewaySendPayload, RESTGetAPIGatewayBotResult } from '../../types'; import { WorkerManagerDefaults, properties } from '../constants'; import { DynamicBucket } from '../structures'; @@ -169,6 +169,11 @@ export class WorkerManager extends Map< mode: this.options.mode, resharding, totalWorkers: shards.length, + info: { + ...this.options.info, + shards: this.totalShards, + }, + compress: this.options.compress, }); this.set(i, worker); }); @@ -193,7 +198,8 @@ export class WorkerManager extends Map< }; if (workerData.resharding) env.SEYFERT_WORKER_RESHARDING = 'true'; for (const i in workerData) { - env[`SEYFERT_WORKER_${i.toUpperCase()}`] = workerData[i as keyof WorkerData]; + const data = workerData[i as keyof WorkerData]; + env[`SEYFERT_WORKER_${i.toUpperCase()}`] = typeof data === 'object' && data ? JSON.stringify(data) : data; } switch (this.options.mode) { case 'threads': { @@ -425,6 +431,7 @@ export class WorkerManager extends Map< func: message.func, type: 'EXECUTE_EVAL_TO_WORKER', toWorkerId: message.toWorkerId, + vars: message.vars, } satisfies ManagerExecuteEvalToWorker); this.generateSendPromise(nonce, 'Eval timeout').then(val => this.postMessage(message.workerId, { @@ -503,20 +510,25 @@ export class WorkerManager extends Map< return this.generateSendPromise(nonce, 'Get worker info timeout'); } - tellWorker(workerId: number, func: (_: WorkerClient & UsingClient) => R) { + tellWorker>( + workerId: number, + func: (_: WorkerClient & UsingClient, vars: V) => R, + vars: V, + ) { const nonce = this.generateNonce(); this.postMessage(workerId, { type: 'EXECUTE_EVAL', func: func.toString(), nonce, + vars: JSON.stringify(vars), } satisfies ManagerExecuteEval); return this.generateSendPromise(nonce); } - tellWorkers(func: (_: WorkerClient & UsingClient) => R) { + tellWorkers>(func: (_: WorkerClient & UsingClient, vars: V) => R, vars: V) { const promises: Promise[] = []; for (const i of this.keys()) { - promises.push(this.tellWorker(i, func)); + promises.push(this.tellWorker(i, func, vars)); } return Promise.all(promises); } @@ -530,7 +542,7 @@ export class WorkerManager extends Map< this.rest ??= new ApiHandler({ token: this.options.token, baseUrl: 'api/v10', - domain: 'https://discord.com', + domain: BASE_HOST, debug: this.options.debug, }); this.options.info ??= await this.rest.proxy.gateway.bot.get(); @@ -562,7 +574,7 @@ export class WorkerManager extends Map< }, this.debugger, ); - await this.prepareWorkers(spaces); + this.prepareWorkers(spaces); // Start workers queue this.workerQueue.shift()!(); await this.startResharding(); @@ -645,6 +657,7 @@ export type ManagerExecuteEvalToWorker = CreateManagerMessage< { func: string; nonce: string; + vars: string; toWorkerId: number; } >; @@ -653,6 +666,7 @@ export type ManagerExecuteEval = CreateManagerMessage< 'EXECUTE_EVAL', { func: string; + vars: string; nonce: string; } >;