diff --git a/package.json b/package.json index 71ddc9c..9f73d82 100644 --- a/package.json +++ b/package.json @@ -20,10 +20,10 @@ "author": "MARCROCK22", "license": "MIT", "devDependencies": { - "@biomejs/biome": "1.9.2", + "@biomejs/biome": "1.9.3", "@commitlint/cli": "^19.5.0", "@commitlint/config-conventional": "^19.5.0", - "@types/node": "^22.6.1", + "@types/node": "^22.7.4", "husky": "^9.1.6", "lint-staged": "^15.2.10", "typescript": "^5.6.2" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a46d056..1c60bf7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -9,17 +9,17 @@ importers: .: devDependencies: '@biomejs/biome': - specifier: 1.9.2 - version: 1.9.2 + specifier: 1.9.3 + version: 1.9.3 '@commitlint/cli': specifier: ^19.5.0 - version: 19.5.0(@types/node@22.6.1)(typescript@5.6.2) + version: 19.5.0(@types/node@22.7.4)(typescript@5.6.2) '@commitlint/config-conventional': specifier: ^19.5.0 version: 19.5.0 '@types/node': - specifier: ^22.6.1 - version: 22.6.1 + specifier: ^22.7.4 + version: 22.7.4 husky: specifier: ^9.1.6 version: 9.1.6 @@ -44,55 +44,55 @@ packages: resolution: {integrity: sha512-EStJpq4OuY8xYfhGVXngigBJRWxftKX9ksiGDnmlY3o7B/V7KIAc9X4oiK87uPJSc/vs5L869bem5fhZa8caZw==} engines: {node: '>=6.9.0'} - '@biomejs/biome@1.9.2': - resolution: {integrity: sha512-4j2Gfwft8Jqp1X0qLYvK4TEy4xhTo4o6rlvJPsjPeEame8gsmbGQfOPBkw7ur+7/Z/f0HZmCZKqbMvR7vTXQYQ==} + '@biomejs/biome@1.9.3': + resolution: {integrity: sha512-POjAPz0APAmX33WOQFGQrwLvlu7WLV4CFJMlB12b6ZSg+2q6fYu9kZwLCOA+x83zXfcPd1RpuWOKJW0GbBwLIQ==} engines: {node: '>=14.21.3'} hasBin: true - '@biomejs/cli-darwin-arm64@1.9.2': - resolution: {integrity: sha512-rbs9uJHFmhqB3Td0Ro+1wmeZOHhAPTL3WHr8NtaVczUmDhXkRDWScaxicG9+vhSLj1iLrW47itiK6xiIJy6vaA==} + '@biomejs/cli-darwin-arm64@1.9.3': + resolution: {integrity: sha512-QZzD2XrjJDUyIZK+aR2i5DDxCJfdwiYbUKu9GzkCUJpL78uSelAHAPy7m0GuPMVtF/Uo+OKv97W3P9nuWZangQ==} engines: {node: '>=14.21.3'} cpu: [arm64] os: [darwin] - '@biomejs/cli-darwin-x64@1.9.2': - resolution: {integrity: sha512-BlfULKijNaMigQ9GH9fqJVt+3JTDOSiZeWOQtG/1S1sa8Lp046JHG3wRJVOvekTPL9q/CNFW1NVG8J0JN+L1OA==} + '@biomejs/cli-darwin-x64@1.9.3': + resolution: {integrity: sha512-vSCoIBJE0BN3SWDFuAY/tRavpUtNoqiceJ5PrU3xDfsLcm/U6N93JSM0M9OAiC/X7mPPfejtr6Yc9vSgWlEgVw==} engines: {node: '>=14.21.3'} cpu: [x64] os: [darwin] - '@biomejs/cli-linux-arm64-musl@1.9.2': - resolution: {integrity: sha512-ZATvbUWhNxegSALUnCKWqetTZqrK72r2RsFD19OK5jXDj/7o1hzI1KzDNG78LloZxftrwr3uI9SqCLh06shSZw==} + '@biomejs/cli-linux-arm64-musl@1.9.3': + resolution: {integrity: sha512-VBzyhaqqqwP3bAkkBrhVq50i3Uj9+RWuj+pYmXrMDgjS5+SKYGE56BwNw4l8hR3SmYbLSbEo15GcV043CDSk+Q==} engines: {node: '>=14.21.3'} cpu: [arm64] os: [linux] - '@biomejs/cli-linux-arm64@1.9.2': - resolution: {integrity: sha512-T8TJuSxuBDeQCQzxZu2o3OU4eyLumTofhCxxFd3+aH2AEWVMnH7Z/c3QP1lHI5RRMBP9xIJeMORqDQ5j+gVZzw==} + '@biomejs/cli-linux-arm64@1.9.3': + resolution: {integrity: sha512-vJkAimD2+sVviNTbaWOGqEBy31cW0ZB52KtpVIbkuma7PlfII3tsLhFa+cwbRAcRBkobBBhqZ06hXoZAN8NODQ==} engines: {node: '>=14.21.3'} cpu: [arm64] os: [linux] - '@biomejs/cli-linux-x64-musl@1.9.2': - resolution: {integrity: sha512-CjPM6jT1miV5pry9C7qv8YJk0FIZvZd86QRD3atvDgfgeh9WQU0k2Aoo0xUcPdTnoz0WNwRtDicHxwik63MmSg==} + '@biomejs/cli-linux-x64-musl@1.9.3': + resolution: {integrity: sha512-TJmnOG2+NOGM72mlczEsNki9UT+XAsMFAOo8J0me/N47EJ/vkLXxf481evfHLlxMejTY6IN8SdRSiPVLv6AHlA==} engines: {node: '>=14.21.3'} cpu: [x64] os: [linux] - '@biomejs/cli-linux-x64@1.9.2': - resolution: {integrity: sha512-T0cPk3C3Jr2pVlsuQVTBqk2qPjTm8cYcTD9p/wmR9MeVqui1C/xTVfOIwd3miRODFMrJaVQ8MYSXnVIhV9jTjg==} + '@biomejs/cli-linux-x64@1.9.3': + resolution: {integrity: sha512-x220V4c+romd26Mu1ptU+EudMXVS4xmzKxPVb9mgnfYlN4Yx9vD5NZraSx/onJnd3Gh/y8iPUdU5CDZJKg9COA==} engines: {node: '>=14.21.3'} cpu: [x64] os: [linux] - '@biomejs/cli-win32-arm64@1.9.2': - resolution: {integrity: sha512-2x7gSty75bNIeD23ZRPXyox6Z/V0M71ObeJtvQBhi1fgrvPdtkEuw7/0wEHg6buNCubzOFuN9WYJm6FKoUHfhg==} + '@biomejs/cli-win32-arm64@1.9.3': + resolution: {integrity: sha512-lg/yZis2HdQGsycUvHWSzo9kOvnGgvtrYRgoCEwPBwwAL8/6crOp3+f47tPwI/LI1dZrhSji7PNsGKGHbwyAhw==} engines: {node: '>=14.21.3'} cpu: [arm64] os: [win32] - '@biomejs/cli-win32-x64@1.9.2': - resolution: {integrity: sha512-JC3XvdYcjmu1FmAehVwVV0SebLpeNTnO2ZaMdGCSOdS7f8O9Fq14T2P1gTG1Q29Q8Dt1S03hh0IdVpIZykOL8g==} + '@biomejs/cli-win32-x64@1.9.3': + resolution: {integrity: sha512-cQMy2zanBkVLpmmxXdK6YePzmZx0s5Z7KEnwmrW54rcXK3myCNbQa09SwGZ8i/8sLw0H9F3X7K4rxVNGU8/D4Q==} engines: {node: '>=14.21.3'} cpu: [x64] os: [win32] @@ -169,8 +169,8 @@ packages: '@types/conventional-commits-parser@5.0.0': resolution: {integrity: sha512-loB369iXNmAZglwWATL+WRe+CRMmmBPtpolYzIebFaX4YA3x+BEfLqhUAV9WanycKI3TG1IMr5bMJDajDKLlUQ==} - '@types/node@22.6.1': - resolution: {integrity: sha512-V48tCfcKb/e6cVUigLAaJDAILdMP0fUW6BidkPK4GpGjXcfbnoHasCZDwz3N3yVt5we2RHm4XTQCpv0KJz9zqw==} + '@types/node@22.7.4': + resolution: {integrity: sha512-y+NPi1rFzDs1NdQHHToqeiX2TIS79SWEAw9GYhkkx8bD0ChpfqC+n2j5OXOCpzfojBEBt6DnEnnG9MY0zk1XLg==} JSONStream@1.3.5: resolution: {integrity: sha512-E+iruNOY8VV9s4JEbe1aNEm6MiszPRr/UfcHMz0TQh1BXSxHK+ASV1R6W4HpjBhSeS+54PIsAMCBmwD06LLsqQ==} @@ -346,8 +346,8 @@ packages: fast-deep-equal@3.1.3: resolution: {integrity: sha512-f3qQ9oQy9j2AhBe/H9VC91wLmKBCCU/gDOnKNAYG5hswO7BLKj09Hc5HYNz9cGI++xlpDCIgDaitVs03ATR84Q==} - fast-uri@3.0.1: - resolution: {integrity: sha512-MWipKbbYiYI0UC7cl8m/i/IWTqfC8YXsqjzybjddLsFjStroQzsHXkc73JutMvBiXmOvapk+axIl79ig5t55Bw==} + fast-uri@3.0.2: + resolution: {integrity: sha512-GR6f0hD7XXyNJa25Tb9BuIdN0tdr+0BMi6/CJPH3wJO1JjNG3n/VsSw38AwRdKZABm8lGbPfakLRkYzx2V9row==} fill-range@7.1.1: resolution: {integrity: sha512-YsGpe3WHLK8ZYi4tWDg2Jy3ebRz2rXowDxnld4bkQB00cc/1Zw9AWnC0i9ztDJitivtQvaI9KaLyKrc+hBW0yg==} @@ -741,46 +741,46 @@ snapshots: js-tokens: 4.0.0 picocolors: 1.1.0 - '@biomejs/biome@1.9.2': + '@biomejs/biome@1.9.3': optionalDependencies: - '@biomejs/cli-darwin-arm64': 1.9.2 - '@biomejs/cli-darwin-x64': 1.9.2 - '@biomejs/cli-linux-arm64': 1.9.2 - '@biomejs/cli-linux-arm64-musl': 1.9.2 - '@biomejs/cli-linux-x64': 1.9.2 - '@biomejs/cli-linux-x64-musl': 1.9.2 - '@biomejs/cli-win32-arm64': 1.9.2 - '@biomejs/cli-win32-x64': 1.9.2 + '@biomejs/cli-darwin-arm64': 1.9.3 + '@biomejs/cli-darwin-x64': 1.9.3 + '@biomejs/cli-linux-arm64': 1.9.3 + '@biomejs/cli-linux-arm64-musl': 1.9.3 + '@biomejs/cli-linux-x64': 1.9.3 + '@biomejs/cli-linux-x64-musl': 1.9.3 + '@biomejs/cli-win32-arm64': 1.9.3 + '@biomejs/cli-win32-x64': 1.9.3 - '@biomejs/cli-darwin-arm64@1.9.2': + '@biomejs/cli-darwin-arm64@1.9.3': optional: true - '@biomejs/cli-darwin-x64@1.9.2': + '@biomejs/cli-darwin-x64@1.9.3': optional: true - '@biomejs/cli-linux-arm64-musl@1.9.2': + '@biomejs/cli-linux-arm64-musl@1.9.3': optional: true - '@biomejs/cli-linux-arm64@1.9.2': + '@biomejs/cli-linux-arm64@1.9.3': optional: true - '@biomejs/cli-linux-x64-musl@1.9.2': + '@biomejs/cli-linux-x64-musl@1.9.3': optional: true - '@biomejs/cli-linux-x64@1.9.2': + '@biomejs/cli-linux-x64@1.9.3': optional: true - '@biomejs/cli-win32-arm64@1.9.2': + '@biomejs/cli-win32-arm64@1.9.3': optional: true - '@biomejs/cli-win32-x64@1.9.2': + '@biomejs/cli-win32-x64@1.9.3': optional: true - '@commitlint/cli@19.5.0(@types/node@22.6.1)(typescript@5.6.2)': + '@commitlint/cli@19.5.0(@types/node@22.7.4)(typescript@5.6.2)': dependencies: '@commitlint/format': 19.5.0 '@commitlint/lint': 19.5.0 - '@commitlint/load': 19.5.0(@types/node@22.6.1)(typescript@5.6.2) + '@commitlint/load': 19.5.0(@types/node@22.7.4)(typescript@5.6.2) '@commitlint/read': 19.5.0 '@commitlint/types': 19.5.0 tinyexec: 0.3.0 @@ -827,7 +827,7 @@ snapshots: '@commitlint/rules': 19.5.0 '@commitlint/types': 19.5.0 - '@commitlint/load@19.5.0(@types/node@22.6.1)(typescript@5.6.2)': + '@commitlint/load@19.5.0(@types/node@22.7.4)(typescript@5.6.2)': dependencies: '@commitlint/config-validator': 19.5.0 '@commitlint/execute-rule': 19.5.0 @@ -835,7 +835,7 @@ snapshots: '@commitlint/types': 19.5.0 chalk: 5.3.0 cosmiconfig: 9.0.0(typescript@5.6.2) - cosmiconfig-typescript-loader: 5.0.0(@types/node@22.6.1)(cosmiconfig@9.0.0(typescript@5.6.2))(typescript@5.6.2) + cosmiconfig-typescript-loader: 5.0.0(@types/node@22.7.4)(cosmiconfig@9.0.0(typescript@5.6.2))(typescript@5.6.2) lodash.isplainobject: 4.0.6 lodash.merge: 4.6.2 lodash.uniq: 4.5.0 @@ -888,9 +888,9 @@ snapshots: '@types/conventional-commits-parser@5.0.0': dependencies: - '@types/node': 22.6.1 + '@types/node': 22.7.4 - '@types/node@22.6.1': + '@types/node@22.7.4': dependencies: undici-types: 6.19.8 @@ -902,7 +902,7 @@ snapshots: ajv@8.17.1: dependencies: fast-deep-equal: 3.1.3 - fast-uri: 3.0.1 + fast-uri: 3.0.2 json-schema-traverse: 1.0.0 require-from-string: 2.0.2 @@ -993,9 +993,9 @@ snapshots: meow: 12.1.1 split2: 4.2.0 - cosmiconfig-typescript-loader@5.0.0(@types/node@22.6.1)(cosmiconfig@9.0.0(typescript@5.6.2))(typescript@5.6.2): + cosmiconfig-typescript-loader@5.0.0(@types/node@22.7.4)(cosmiconfig@9.0.0(typescript@5.6.2))(typescript@5.6.2): dependencies: - '@types/node': 22.6.1 + '@types/node': 22.7.4 cosmiconfig: 9.0.0(typescript@5.6.2) jiti: 1.21.6 typescript: 5.6.2 @@ -1057,7 +1057,7 @@ snapshots: fast-deep-equal@3.1.3: {} - fast-uri@3.0.1: {} + fast-uri@3.0.2: {} fill-range@7.1.1: dependencies: diff --git a/src/api/api.ts b/src/api/api.ts index 3795f0f..7dce284 100644 --- a/src/api/api.ts +++ b/src/api/api.ts @@ -88,7 +88,7 @@ export class ApiHandler { method, url, type: 'WORKER_API_REQUEST', - workerId: (workerData as WorkerData).workerId, + workerId: workerData.workerId, nonce, requestOptions: { auth, ...request }, } satisfies WorkerSendApiRequest, @@ -184,7 +184,7 @@ export class ApiHandler { } next(); - return resolve((result || undefined) as T); + return resolve(result || undefined); }; return new Promise((resolve, reject) => { @@ -215,10 +215,10 @@ export class ApiHandler { errMessage += `${JSON.stringify(result.errors, null, 2)}\n`; } } - if (response.status) { - return new Error(errMessage || response.statusText); + if (errMessage.length) { + return new Error(errMessage); } - return new Error('Unknown error'); + return new Error(response.statusText); } async handle50X(method: HttpMethods, url: `/${string}`, request: ApiRequestOptions, next: () => void) { @@ -318,7 +318,7 @@ export class ApiHandler { } setRatelimitsBucket(route: string, resp: Response) { - if (resp.headers.get('x-ratelimit-limit')) { + if (resp.headers.has('x-ratelimit-limit')) { this.ratelimits.get(route)!.limit = +resp.headers.get('x-ratelimit-limit')!; } @@ -327,7 +327,7 @@ export class ApiHandler { if (this.options.smartBucket) { if ( - resp.headers.get('x-ratelimit-reset-after') && + resp.headers.has('x-ratelimit-reset-after') && !this.ratelimits.get(route)!.resetAfter && Number(resp.headers.get('x-ratelimit-limit')) === Number(resp.headers.get('x-ratelimit-remaining')) + 1 ) { diff --git a/src/client/client.ts b/src/client/client.ts index c9951e9..4a097f7 100644 --- a/src/client/client.ts +++ b/src/client/client.ts @@ -5,6 +5,7 @@ import { type If, type WatcherPayload, type WatcherSendToShard, + hasIntent, lazyLoadPackage, } from '../common'; import { EventHandler } from '../events'; @@ -115,21 +116,9 @@ export class Client extends BaseClient { }, compress: this.options?.gateway?.compress, resharding: { - getInfo: () => this.proxy.gateway.bot.get(), + getInfo: this.options.resharding?.getInfo ?? (() => this.proxy.gateway.bot.get()), interval: this.options?.resharding?.interval, percentage: this.options?.resharding?.percentage, - reloadGuilds: ids => { - this.__handleGuilds = this.__handleGuilds?.concat(ids) ?? ids; - }, - onGuild: id => { - if (this.__handleGuilds) { - const index = this.__handleGuilds.indexOf(id); - if (index === -1) return false; - this.__handleGuilds.splice(index, 1); - return true; - } - return false; - }, }, }); } @@ -202,12 +191,7 @@ export class Client extends BaseClient { this.botId = packet.d.user.id; this.applicationId = packet.d.application.id; this.me = Transformers.ClientUser(this, packet.d.user, packet.d.application) as never; - if ( - !( - this.__handleGuilds?.length && - (this.gateway.options.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds - ) - ) { + if (!hasIntent(this.gateway.options.intents, GatewayIntentBits.Guilds)) { if ([...this.gateway.values()].every(shard => shard.data.session_id)) { await this.events?.runEvent('BOT_READY', this, this.me, -1); } @@ -244,8 +228,7 @@ export interface ClientOptions extends BaseClientOptions { reply?: (ctx: CommandContext) => boolean; }; handlePayload?: ShardManagerOptions['handlePayload']; - resharding?: { - interval: number; - percentage: number; + resharding?: Omit, 'getInfo'> & { + getInfo?: NonNullable['getInfo']; }; } diff --git a/src/client/collectors.ts b/src/client/collectors.ts index a6b4975..17b5471 100644 --- a/src/client/collectors.ts +++ b/src/client/collectors.ts @@ -1,5 +1,5 @@ import { error } from 'node:console'; -import { randomUUID } from 'node:crypto'; +import { type UUID, randomUUID } from 'node:crypto'; import type { UsingClient } from '../commands'; import type { Awaitable, CamelCase } from '../common'; import type { CallbackEventHandler, CustomEventsKeys, GatewayEvents } from '../events'; @@ -31,7 +31,7 @@ type RunData = { export class Collectors { readonly values = new Map[]>(); - private generateRandomUUID(name: AllClientEvents) { + private generateRandomUUID(name: AllClientEvents): UUID | '*' { const collectors = this.values.get(name); if (!collectors) return '*'; diff --git a/src/client/workerclient.ts b/src/client/workerclient.ts index c138984..fec0153 100644 --- a/src/client/workerclient.ts +++ b/src/client/workerclient.ts @@ -1,14 +1,18 @@ -import { randomUUID } from 'node:crypto'; +import { type UUID, randomUUID } from 'node:crypto'; import { ApiHandler, Logger } from '..'; import { WorkerAdapter } from '../cache'; -import { type DeepPartial, LogLevels, type When, lazyLoadPackage } from '../common'; +import { type DeepPartial, LogLevels, type When, hasIntent, lazyLoadPackage } from '../common'; import { EventHandler } from '../events'; import { type GatewayDispatchPayload, GatewayIntentBits, type GatewaySendPayload } from '../types'; import { Shard, type ShardManagerOptions, type WorkerData, properties } from '../websocket'; import type { + WorkerDisconnectedAllShardsResharding, + WorkerMessage, WorkerReady, + WorkerReadyResharding, WorkerReceivePayload, WorkerRequestConnect, + WorkerRequestConnectResharding, WorkerSendEval, WorkerSendEvalResponse, WorkerSendInfo, @@ -17,6 +21,7 @@ import type { WorkerShardInfo, WorkerShardsConnected, WorkerStart, + WorkerStartResharding, } from '../websocket/discord/worker'; import type { ManagerMessages } from '../websocket/discord/workermanager'; import type { BaseClientOptions, ServicesOptions, StartOptions } from './base'; @@ -41,6 +46,7 @@ try { workerProxy: process.env.SEYFERT_WORKER_WORKERPROXY === 'true', totalShards: Number(process.env.SEYFERT_WORKER_TOTALSHARDS), mode: process.env.SEYFERT_WORKER_MODE, + resharding: process.env.SEYFERT_WORKER_RESHARDING === 'true', } as WorkerData; } catch { // @@ -48,6 +54,7 @@ try { export class WorkerClient extends BaseClient { private __handleGuilds?: Set = new Set(); + private __handleGuildsResharding?: Set; memberUpdateHandler = new MemberUpdateHandler(); presenceUpdateHandler = new PresenceUpdateHandler(); @@ -57,6 +64,8 @@ export class WorkerClient extends BaseClient { promises = new Map void; timeout: NodeJS.Timeout }>(); shards = new Map(); + resharding = new Map(); + private _ready?: boolean; private __setServicesCache?: boolean; declare options: WorkerClientOptions; @@ -146,9 +155,12 @@ export class WorkerClient extends BaseClient { }); } this.postMessage({ - type: 'WORKER_START', + type: workerData.resharding ? 'WORKER_START_RESHARDING' : 'WORKER_START', workerId: workerData.workerId, - } satisfies WorkerStart); + } satisfies WorkerStart | WorkerStartResharding); + if (workerData.resharding) { + this.__handleGuildsResharding = new Set(); + } await super.start(options); await this.loadEvents(options.eventsDir); this.cache.intents = workerData.intents; @@ -162,7 +174,7 @@ export class WorkerClient extends BaseClient { } } - postMessage(body: unknown): unknown { + postMessage(body: WorkerMessage): unknown { if (manager) return manager.postMessage(body); return process.send!(body); } @@ -196,6 +208,17 @@ export class WorkerClient extends BaseClient { } satisfies WorkerSendResultPayload); } break; + case 'ALLOW_CONNECT_RESHARDING': + { + const shard = this.resharding.get(data.shardId); + if (!shard) { + this.logger.fatal('Worker trying reshard non-existent shard'); + return; + } + shard.options.presence = data.presence; + await shard.connect(); + } + break; case 'ALLOW_CONNECT': { const shard = this.shards.get(data.shardId); @@ -207,40 +230,100 @@ export class WorkerClient extends BaseClient { await shard.connect(); } break; + case 'SPAWN_SHARDS_RESHARDING': + { + let shardsConnected = 0; + const self = this; + for (const id of workerData.shards) { + const existsShard = this.resharding.has(id); + if (existsShard) { + this.logger.warn(`Trying to re-spawn existing shard #${id}`); + continue; + } + + const shard = new Shard(id, { + token: workerData.token, + intents: workerData.intents, + info: data.info, + compress: data.compress, + debugger: this.debugger, + properties: { + ...properties, + ...this.options.gateway?.properties, + }, + handlePayload(_, payload) { + if (payload.t === 'GUILD_CREATE' || payload.t === 'GUILD_DELETE') { + self.__handleGuildsResharding!.delete(payload.d.id); + if (!self.__handleGuildsResharding?.size && shardsConnected === workerData.shards.length) { + delete self.__handleGuildsResharding; + self.postMessage({ + type: 'WORKER_READY_RESHARDING', + workerId: workerData.workerId, + } satisfies WorkerReadyResharding); + } + } + + if (payload.t !== 'READY') return; + shardsConnected++; + for (const guild of payload.d.guilds) { + self.__handleGuildsResharding!.add(guild.id); + } + if ( + shardsConnected === workerData.shards.length && + !hasIntent(workerData.intents, GatewayIntentBits.Guilds) + ) { + delete self.__handleGuildsResharding; + self.postMessage({ + type: 'WORKER_READY_RESHARDING', + workerId: workerData.workerId, + } satisfies WorkerReadyResharding); + } + }, + }); + this.resharding.set(id, shard); + this.postMessage({ + type: 'CONNECT_QUEUE_RESHARDING', + shardId: id, + workerId: workerData.workerId, + } satisfies WorkerRequestConnectResharding); + } + } + break; case 'SPAWN_SHARDS': { const onPacket = this.onPacket.bind(this); const handlePayload = this.options?.handlePayload?.bind(this); const self = this; - const { sendPayloadToParent } = this.options; for (const id of workerData.shards) { - let shard = this.shards.get(id); - if (!shard) { - shard = new Shard(id, { - token: workerData.token, - intents: workerData.intents, - info: data.info, - compress: data.compress, - debugger: this.debugger, - properties: { - ...properties, - ...this.options.gateway?.properties, - }, - async handlePayload(shardId, payload) { - await handlePayload?.(shardId, payload); - await onPacket(payload, shardId); - if (sendPayloadToParent) - self.postMessage({ - workerId: workerData.workerId, - shardId, - type: 'RECEIVE_PAYLOAD', - payload, - } satisfies WorkerReceivePayload); - }, - }); - this.shards.set(id, shard); + const existsShard = this.shards.has(id); + if (existsShard) { + this.logger.warn(`Trying to spawn existing shard #${id}`); + continue; } + const shard = new Shard(id, { + token: workerData.token, + intents: workerData.intents, + info: data.info, + compress: data.compress, + debugger: this.debugger, + properties: { + ...properties, + ...this.options.gateway?.properties, + }, + async handlePayload(shardId, payload) { + await handlePayload?.(shardId, payload); + await onPacket(payload, shardId); + if (self.options.sendPayloadToParent) + self.postMessage({ + workerId: workerData.workerId, + shardId, + type: 'RECEIVE_PAYLOAD', + payload, + } satisfies WorkerReceivePayload); + }, + }); + this.shards.set(id, shard); this.postMessage({ type: 'CONNECT_QUEUE', shardId: id, @@ -314,14 +397,47 @@ export class WorkerClient extends BaseClient { evalResponse.resolve(data.response); } break; + case 'WORKER_ALREADY_EXISTS_RESHARDING': + { + this.__handleGuildsResharding = new Set(); + this.postMessage({ + type: 'WORKER_START_RESHARDING', + workerId: workerData.workerId, + } satisfies WorkerStartResharding); + } + break; + case 'DISCONNECT_ALL_SHARDS_RESHARDING': + { + for (const i of this.shards.values()) { + await i.disconnect(); + } + this.postMessage({ + type: 'DISCONNECTED_ALL_SHARDS_RESHARDING', + workerId: workerData.workerId, + } satisfies WorkerDisconnectedAllShardsResharding); + } + break; + case 'CONNECT_ALL_SHARDS_RESHARDING': + { + this.shards.clear(); + const handlePayload = this.options?.handlePayload?.bind(this); + for (const [id, shard] of this.resharding) { + this.shards.set(id, shard); + shard.options.handlePayload = async (shardId, packet) => { + await handlePayload?.(shardId, packet); + return this.onPacket(packet, shardId); + }; + } + this.resharding.clear(); + } + break; } } - private generateNonce(large = true): string { + private generateNonce(): UUID { const uuid = randomUUID(); - const nonce = large ? uuid : uuid.split('-')[0]; - if (this.promises.has(nonce)) return this.generateNonce(large); - return nonce; + if (this.promises.has(uuid)) return this.generateNonce(); + return uuid; } private generateSendPromise(nonce: string, message = 'Timeout'): Promise { @@ -412,19 +528,15 @@ export class WorkerClient extends BaseClient { this.applicationId = packet.d.application.id; this.me = Transformers.ClientUser(this, packet.d.user, packet.d.application) as never; await this.events?.execute(packet.t as never, packet, this, shardId); - if ([...this.shards.values()].every(shard => shard.data.session_id)) { + if (!this._ready && [...this.shards.values()].every(shard => shard.data.session_id)) { + this._ready = true; this.postMessage({ type: 'WORKER_SHARDS_CONNECTED', workerId: this.workerId, } as WorkerShardsConnected); await this.events?.runEvent('WORKER_SHARDS_CONNECTED', this, this.me, -1); } - if ( - !( - this.__handleGuilds?.size && - (workerData.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds - ) - ) { + if (!hasIntent(workerData.intents, GatewayIntentBits.Guilds)) { if ([...this.shards.values()].every(shard => shard.data.session_id)) { this.postMessage({ type: 'WORKER_READY', diff --git a/src/common/it/utils.ts b/src/common/it/utils.ts index 4c5aa94..6ed9a8d 100644 --- a/src/common/it/utils.ts +++ b/src/common/it/utils.ts @@ -11,7 +11,7 @@ import { type TypeArray, } from '..'; import type { Cache } from '../../cache'; -import { type APIPartialEmoji, FormattingPatterns } from '../../types'; +import { type APIPartialEmoji, FormattingPatterns, GatewayIntentBits } from '../../types'; /** * Calculates the shard ID for a guild based on its ID. @@ -368,3 +368,8 @@ export function hasProps>(target: T, props: TypeArray } return true; } + +export function hasIntent(intents: number, target: keyof typeof GatewayIntentBits | GatewayIntentBits) { + const intent = typeof target === 'string' ? GatewayIntentBits[target] : target; + return (intents & intent) === intent; +} diff --git a/src/websocket/constants/index.ts b/src/websocket/constants/index.ts index 8a6a13d..3d392c1 100644 --- a/src/websocket/constants/index.ts +++ b/src/websocket/constants/index.ts @@ -20,12 +20,6 @@ const ShardManagerDefaults: DeepPartial = { resharding: { interval: 8 * 60 * 60 * 1e3, // 8h percentage: 80, - reloadGuilds() { - throw new Error('Unexpected to run '); - }, - onGuild() { - throw new Error('Unexpected to run '); - }, }, }; diff --git a/src/websocket/discord/basesocket.ts b/src/websocket/discord/basesocket.ts index 6a2cfd4..2a0a71c 100644 --- a/src/websocket/discord/basesocket.ts +++ b/src/websocket/discord/basesocket.ts @@ -4,7 +4,7 @@ import { SeyfertWebSocket } from './socket/custom'; export class BaseSocket { private internal: SeyfertWebSocket | WebSocket; - ping?: () => Promise; + ping: () => Promise; constructor(kind: 'ws' | 'bun', url: string) { this.internal = kind === 'ws' ? new SeyfertWebSocket(url) : new WebSocket(url); diff --git a/src/websocket/discord/shard.ts b/src/websocket/discord/shard.ts index 22e2b09..db37c65 100644 --- a/src/websocket/discord/shard.ts +++ b/src/websocket/discord/shard.ts @@ -91,7 +91,6 @@ export class Shard { ping() { if (!this.websocket) return Promise.resolve(Number.POSITIVE_INFINITY); - //@ts-expect-error return this.websocket.ping(); } @@ -313,7 +312,6 @@ export class Shard { await this.reconnect(); } break; - case GatewayCloseCodes.AuthenticationFailed: case GatewayCloseCodes.DisallowedIntents: case GatewayCloseCodes.InvalidAPIVersion: @@ -322,7 +320,6 @@ export class Shard { case GatewayCloseCodes.ShardingRequired: this.logger.fatal('Cannot reconnect'); break; - default: { this.logger.warn('Unknown close code, trying to reconnect anyways'); diff --git a/src/websocket/discord/sharder.ts b/src/websocket/discord/sharder.ts index 42a2b48..050f3a1 100644 --- a/src/websocket/discord/sharder.ts +++ b/src/websocket/discord/sharder.ts @@ -7,7 +7,7 @@ import { calculateShardId, lazyLoadPackage, } from '../../common'; -import type { MakeDeepPartial } from '../../common/types/util'; +import type { DeepPartial, MakeDeepPartial } from '../../common/types/util'; import { type GatewayDispatchPayload, GatewayOpcodes, @@ -19,7 +19,7 @@ import { ShardManagerDefaults } from '../constants'; import { DynamicBucket } from '../structures'; import { ConnectQueue } from '../structures/timeout'; import { Shard } from './shard'; -import type { ShardManagerOptions, WorkerData } from './shared'; +import type { ShardData, ShardManagerOptions, WorkerData } from './shared'; let parentPort: import('node:worker_threads').MessagePort; let workerData: WorkerData; @@ -87,8 +87,8 @@ export class ShardManager extends Map { return calculateShardId(guildId, this.totalShards); } - spawn(shardId: number) { - this.debugger?.info(`Spawn shard ${shardId}`); + create(shardId: number) { + this.debugger?.info(`Creating shard ${shardId}`); let shard = this.get(shardId); shard ??= new Shard(shardId, { @@ -110,7 +110,7 @@ export class ShardManager extends Map { async spawnShards(): Promise { const buckets = this.spawnBuckets(); - this.debugger?.info('Spawn shards'); + this.debugger?.info('Spawning shards'); for (const bucket of buckets) { for (const shard of bucket) { if (!shard) { @@ -143,31 +143,40 @@ export class ShardManager extends Map { this.debugger?.info('Starting resharding process'); this.connectQueue.concurrency = info.session_start_limit.max_concurrency; - this.options.totalShards = info.shards; this.options.info.session_start_limit.max_concurrency = info.session_start_limit.max_concurrency; + //waiting for all shards to connect let shardsConnected = 0; + const handleGuilds = new Set(); + let handlePayload = async (sharder: ShardManager, _: number, packet: GatewayDispatchPayload) => { - if ( - (packet.t === 'GUILD_CREATE' || packet.t === 'GUILD_DELETE') && - this.options.resharding.onGuild(packet.d.id) - ) { - return; + if (packet.t === 'GUILD_CREATE' || packet.t === 'GUILD_DELETE') { + handleGuilds.delete(packet.d.id); + if (shardsConnected === info.shards && !handleGuilds.size) { + return cleanProcess(sharder); + } } if (packet.t !== 'READY') return; - this.options.resharding.reloadGuilds(packet.d.guilds.map(x => x.id)); + for (const guild of packet.d.guilds) { + handleGuilds.add(guild.id); + } - if (++shardsConnected < info.shards) return; //waiting for last shard to connect + if (++shardsConnected < info.shards || handleGuilds.size) return; + cleanProcess(sharder); // dont listen more events when all shards are ready + }; + + const cleanProcess = (sharder: ShardManager) => { handlePayload = async () => { // }; - await this.disconnectAll(); + this.disconnectAll(); this.clear(); + this.options.totalShards = this.options.shardEnd = info.shards; for (const [id, shard] of sharder) { shard.options.handlePayload = (shardId, packet) => { return this.options.handlePayload(shardId, packet); @@ -178,19 +187,18 @@ export class ShardManager extends Map { sharder.clear(); }; + const options = MergeOptions(this.options, { + totalShards: info.shards, + shardEnd: info.shards, + } satisfies DeepPartial); + const resharder = new ShardManager({ - ...this.options, + ...options, resharding: { // getInfo mock, we don't need it getInfo: () => ({}) as any, interval: 0, percentage: 0, - reloadGuilds() { - // - }, - onGuild() { - return true; - }, }, handlePayload: (shardId, packet): unknown => { return handlePayload(resharder, shardId, packet); @@ -214,7 +222,7 @@ export class ShardManager extends Map { chunks.forEach((arr: any[], index: number) => { for (let i = 0; i < arr.length; i++) { const id = i + (index > 0 ? index * this.concurrency : 0) + this.shardStart; - chunks[index][i] = this.spawn(id); + chunks[index][i] = this.create(id); } }); this.debugger?.info(`${chunks.length} buckets created`); @@ -223,7 +231,7 @@ export class ShardManager extends Map { forceIdentify(shardId: number) { this.debugger?.info(`Shard #${shardId} force identify`); - return this.spawn(shardId).identify(); + return this.create(shardId).identify(); } disconnect(shardId: number) { @@ -233,10 +241,7 @@ export class ShardManager extends Map { disconnectAll() { this.debugger?.info('Disconnect all shards'); - return new Promise(resolve => { - this.forEach(shard => shard.disconnect()); - resolve(null); - }); + this.forEach(shard => shard.disconnect()); } setShardPresence(shardId: number, payload: GatewayUpdatePresence['d']) { @@ -247,13 +252,10 @@ export class ShardManager extends Map { }); } - setPresence(payload: GatewayUpdatePresence['d']): Promise { - return new Promise(resolve => { - this.forEach(shard => { - this.setShardPresence(shard.id, payload); - }, this); - resolve(); - }); + setPresence(payload: GatewayUpdatePresence['d']) { + this.forEach(shard => { + this.setShardPresence(shard.id, payload); + }, this); } joinVoice( @@ -276,7 +278,6 @@ export class ShardManager extends Map { leaveVoice(guild_id: string) { const shardId = this.calculateShardId(guild_id); - this.debugger?.info(`Shard #${shardId} leave voice in ${guild_id}`); return this.send(shardId, { op: GatewayOpcodes.VoiceStateUpdate, @@ -299,4 +300,11 @@ export class ShardManager extends Map { } this.get(shardId)?.send(false, payload); } + + resume(shardId: number, shardData: MakeRequired) { + if (this.has(shardId)) throw new Error('Cannot override existing shard'); + const shard = this.create(shardId); + shard.data = shardData; + return this.connectQueue.push(shard.connect.bind(shard)); + } } diff --git a/src/websocket/discord/shared.ts b/src/websocket/discord/shared.ts index 685812c..74b906a 100644 --- a/src/websocket/discord/shared.ts +++ b/src/websocket/discord/shared.ts @@ -43,18 +43,6 @@ export interface ShardManagerOptions extends ShardDetails { getInfo(): Promise; interval: number; percentage: number; - /** - * - * @param ids - * @returns - */ - reloadGuilds: (ids: string[]) => unknown; - /** - * - * @param id - * @returns true if deleted - */ - onGuild: (id: string) => boolean; }; } @@ -152,4 +140,5 @@ export interface WorkerData { debug: boolean; workerProxy: boolean; __USING_WATCHER__?: boolean; + resharding: boolean; } diff --git a/src/websocket/discord/socket/custom.ts b/src/websocket/discord/socket/custom.ts index 24f9e8d..137ae26 100644 --- a/src/websocket/discord/socket/custom.ts +++ b/src/websocket/discord/socket/custom.ts @@ -1,4 +1,4 @@ -import { createHash, randomBytes, randomUUID } from 'node:crypto'; +import { type UUID, createHash, randomBytes, randomUUID } from 'node:crypto'; import { request } from 'node:https'; import type { Socket } from 'node:net'; @@ -274,7 +274,7 @@ export class SeyfertWebSocket { }); } - #randomUUID(): string { + #randomUUID(): UUID { const id = randomUUID(); if (this.__promises.has(id)) return this.#randomUUID(); return id; diff --git a/src/websocket/discord/worker.ts b/src/websocket/discord/worker.ts index b798c6b..dda8da2 100644 --- a/src/websocket/discord/worker.ts +++ b/src/websocket/discord/worker.ts @@ -16,6 +16,7 @@ type CreateWorkerMessage = { } & D; export type WorkerRequestConnect = CreateWorkerMessage<'CONNECT_QUEUE', { shardId: number }>; +export type WorkerRequestConnectResharding = CreateWorkerMessage<'CONNECT_QUEUE_RESHARDING', { shardId: number }>; export type WorkerReceivePayload = CreateWorkerMessage< 'RECEIVE_PAYLOAD', { shardId: number; payload: GatewayDispatchPayload } @@ -51,8 +52,11 @@ export type WorkerSendCacheRequest = CreateWorkerMessage< export type WorkerSendShardInfo = CreateWorkerMessage<'SHARD_INFO', WorkerShardInfo & { nonce: string }>; export type WorkerSendInfo = CreateWorkerMessage<'WORKER_INFO', WorkerInfo & { nonce: string }>; export type WorkerReady = CreateWorkerMessage<'WORKER_READY'>; +export type WorkerReadyResharding = CreateWorkerMessage<'WORKER_READY_RESHARDING'>; export type WorkerShardsConnected = CreateWorkerMessage<'WORKER_SHARDS_CONNECTED'>; export type WorkerStart = CreateWorkerMessage<'WORKER_START'>; +export type WorkerStartResharding = CreateWorkerMessage<'WORKER_START_RESHARDING'>; +export type WorkerDisconnectedAllShardsResharding = CreateWorkerMessage<'DISCONNECTED_ALL_SHARDS_RESHARDING'>; export type WorkerSendApiRequest = CreateWorkerMessage< 'WORKER_API_REQUEST', { @@ -92,4 +96,8 @@ export type WorkerMessage = | WorkerSendApiRequest | WorkerSendEvalResponse | WorkerSendEval - | WorkerStart; + | WorkerStart + | WorkerStartResharding + | WorkerRequestConnectResharding + | WorkerReadyResharding + | WorkerDisconnectedAllShardsResharding; diff --git a/src/websocket/discord/workermanager.ts b/src/websocket/discord/workermanager.ts index eaacbc6..80d8cd4 100644 --- a/src/websocket/discord/workermanager.ts +++ b/src/websocket/discord/workermanager.ts @@ -1,10 +1,10 @@ import cluster, { type Worker as ClusterWorker } from 'node:cluster'; -import { randomUUID } from 'node:crypto'; +import { type UUID, randomUUID } from 'node:crypto'; import { ApiHandler, Logger } from '../..'; import { type Adapter, MemoryAdapter } from '../../cache'; import { BaseClient, type InternalRuntimeConfig } from '../../client/base'; import { type MakePartial, MergeOptions, lazyLoadPackage } from '../../common'; -import type { GatewayPresenceUpdateData, GatewaySendPayload } from '../../types'; +import type { GatewayPresenceUpdateData, GatewaySendPayload, RESTGetAPIGatewayBotResult } from '../../types'; import { WorkerManagerDefaults, properties } from '../constants'; import { DynamicBucket } from '../structures'; import { ConnectQueue } from '../structures/timeout'; @@ -13,8 +13,38 @@ import type { WorkerInfo, WorkerMessage, WorkerShardInfo } from './worker'; export class WorkerManager extends Map< number, - (ClusterWorker | import('node:worker_threads').Worker | { ready: boolean }) & { ready?: boolean } + (ClusterWorker | import('node:worker_threads').Worker | { ready?: boolean }) & { + ready?: boolean; + disconnected?: boolean; + resharded?: boolean; + } > { + static prepareSpaces( + options: { + shardStart: number; + shardEnd: number; + shardsPerWorker: number; + }, + logger?: Logger, + ) { + logger?.info('Preparing buckets', options); + + const chunks = DynamicBucket.chunk( + new Array(options.shardEnd - options.shardStart), + options.shardsPerWorker, + ); + + chunks.forEach((shards, index) => { + for (let i = 0; i < shards.length; i++) { + const id = i + (index > 0 ? index * options.shardsPerWorker : 0) + options.shardStart; + chunks[index][i] = id; + } + }); + + logger?.info(`${chunks.length} buckets created`); + return chunks; + } + options: MakePartial, 'adapter'>; debugger?: Logger; connectQueue!: ConnectQueue; @@ -22,7 +52,14 @@ export class WorkerManager extends Map< cacheAdapter: Adapter; promises = new Map void; timeout: NodeJS.Timeout }>(); rest!: ApiHandler; - constructor(options: MakePartial) { + reshardingWorkerQueue: (() => void)[] = []; + private _info?: RESTGetAPIGatewayBotResult; + + constructor( + options: Omit, 'resharding'> & { + resharding?: MakePartial, 'getInfo'>; + }, + ) { super(); this.options = options as WorkerManager['options']; this.cacheAdapter = new MemoryAdapter(); @@ -68,9 +105,12 @@ export class WorkerManager extends Map< return this.options.workers; } - async syncLatency({ shardId, workerId }: { shardId?: number; workerId?: number }) { + async syncLatency({ + shardId, + workerId, + }: { shardId: number; workerId?: number } | { shardId?: number; workerId: number }) { if (typeof shardId !== 'number' && typeof workerId !== 'number') { - return; + throw new Error('Undefined workerId and shardId'); } const id = workerId ?? this.calculateWorkerId(shardId!); @@ -96,28 +136,9 @@ export class WorkerManager extends Map< return workerId; } - prepareSpaces() { - this.debugger?.info('Preparing buckets'); - - const chunks = DynamicBucket.chunk( - new Array(this.shardEnd - this.shardStart), - this.options.shardsPerWorker, - ); - - chunks.forEach((shards, index) => { - for (let i = 0; i < shards.length; i++) { - const id = i + (index > 0 ? index * this.options.shardsPerWorker : 0) + this.shardStart; - chunks[index][i] = id; - } - }); - - this.debugger?.info(`${chunks.length} buckets created`); - return chunks; - } - - postMessage(id: number, body: any) { + postMessage(id: number, body: ManagerMessages) { const worker = this.get(id); - if (!worker) return this.debugger?.error(`Worker ${id} doesnt exists.`); + if (!worker) return this.debugger?.error(`Worker ${id} does not exists.`); switch (this.options.mode) { case 'clusters': (worker as ClusterWorker).send(body); @@ -131,14 +152,14 @@ export class WorkerManager extends Map< } } - async prepareWorkers(shards: number[][]) { + async prepareWorkers(shards: number[][], resharding = false) { const worker_threads = lazyLoadPackage('node:worker_threads'); if (!worker_threads) throw new Error('Cannot prepare workers without worker_threads.'); for (let i = 0; i < shards.length; i++) { const workerExists = this.has(i); - if (!workerExists) { - this.workerQueue.push(() => { + if (resharding || !workerExists) { + this[resharding ? 'reshardingWorkerQueue' : 'workerQueue'].push(() => { const worker = this.createWorker({ path: this.options.path, debug: this.options.debug, @@ -147,8 +168,9 @@ export class WorkerManager extends Map< intents: this.options.intents, workerId: i, workerProxy: this.options.workerProxy, - totalShards: this.totalShards, + totalShards: resharding ? this._info!.shards : this.totalShards, mode: this.options.mode, + resharding, }); this.set(i, worker); }); @@ -157,11 +179,21 @@ export class WorkerManager extends Map< } createWorker(workerData: WorkerData) { + if (this.has(workerData.workerId)) { + if (workerData.resharding) { + this.postMessage(workerData.workerId, { + type: 'WORKER_ALREADY_EXISTS_RESHARDING', + } satisfies ManagerWorkerAlreadyExistsResharding); + } + const worker = this.get(workerData.workerId)!; + return worker; + } const worker_threads = lazyLoadPackage('node:worker_threads'); if (!worker_threads) throw new Error('Cannot create worker without worker_threads.'); const env: Record = { SEYFERT_SPAWNING: 'true', }; + if (workerData.resharding) env.SEYFERT_WORKER_RESHARDING = 'true'; for (const i in workerData) { env[`SEYFERT_WORKER_${i.toUpperCase()}`] = workerData[i as keyof WorkerData]; } @@ -190,23 +222,80 @@ export class WorkerManager extends Map< } } - spawn(workerId: number, shardId: number) { + spawn(workerId: number, shardId: number, resharding = false) { this.connectQueue.push(() => { const worker = this.has(workerId); if (!worker) { - this.debugger?.fatal("Trying spawn with worker doesn't exist"); + this.debugger?.fatal(`Trying ${resharding ? 'reshard' : 'spawn'} with worker that doesn't exist`); return; } this.postMessage(workerId, { - type: 'ALLOW_CONNECT', + type: resharding ? 'ALLOW_CONNECT_RESHARDING' : 'ALLOW_CONNECT', shardId, presence: this.options.presence?.(shardId, workerId), - } satisfies ManagerAllowConnect); + } satisfies ManagerAllowConnect | ManagerAllowConnectResharding); }); } async handleWorkerMessage(message: WorkerMessage) { switch (message.type) { + case 'WORKER_READY_RESHARDING': + { + this.get(message.workerId)!.resharded = true; + if (!this.reshardingWorkerQueue.length && [...this.values()].every(w => w.resharded)) { + for (const [id] of this.entries()) { + this.postMessage(id, { + type: 'DISCONNECT_ALL_SHARDS_RESHARDING', + } satisfies DisconnectAllShardsResharding); + } + this.forEach(w => { + delete w.resharded; + }); + } else { + const nextWorker = this.reshardingWorkerQueue.shift(); + if (nextWorker) { + this.debugger?.info('Spawning next worker to reshard'); + nextWorker(); + } else { + this.debugger?.info('No more workers to reshard left'); + } + } + } + break; + case 'DISCONNECTED_ALL_SHARDS_RESHARDING': + { + this.get(message.workerId)!.disconnected = true; + if ([...this.values()].every(w => w.disconnected)) { + this.options.totalShards = this._info!.shards; + this.options.shardEnd = this.options.totalShards = this._info!.shards; + delete this._info; + for (const [id] of this.entries()) { + this.postMessage(id, { + type: 'CONNECT_ALL_SHARDS_RESHARDING', + } satisfies ConnnectAllShardsResharding); + } + this.forEach(w => { + delete w.disconnected; + }); + } + } + break; + case 'WORKER_START_RESHARDING': + { + this.postMessage(message.workerId, { + type: 'SPAWN_SHARDS_RESHARDING', + compress: this.options.compress ?? false, + info: { + ...this.options.info, + shards: this._info!.shards, + }, + properties: { + ...properties, + ...this.options.properties, + }, + } satisfies ManagerSpawnShardsResharding); + } + break; case 'WORKER_START': { this.postMessage(message.workerId, { @@ -223,6 +312,10 @@ export class WorkerManager extends Map< } satisfies ManagerSpawnShards); } break; + + case 'CONNECT_QUEUE_RESHARDING': + this.spawn(message.workerId, message.shardId, true); + break; case 'CONNECT_QUEUE': this.spawn(message.workerId, message.shardId); break; @@ -346,11 +439,10 @@ export class WorkerManager extends Map< } } - private generateNonce(large = true): string { + private generateNonce(): UUID { const uuid = randomUUID(); - const nonce = large ? uuid : uuid.split('-')[0]; - if (this.promises.has(nonce)) return this.generateNonce(large); - return nonce; + if (this.promises.has(uuid)) return this.generateNonce(); + return uuid; } private generateSendPromise(nonce: string, message = 'Timeout'): Promise { @@ -391,7 +483,7 @@ export class WorkerManager extends Map< throw new Error(`Worker #${workerId} doesnt exist`); } - const nonce = this.generateNonce(false); + const nonce = this.generateNonce(); this.postMessage(workerId, { shardId, nonce, type: 'SHARD_INFO' } satisfies ManagerRequestShardInfo); @@ -428,6 +520,7 @@ export class WorkerManager extends Map< this.options.shardEnd ??= this.options.totalShards ?? this.options.info.shards; this.options.totalShards ??= this.options.shardEnd; this.options = MergeOptions>(WorkerManagerDefaults, this.options); + this.options.resharding.getInfo ??= () => this.rest.proxy.gateway.bot.get(); this.options.workers ??= Math.ceil(this.options.totalShards / this.options.shardsPerWorker); this.connectQueue = new ConnectQueue(5.5e3, this.concurrency); @@ -444,10 +537,52 @@ export class WorkerManager extends Map< ); } - const spaces = this.prepareSpaces(); + const spaces = WorkerManager.prepareSpaces( + { + shardStart: this.shardStart, + shardEnd: this.shardEnd, + shardsPerWorker: this.shardsPerWorker, + }, + this.debugger, + ); await this.prepareWorkers(spaces); // Start workers queue - return this.workerQueue.shift()?.(); + this.workerQueue.shift()!(); + await this.startResharding(); + } + + async startResharding() { + if (this.options.resharding.interval <= 0) return; + if (this.shardStart !== 0 || this.shardEnd !== this.totalShards) + return this.debugger?.debug('Cannot start resharder'); + setInterval(async () => { + this.debugger?.debug('Checking if reshard is needed'); + const info = await this.options.resharding.getInfo(); + if (info.shards <= this.totalShards) return this.debugger?.debug('Resharding not needed'); + //https://github.com/discordeno/discordeno/blob/6a5f446c0651b9fad9f1550ff1857fe7a026426b/packages/gateway/src/manager.ts#L106C8-L106C94 + const percentage = (info.shards / ((this.totalShards * 2500) / 1000)) * 100; + if (percentage < this.options.resharding.percentage) + return this.debugger?.debug( + `Percentage is not enough to reshard ${percentage}/${this.options.resharding.percentage}`, + ); + + this.debugger?.info('Starting resharding process'); + + this._info = info; + this.connectQueue.concurrency = info.session_start_limit.max_concurrency; + this.options.info.session_start_limit.max_concurrency = info.session_start_limit.max_concurrency; + + const spaces = WorkerManager.prepareSpaces( + { + shardsPerWorker: this.shardsPerWorker, + shardEnd: info.shards, + shardStart: 0, + }, + this.debugger, + ); + await this.prepareWorkers(spaces, true); + return this.reshardingWorkerQueue.shift()!(); + }, this.options.resharding.interval); } } @@ -457,10 +592,21 @@ export type ManagerAllowConnect = CreateManagerMessage< 'ALLOW_CONNECT', { shardId: number; presence: GatewayPresenceUpdateData } >; +export type ManagerAllowConnectResharding = CreateManagerMessage< + 'ALLOW_CONNECT_RESHARDING', + { shardId: number; presence: GatewayPresenceUpdateData } +>; +export type ManagerWorkerAlreadyExistsResharding = CreateManagerMessage<'WORKER_ALREADY_EXISTS_RESHARDING'>; export type ManagerSpawnShards = CreateManagerMessage< 'SPAWN_SHARDS', Pick >; +export type ManagerSpawnShardsResharding = CreateManagerMessage< + 'SPAWN_SHARDS_RESHARDING', + Pick +>; +export type DisconnectAllShardsResharding = CreateManagerMessage<'DISCONNECT_ALL_SHARDS_RESHARDING'>; +export type ConnnectAllShardsResharding = CreateManagerMessage<'CONNECT_ALL_SHARDS_RESHARDING'>; export type ManagerSendPayload = CreateManagerMessage< 'SEND_PAYLOAD', GatewaySendPayload & { shardId: number; nonce: string } @@ -503,4 +649,9 @@ export type ManagerMessages = | ManagerSendBotReady | ManagerSendApiResponse | ManagerSendEvalResponse - | ManagerExecuteEval; + | ManagerExecuteEval + | ManagerWorkerAlreadyExistsResharding + | ManagerSpawnShardsResharding + | ManagerAllowConnectResharding + | DisconnectAllShardsResharding + | ConnnectAllShardsResharding; diff --git a/src/websocket/structures/timeout.ts b/src/websocket/structures/timeout.ts index 0de9296..a7ce26d 100644 --- a/src/websocket/structures/timeout.ts +++ b/src/websocket/structures/timeout.ts @@ -25,43 +25,42 @@ export class ConnectTimeout { } export class ConnectQueue { - readonly queue: { cb: (() => any) | undefined }[] = []; + private queue: ((() => unknown) | undefined)[] = []; + private remaining = 0; protected interval?: NodeJS.Timeout = undefined; constructor( public intervalTime = 5000, public concurrency = 1, - ) {} - - async push(callback: () => any) { - this.queue.push({ cb: callback }); - if (this.queue.length === this.concurrency) { - for (let i = 0; i < this.concurrency; i++) { - await this.queue[i].cb?.(); - this.queue[i].cb = undefined; - } - this.interval = setInterval(() => { - for (let i = 0; i < this.concurrency; i++) { - this.shift(); - } - }, this.intervalTime); - } + ) { + this.remaining = concurrency; } - async shift(): Promise { - const shift = this.queue.shift(); - if (!shift) { + push(callback: () => unknown) { + if (this.remaining === 0) return this.queue.push(callback); + this.remaining--; + if (!this.interval) { + this.startInterval(); + } + + if (this.queue.length < this.concurrency) { + return callback(); + } + return this.queue.push(callback); + } + + startInterval() { + this.interval = setInterval(() => { + let cb: (() => void) | undefined; + while (this.queue.length && !(cb = this.queue.shift())) { + // + } + if (cb) return cb?.(); + if (this.remaining < this.concurrency) return this.remaining++; if (!this.queue.length) { clearInterval(this.interval); this.interval = undefined; } - return; - } - if (!shift.cb) return this.shift(); - await shift.cb?.(); - if (!this.queue.length) { - clearInterval(this.interval); - this.interval = undefined; - } + }, this.intervalTime / this.concurrency); } }