diff --git a/src/client/client.ts b/src/client/client.ts index 09b7059..1320904 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -1,12 +1,13 @@ import { parentPort, workerData } from 'node:worker_threads'; import type { Command, CommandContext, Message, SubCommand } from '..'; -import type { - DeepPartial, - GatewayDispatchPayload, - GatewayPresenceUpdateData, - If, - WatcherPayload, - WatcherSendToShard, +import { + GatewayIntentBits, + type DeepPartial, + type GatewayDispatchPayload, + type GatewayPresenceUpdateData, + type If, + type WatcherPayload, + type WatcherSendToShard, } from '../common'; import { EventHandler } from '../events'; import { ClientUser } from '../structures'; @@ -145,15 +146,18 @@ export class Client extends BaseClient { this.botId = packet.d.user.id; this.applicationId = packet.d.application.id; this.me = new ClientUser(this, packet.d.user, packet.d.application) as never; - if (!this.__handleGuilds?.size) { + if ( + !this.__handleGuilds?.size || + !((this.gateway.options.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds) + ) { if ( [...this.gateway.values()].every(shard => shard.data.session_id) && this.events.values.BOT_READY && (this.events.values.BOT_READY.fired ? !this.events.values.BOT_READY.data.once : true) ) { await this.events.runEvent('BOT_READY', this, this.me, -1); - delete this.__handleGuilds; } + delete this.__handleGuilds; } this.debugger?.debug(`#${shardId}[${packet.d.user.username}](${this.botId}) is online...`); break; @@ -168,6 +172,7 @@ export class Client extends BaseClient { ) { await this.events.runEvent('BOT_READY', this, this.me, -1); } + if (!this.__handleGuilds.size) delete this.__handleGuilds; return; } break; diff --git a/src/client/workerclient.ts b/src/client/workerclient.ts index a51224c..eee401e 100644 --- a/src/client/workerclient.ts +++ b/src/client/workerclient.ts @@ -2,7 +2,7 @@ import { workerData as __workerData__, parentPort as manager } from 'node:worker import type { Cache } from '../cache'; import { WorkerAdapter } from '../cache'; import type { GatewayDispatchPayload, GatewaySendPayload, When } from '../common'; -import { LogLevels, Logger, type DeepPartial } from '../common'; +import { GatewayIntentBits, LogLevels, Logger, type DeepPartial } from '../common'; import { EventHandler } from '../events'; import { ClientUser } from '../structures'; import { Shard, type ShardManagerOptions, type WorkerData } from '../websocket'; @@ -60,6 +60,14 @@ export class WorkerClient extends BaseClient { return workerData.workerId; } + get latency() { + let acc = 0; + + this.shards.forEach(s => (acc += s.latency)); + + return acc / this.shards.size; + } + async start(options: Omit, 'httpConnection' | 'token' | 'connection'> = {}) { await super.start(options); await this.loadEvents(options.eventsDir); @@ -208,7 +216,10 @@ export class WorkerClient extends BaseClient { this.botId = packet.d.user.id; this.applicationId = packet.d.application.id; this.me = new ClientUser(this, packet.d.user, packet.d.application) as never; - if (!this.__handleGuilds?.size) { + if ( + !this.__handleGuilds?.size || + !((workerData.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds) + ) { if ( [...this.shards.values()].every(shard => shard.data.session_id) && this.events.values.WORKER_READY && @@ -219,8 +230,8 @@ export class WorkerClient extends BaseClient { workerId: this.workerId, } as WorkerReady); await this.events.runEvent('WORKER_READY', this, this.me, -1); - delete this.__handleGuilds; } + delete this.__handleGuilds; } this.debugger?.debug(`#${shardId} [${packet.d.user.username}](${this.botId}) is online...`); break; @@ -245,6 +256,7 @@ export class WorkerClient extends BaseClient { } as WorkerReady); await this.events.runEvent('WORKER_READY', this, this.me, -1); } + if (!this.__handleGuilds.size) delete this.__handleGuilds; return; } } diff --git a/src/common/types/util.ts b/src/common/types/util.ts index b98335e..77a352d 100644 --- a/src/common/types/util.ts +++ b/src/common/types/util.ts @@ -14,7 +14,7 @@ export type ToClass = new ( export type StringToNumber = T extends `${infer N extends number}` ? N : never; -export type MakePartial = T & { [P in K]?: T[P] }; +export type MakePartial = Omit & { [P in K]?: T[P] }; export type DeepPartial = { [K in keyof T]?: T[K] extends Record diff --git a/src/websocket/discord/workermanager.ts b/src/websocket/discord/workermanager.ts index f53d313..576b029 100644 --- a/src/websocket/discord/workermanager.ts +++ b/src/websocket/discord/workermanager.ts @@ -1,7 +1,15 @@ import { randomUUID } from 'node:crypto'; import { Worker } from 'node:worker_threads'; +import { ApiHandler, Router } from '../..'; import { MemoryAdapter, type Adapter } from '../../cache'; -import { Logger, MergeOptions, type GatewayPresenceUpdateData, type GatewaySendPayload } from '../../common'; +import { BaseClient, type InternalRuntimeConfig } from '../../client/base'; +import { + Logger, + type MakePartial, + MergeOptions, + type GatewayPresenceUpdateData, + type GatewaySendPayload, +} from '../../common'; import { WorkerManagerDefaults } from '../constants'; import { SequentialBucket } from '../structures'; import { ConnectQueue } from '../structures/timeout'; @@ -9,39 +17,18 @@ import { MemberUpdateHandler } from './events/memberUpdate'; import { PresenceUpdateHandler } from './events/presenceUpdate'; import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared'; import type { WorkerInfo, WorkerMessage, WorkerShardInfo } from './worker'; - -export class WorkerManager extends Map { - options: Required; +export class WorkerManager extends Map { + options!: Required; debugger?: Logger; - connectQueue: ConnectQueue; + connectQueue!: ConnectQueue; cacheAdapter: Adapter; promises = new Map void; timeout: NodeJS.Timeout }>(); memberUpdateHandler = new MemberUpdateHandler(); presenceUpdateHandler = new PresenceUpdateHandler(); - constructor(options: WorkerManagerOptions) { + rest!: ApiHandler; + constructor(options: MakePartial) { super(); - options.totalShards ??= options.info.shards; this.options = MergeOptions>(WorkerManagerDefaults, options); - this.options.workers ??= Math.ceil(this.options.totalShards / this.options.shardsPerWorker); - this.options.info.shards = options.totalShards; - options.shardEnd ??= options.totalShards; - options.shardStart ??= 0; - this.connectQueue = new ConnectQueue(5.5e3, this.concurrency); - - if (this.options.debug) { - this.debugger = new Logger({ - name: '[WorkerManager]', - }); - } - - if (this.totalShards / this.shardsPerWorker > this.workers) { - throw new Error( - `Cannot create enough shards in the specified workers, minimum: ${Math.ceil( - this.totalShards / this.shardsPerWorker, - )}`, - ); - } - this.cacheAdapter = new MemoryAdapter(); } @@ -49,6 +36,10 @@ export class WorkerManager extends Map { this.cacheAdapter = adapter; } + setRest(rest: ApiHandler) { + this.rest = rest; + } + get remaining() { return this.options.info.session_start_limit.remaining; } @@ -244,10 +235,14 @@ export class WorkerManager extends Map { break; case 'WORKER_READY': { - if (message.workerId === [...this.keys()].at(-1)) { + this.get(message.workerId)!.ready = true; + if ([...this.values()].every(w => w.ready)) { this.get(this.keys().next().value)?.postMessage({ type: 'BOT_READY', } satisfies ManagerSendBotReady); + this.forEach(w => { + delete w.ready; + }); } } break; @@ -330,6 +325,38 @@ export class WorkerManager extends Map { } async start() { + const rc = await BaseClient.prototype.getRC(); + + this.options.debug ||= rc.debug; + this.options.intents ||= rc.intents ?? 0; + this.options.token ??= rc.token; + this.rest ??= new ApiHandler({ + token: this.options.token, + baseUrl: 'api/v10', + domain: 'https://discord.com', + }); //TODO: share ratelimits with all workers + this.options.info ??= await new Router(this.rest).createProxy().gateway.bot.get(); + this.options.totalShards ??= this.options.info.shards; + this.options = MergeOptions>(WorkerManagerDefaults, this.options); + this.options.workers ??= Math.ceil(this.options.totalShards / this.options.shardsPerWorker); + this.options.info.shards = this.options.totalShards; + this.options.shardEnd ??= this.options.totalShards; + this.options.shardStart ??= 0; + this.connectQueue = new ConnectQueue(5.5e3, this.concurrency); + + if (this.options.debug) { + this.debugger = new Logger({ + name: '[WorkerManager]', + }); + } + if (this.totalShards / this.shardsPerWorker > this.workers) { + throw new Error( + `Cannot create enough shards in the specified workers, minimum: ${Math.ceil( + this.totalShards / this.shardsPerWorker, + )}`, + ); + } + const spaces = this.prepareSpaces(); await this.prepareWorkers(spaces); }