diff --git a/.gitignore b/.gitignore index aadbea9..704df77 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ packages/core/docs.json node_modules .pnp .pnp.js +.vscode # testing coverage diff --git a/packages/api-types/CHANGELOG.md b/packages/api-types/CHANGELOG.md index c73cdde..1d8ff6d 100644 --- a/packages/api-types/CHANGELOG.md +++ b/packages/api-types/CHANGELOG.md @@ -1,6 +1,6 @@ # @biscuitland/api-types -## 2.0.4 +## 2.0.5 ### Major Changes diff --git a/packages/api-types/README.md b/packages/api-types/README.md index 220c94e..3a32a09 100644 --- a/packages/api-types/README.md +++ b/packages/api-types/README.md @@ -25,7 +25,7 @@ import type { DiscordUser } from '@biscuitland/api-types'; ## Example for [Deno](https://deno.land/) ```ts -import type { DiscordUser } from "https://unpkg.com/@biscuitland/api-types@2.0.4/dist/index.d.ts"; +import type { DiscordUser } from "https://unpkg.com/@biscuitland/api-types@2.0.5/dist/index.d.ts"; ``` We deliver this package through [unpkg](https://unpkg.com/) and it does contain constants and routes too diff --git a/packages/api-types/package.json b/packages/api-types/package.json index dac4930..6367740 100644 --- a/packages/api-types/package.json +++ b/packages/api-types/package.json @@ -1,6 +1,6 @@ { "name": "@biscuitland/api-types", - "version": "2.0.4", + "version": "2.0.5", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", diff --git a/packages/api-types/src/utils/constants.ts b/packages/api-types/src/utils/constants.ts index 4a85383..07ecee1 100644 --- a/packages/api-types/src/utils/constants.ts +++ b/packages/api-types/src/utils/constants.ts @@ -5,7 +5,7 @@ export const BASE_URL = 'https://discord.com/api'; export const API_VERSION = 10; /** https://github.com/oasisjs/biscuit/releases */ -export const BISCUIT_VERSION = '2.0.4'; +export const BISCUIT_VERSION = '2.0.5'; /** https://discord.com/developers/docs/reference#user-agent */ export const USER_AGENT = `DiscordBot (https://github.com/oasisjs/biscuit, v${BISCUIT_VERSION})`; diff --git a/packages/cache/CHANGELOG.md b/packages/cache/CHANGELOG.md index 1a214c6..e2ef1d7 100644 --- a/packages/cache/CHANGELOG.md +++ b/packages/cache/CHANGELOG.md @@ -1,6 +1,6 @@ # @biscuitland/cache -## 2.0.4 +## 2.0.5 ### Major Changes @@ -9,4 +9,4 @@ ### Patch Changes - Updated dependencies - - @biscuitland/api-types@2.0.4 + - @biscuitland/api-types@2.0.5 diff --git a/packages/cache/package.json b/packages/cache/package.json index f9f62ca..8871044 100644 --- a/packages/cache/package.json +++ b/packages/cache/package.json @@ -1,6 +1,6 @@ { "name": "@biscuitland/cache", - "version": "2.0.4", + "version": "2.0.5", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", @@ -23,7 +23,7 @@ } }, "dependencies": { - "@biscuitland/api-types": "^2.0.4", + "@biscuitland/api-types": "^2.0.5", "ioredis": "^5.2.2" }, "devDependencies": { diff --git a/packages/core/CHANGELOG.md b/packages/core/CHANGELOG.md index 60af055..be50f81 100644 --- a/packages/core/CHANGELOG.md +++ b/packages/core/CHANGELOG.md @@ -1,6 +1,6 @@ # @biscuitland/core -## 2.0.4 +## 2.0.5 ### Major Changes @@ -9,6 +9,6 @@ ### Patch Changes - Updated dependencies - - @biscuitland/api-types@2.0.4 - - @biscuitland/rest@2.0.4 - - @biscuitland/ws@2.0.4 + - @biscuitland/api-types@2.0.5 + - @biscuitland/rest@2.0.5 + - @biscuitland/ws@2.0.5 diff --git a/packages/core/package.json b/packages/core/package.json index f667f98..c3d2eab 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@biscuitland/core", - "version": "2.0.4", + "version": "2.0.5", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", @@ -23,9 +23,9 @@ } }, "dependencies": { - "@biscuitland/api-types": "^2.0.4", - "@biscuitland/rest": "^2.0.4", - "@biscuitland/ws": "^2.0.4" + "@biscuitland/api-types": "^2.0.5", + "@biscuitland/rest": "^2.0.5", + "@biscuitland/ws": "^2.0.5" }, "devDependencies": { "tsup": "^6.1.3" diff --git a/packages/core/src/biscuit.ts b/packages/core/src/biscuit.ts index cdf9d22..aae5e5e 100644 --- a/packages/core/src/biscuit.ts +++ b/packages/core/src/biscuit.ts @@ -10,13 +10,11 @@ import type { Localization, Snowflake, DiscordGetGatewayBot, + DiscordGatewayPayload } from '@biscuitland/api-types'; -import { ApplicationCommandTypes, GatewayOpcodes } from '@biscuitland/api-types'; +import { ApplicationCommandTypes, GatewayOpcodes , -// routes - -import { APPLICATION_COMMANDS, GUILD_APPLICATION_COMMANDS, GUILD_APPLICATION_COMMANDS_PERMISSIONS, @@ -24,6 +22,8 @@ import { USER } from '@biscuitland/api-types'; +// routes + import type { PermissionResolvable } from './structures/special/permissions'; import type { Activities, StatusTypes } from './structures/presence'; @@ -36,14 +36,13 @@ import { User } from './structures/user'; import type { RestAdapter } from '@biscuitland/rest'; import { DefaultRestAdapter } from '@biscuitland/rest'; -import type { WsAdapter } from '@biscuitland/ws'; -import { DefaultWsAdapter } from '@biscuitland/ws'; +import type { Shard } from '@biscuitland/ws'; +import { ShardManager } from '@biscuitland/ws'; import type { EventAdapter } from './adapters/event-adapter'; import { DefaultEventAdapter } from './adapters/default-event-adapter'; import { Util } from './utils/util'; -import { Shard } from '@biscuitland/ws'; // PRESENCE @@ -146,7 +145,7 @@ export interface BiscuitOptions { }; ws: { - adapter?: { new (...args: any[]): WsAdapter }; + adapter?: { new (...args: any[]): ShardManager }; options: any; }; } @@ -180,9 +179,9 @@ export class Session { options: null, }, ws: { - adapter: DefaultWsAdapter, + adapter: ShardManager, options: null, - }, + } }; options: BiscuitOptions; @@ -190,7 +189,7 @@ export class Session { readonly events: EventAdapter; readonly rest: RestAdapter; - readonly ws: WsAdapter; + readonly ws: ShardManager; private adapters = new Map(); @@ -210,20 +209,17 @@ export class Session { // makeWs - const defHandler: DiscordRawEventHandler = (shard, event) => { - let data = event as any; - // let data = JSON.parse(message) as DiscordGatewayPayload; + const defHandler = (shard: Shard, payload: DiscordGatewayPayload) => { + Actions.raw(this, shard.options.id, payload); - Actions.raw(this, shard.id, data); - - if (!data.t || !data.d) { + if (!payload.t || !payload.d) { return; } - Actions[data.t as keyof typeof Actions]?.( + Actions[payload.t as keyof typeof Actions]?.( this, - shard.id, - data.d as any + shard.options.id, + payload.d as any ); }; @@ -231,21 +227,28 @@ export class Session { this.options.ws.options = { handleDiscordPayload: defHandler, - gatewayConfig: { - token: this.options.token, - intents: this.options.intents, - }, + gateway: { + url: '', + shards: '', - intents: this.options.intents, - token: this.options.token, + session_start_limit: { + total: 1000, + remaining: 1000, + reset_after: 3600000, + + max_concurrency: 1 + } + }, + config: { + token: this.options.token, + intents: this.options.intents, + }, }; } // makeEvents - this.events = this.options.events?.adapter - ? new this.options.events.adapter() - : new DefaultEventAdapter(); + this.events = this.options.events?.adapter ? new this.options.events.adapter() : new DefaultEventAdapter(); this.ws = this.getWs(); this.token = options.token; @@ -282,7 +285,7 @@ export class Session { * @inheritDoc */ - private getWs(): WsAdapter { + private getWs(): ShardManager { return this.getAdapter( this.options.ws.adapter!, this.options.ws.options @@ -294,23 +297,10 @@ export class Session { */ async start(): Promise { - const nonParsed = await this.rest.get('/gateway/bot'); + const gateway = await this.rest.get('/gateway/bot'); + this.ws.options.gateway = gateway; - this.ws.options.gatewayBot = { - url: nonParsed.url, - shards: nonParsed.shards, - sessionStartLimit: { - total: nonParsed.session_start_limit.total, - remaining: nonParsed.session_start_limit.remaining, - resetAfter: nonParsed.session_start_limit.reset_after, - maxConcurrency: nonParsed.session_start_limit.max_concurrency, - }, - }; - - this.ws.options.lastShardId = this.ws.options.gatewayBot.shards - 1; - this.ws.agent.options.totalShards = this.ws.options.gatewayBot.shards; - - this.ws.shards(); + this.ws.spawns(); } // USEFUL METHODS @@ -338,7 +328,7 @@ export class Session { * } */ editStatus(shardId: number, status: StatusUpdate, prio = true): void { - const shard = this.ws.agent.shards.get(shardId); + const shard = this.ws.shards.get(shardId); if (!shard) { throw new Error(`Unknown shard ${shardId}`); @@ -350,7 +340,7 @@ export class Session { status: status.status, since: null, afk: false, - activities: status.activities.map((activity) => { + activities: status.activities.map(activity => { return { name: activity.name, type: activity.type, @@ -399,7 +389,7 @@ export class Session { permissions: options, }, { - headers: { 'Authorization': `Bearer ${bearerToken}` }, + headers: { Authorization: `Bearer ${bearerToken}` }, }, ); } @@ -463,7 +453,7 @@ export class Session { guildId ? GUILD_APPLICATION_COMMANDS(this.applicationId, guildId) : APPLICATION_COMMANDS(this.applicationId), - options.map((o) => + options.map(o => this.isContextApplicationCommand(o) ? { name: o.name, diff --git a/packages/helpers/CHANGELOG.md b/packages/helpers/CHANGELOG.md index e460369..e981432 100644 --- a/packages/helpers/CHANGELOG.md +++ b/packages/helpers/CHANGELOG.md @@ -1,9 +1,9 @@ # @biscuitland/helpers -## 2.0.4 +## 2.0.5 ### Patch Changes - Updated dependencies - - @biscuitland/api-types@2.0.4 - - @biscuitland/core@2.0.4 + - @biscuitland/api-types@2.0.5 + - @biscuitland/core@2.0.5 diff --git a/packages/helpers/package.json b/packages/helpers/package.json index 07fcfb8..27dfa9b 100644 --- a/packages/helpers/package.json +++ b/packages/helpers/package.json @@ -1,6 +1,6 @@ { "name": "@biscuitland/helpers", - "version": "2.0.4", + "version": "2.0.5", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", @@ -23,8 +23,8 @@ } }, "dependencies": { - "@biscuitland/api-types": "^2.0.4", - "@biscuitland/core": "^2.0.4" + "@biscuitland/api-types": "^2.0.5", + "@biscuitland/core": "^2.0.5" }, "devDependencies": { "tsup": "^6.1.3" diff --git a/packages/rest/CHANGELOG.md b/packages/rest/CHANGELOG.md index eb3895f..ebce8d7 100644 --- a/packages/rest/CHANGELOG.md +++ b/packages/rest/CHANGELOG.md @@ -1,6 +1,6 @@ # @biscuitland/rest -## 2.0.4 +## 2.0.5 ### Major Changes @@ -9,4 +9,4 @@ ### Patch Changes - Updated dependencies - - @biscuitland/api-types@2.0.4 + - @biscuitland/api-types@2.0.5 diff --git a/packages/rest/package.json b/packages/rest/package.json index 7700d43..576b950 100644 --- a/packages/rest/package.json +++ b/packages/rest/package.json @@ -1,6 +1,6 @@ { "name": "@biscuitland/rest", - "version": "2.0.4", + "version": "2.0.5", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", @@ -23,7 +23,7 @@ } }, "dependencies": { - "@biscuitland/api-types": "^2.0.4" + "@biscuitland/api-types": "^2.0.5" }, "devDependencies": { "tsup": "^6.1.3" diff --git a/packages/ws/CHANGELOG.md b/packages/ws/CHANGELOG.md index b717ce3..1e51d0d 100644 --- a/packages/ws/CHANGELOG.md +++ b/packages/ws/CHANGELOG.md @@ -1,6 +1,6 @@ # @biscuitland/ws -## 2.0.4 +## 2.0.5 ### Major Changes @@ -9,4 +9,4 @@ ### Patch Changes - Updated dependencies - - @biscuitland/api-types@2.0.4 + - @biscuitland/api-types@2.0.5 diff --git a/packages/ws/package.json b/packages/ws/package.json index 7c0973c..7c53af4 100644 --- a/packages/ws/package.json +++ b/packages/ws/package.json @@ -1,6 +1,6 @@ { "name": "@biscuitland/ws", - "version": "2.0.4", + "version": "2.0.5", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", @@ -23,7 +23,8 @@ } }, "dependencies": { - "@biscuitland/api-types": "^2.0.4", + "@biscuitland/api-types": "^2.0.5", + "@sapphire/async-queue": "^1.5.0", "ws": "^8.8.1" }, "devDependencies": { diff --git a/packages/ws/src/adapters/default-ws-adapter.ts b/packages/ws/src/adapters/default-ws-adapter.ts deleted file mode 100644 index 15e95a8..0000000 --- a/packages/ws/src/adapters/default-ws-adapter.ts +++ /dev/null @@ -1,234 +0,0 @@ -import { createLeakyBucket } from '../utils/bucket-util'; - -import type { LeakyBucket } from '../utils/bucket-util'; - -import type { GatewayBot, PickPartial } from '@biscuitland/api-types'; -import type { WsAdapter } from './ws-adapter'; - -import type { - Shard, - ShardGatewayConfig, - ShardOptions, -} from '../services/shard'; - -import { Agent } from '../services/agent'; - -export class DefaultWsAdapter implements WsAdapter { - static readonly DEFAULTS = { - spawnShardDelay: 5300, - - shardsPerWorker: 25, - totalWorkers: 4, - - gatewayBot: { - url: 'wss://gateway.discord.gg', - shards: 1, - - sessionStartLimit: { - total: 1000, - remaining: 1000, - resetAfter: 0, - maxConcurrency: 1, - }, - }, - - firstShardId: 0, - - lastShardId: 1, - }; - - buckets = new Map< - number, - { - workers: { id: number; queue: number[] }[]; - leak: LeakyBucket; - } - >(); - - options: Options; - - agent: Agent; - - constructor(options: DefaultWsOptions) { - this.options = Object.assign(Object.create(DefaultWsAdapter.DEFAULTS), options); - - this.options.firstShardId = this.options.firstShardId ?? 0; - this.options.lastShardId = this.options.lastShardId ?? this.options.totalShards - 1 ?? 1; - - this.agent = new Agent({ - totalShards: this.options.totalShards ?? this.options.gatewayBot.shards ?? 1, - gatewayConfig: this.options.gatewayConfig, - createShardOptions: this.options.createShardOptions, - - handleMessage: (shard: Shard, message: MessageEvent) => { - return this.options.handleDiscordPayload(shard, message); - }, - - handleIdentify: (id: number) => { - return this.buckets.get(id % this.options.gatewayBot.sessionStartLimit.maxConcurrency)!.leak.acquire(1); - }, - }); - } - - /** - * @inheritDoc - */ - - prepareBuckets() { - for ( - let i = 0; - i < this.options.gatewayBot.sessionStartLimit.maxConcurrency; - ++i - ) { - this.buckets.set(i, { - workers: [], - leak: createLeakyBucket({ - max: 1, - refillAmount: 1, - refillInterval: this.options.spawnShardDelay, - }), - }); - } - - for ( - let shardId = this.options.firstShardId; - shardId <= this.options.lastShardId; - ++shardId - ) { - if (shardId >= this.agent.options.totalShards) { - throw new Error( - `Shard (id: ${shardId}) is bigger or equal to the used amount of used shards which is ${this.agent.options.totalShards}` - ); - } - - const bucketId = shardId % this.options.gatewayBot.sessionStartLimit.maxConcurrency; - const bucket = this.buckets.get(bucketId); - - if (!bucket) { - throw new Error( - `Shard (id: ${shardId}) got assigned to an illegal bucket id: ${bucketId}, expected a bucket id between 0 and ${ - this.options.gatewayBot.sessionStartLimit - .maxConcurrency - 1 - }` - ); - } - - const workerId = this.workerId(shardId); - const worker = bucket.workers.find(w => w.id === workerId); - - if (worker) { - worker.queue.push(shardId); - } else { - bucket.workers.push({ id: workerId, queue: [shardId] }); - } - } - } - - /** - * @inheritDoc - */ - - prepareShards() { - this.buckets.forEach((bucket, bucketId) => { - for (const worker of bucket.workers) { - for (const shardId of worker.queue) { - this.workerToIdentify(worker.id, shardId, bucketId); - } - } - }); - } - - /** - * @inheritDoc - */ - - calculateTotalShards(): number { - if (this.agent.options.totalShards < 100) { - return this.agent.options.totalShards; - } - - return ( - Math.ceil( - this.agent.options.totalShards / - (this.options.gatewayBot.sessionStartLimit - .maxConcurrency === 1 - ? 16 - : this.options.gatewayBot.sessionStartLimit - .maxConcurrency) - ) * this.options.gatewayBot.sessionStartLimit.maxConcurrency - ); - } - - /** - * @inheritDoc - */ - - workerToIdentify(_workerId: number, shardId: number, _bucketId: number) { - return this.agent.identify(shardId); - } - - /** - * @inheritDoc - */ - - workerId(shardId: number) { - let workerId = Math.floor(shardId / this.options.shardsPerWorker); - - if (workerId >= this.options.totalWorkers) { - workerId = this.options.totalWorkers - 1; - } - - return workerId; - } - - /** - * @inheritDoc - */ - - shards() { - this.prepareBuckets(); - - this.prepareShards(); - } -} - -export type DefaultWsOptions = Pick< - Options, - Exclude -> & - Partial; - -interface Options { - /** Delay in milliseconds to wait before spawning next shard. */ - spawnShardDelay: number; - - /** The amount of shards to load per worker. */ - shardsPerWorker: number; - - /** The total amount of workers to use for your bot. */ - totalWorkers: number; - - /** Total amount of shards your bot uses. Useful for zero-downtime updates or resharding. */ - totalShards: number; - - /** Id of the first Shard which should get controlled by this manager. */ - firstShardId: number; - - /** Id of the last Shard which should get controlled by this manager. */ - lastShardId: number; - - createShardOptions?: Omit< - ShardOptions, - 'id' | 'totalShards' | 'requestIdentify' | 'gatewayConfig' - >; - - /** Important data which is used by the manager to connect shards to the gateway. */ - gatewayBot: GatewayBot; - - // REMOVE THIS - - gatewayConfig: PickPartial; - - /** Sends the discord payload to another guild. */ - handleDiscordPayload: (shard: Shard, data: MessageEvent) => any; -} diff --git a/packages/ws/src/adapters/ws-adapter.ts b/packages/ws/src/adapters/ws-adapter.ts deleted file mode 100644 index 89f63f4..0000000 --- a/packages/ws/src/adapters/ws-adapter.ts +++ /dev/null @@ -1,29 +0,0 @@ -import type { Agent } from '../services/agent'; -import type { GatewayBot } from '@biscuitland/api-types'; - -export interface WsAdapter { - options: Partial; - - /** - * @inheritDoc - */ - - agent: Agent; - - /** - * @inheritDoc - */ - - shards(): void; -} - -interface Options { - /** Id of the first Shard which should get controlled by this manager. */ - firstShardId: number; - - /** Id of the last Shard which should get controlled by this manager. */ - lastShardId: number; - - /** Important data which is used by the manager to connect shards to the gateway. */ - gatewayBot: GatewayBot; -} diff --git a/packages/ws/src/index.ts b/packages/ws/src/index.ts index 20295a7..ce1dbff 100644 --- a/packages/ws/src/index.ts +++ b/packages/ws/src/index.ts @@ -1,9 +1,2 @@ -export { WsAdapter } from './adapters/ws-adapter'; - -export { - DefaultWsAdapter, - DefaultWsOptions, -} from './adapters/default-ws-adapter'; - -export { AgentOptions, Agent } from './services/agent'; +export { ShardManager } from './services/shard-manager'; export { Shard } from './services/shard'; diff --git a/packages/ws/src/services/agent.ts b/packages/ws/src/services/agent.ts deleted file mode 100644 index f47d9f3..0000000 --- a/packages/ws/src/services/agent.ts +++ /dev/null @@ -1,113 +0,0 @@ -import type { ShardGatewayConfig, ShardOptions } from './shard'; -import type { PickPartial } from '@biscuitland/api-types'; - -import { Shard } from './shard'; - -export class Agent { - static readonly DEFAULTS = { - shardIds: [], - - totalShards: 1, - }; - - options: Options; - shards: Map; - - constructor(options: AgentOptions) { - this.options = Object.assign(Agent.DEFAULTS, options); - - const { handleIdentify } = this.options; - - this.shards = new Map( - this.options.shardIds.map(id => { - const shard = new Shard({ - id, - totalShards: this.options.totalShards, - gatewayConfig: this.options.gatewayConfig, - - handleMessage: (shard, message) => { - return this.options.handleMessage(shard, message); - }, - - async handleIdentify() { - return await handleIdentify(id); - }, - - ...this.options.createShardOptions, - }); - - return [id, shard] as const; - }) - ); - } - - /** - * @inheritDoc - */ - - async identify(id: number) { - // @ts-ignore - let shard = this.shards.get(id); - - if (!shard) { - const { handleIdentify } = this.options; - - shard = new Shard({ - id, - totalShards: this.options.totalShards, - gatewayConfig: this.options.gatewayConfig, - - handleMessage: (shard, message) => { - return this.options.handleMessage(shard, message); - }, - - async handleIdentify() { - return await handleIdentify(id); - }, - - ...this.options.createShardOptions, - }); - - this.shards.set(id, shard); - } - - return await shard.identify(); - } - - /** - * @inheritDoc - */ - - async scale() { - // - } -} - -export type AgentOptions = Pick< - Options, - Exclude -> & - Partial; - -interface Options { - /** Ids of the Shards which should be managed. */ - shardIds: number[]; - - /** Total amount of Shard used by the bot. */ - totalShards: number; - - /** Options which are used to create a new Shard. */ - createShardOptions?: Omit< - ShardOptions, - 'id' | 'totalShards' | 'requestIdentify' | 'gatewayConfig' - >; - - /** Gateway configuration which is used when creating a Shard. */ - gatewayConfig: PickPartial; - - /** Sends the discord payload to another guild. */ - handleMessage: (shard: Shard, data: MessageEvent) => any; - - /** This function communicates with the parent manager. */ - handleIdentify(shardId: number): Promise; -} diff --git a/packages/ws/src/services/shard-manager.ts b/packages/ws/src/services/shard-manager.ts new file mode 100644 index 0000000..2b6c9df --- /dev/null +++ b/packages/ws/src/services/shard-manager.ts @@ -0,0 +1,121 @@ +import type { ShardManagerOptions, SMO } from '../types'; +import type { LeakyBucket } from '../utils/bucket'; + +import { Shard } from './shard'; + +import { createLeakyBucket } from '../utils/bucket'; +import { Options } from '../utils/options'; + +export class ShardManager { + static readonly DEFAULTS = { + workers: { + shards: 25, + amount: 5, + delay: 5000 + }, + + shards: { + timeout: 15000, + delay: 5000 + } + }; + + readonly options: SMO; + + readonly buckets = new Map< + number, + { + workers: { id: number; queue: number[] }[]; + leak: LeakyBucket; + } + >(); + + readonly shards = new Map(); + + constructor(options: ShardManagerOptions) { + this.options = Options({}, ShardManager.DEFAULTS, options); + } + + /** Invokes internal processing and respawns shards */ + async respawns(): Promise { + // + } + + /** Invoke internal processing and spawns shards */ + async spawns(): Promise { + const { gateway, workers } = this.options; + + /** Creates the necessary buckets according to concurrency */ + for (let i = 0; i < gateway.session_start_limit.max_concurrency; i++) { + this.buckets.set(i, { + workers: [], + leak: createLeakyBucket({ + max: 1, + refillAmount: 1, + refillInterval: workers.delay, + }), + }); + } + + /** Create the start sequence of the shards inside the buckets. */ + for (let i = 0; i < gateway.shards; i++) { + const bucketID = i % gateway.session_start_limit.max_concurrency; + const bucket = this.buckets.get(bucketID); + + if (bucket) { + const workerID = Math.floor(i / workers.shards); + const worker = bucket.workers.find(w => w.id === workerID); + + if (worker) { + worker.queue.push(i); + } else { + bucket.workers.push({ id: workerID, queue: [i] }); + } + } + } + + /** Route all shards to workers */ + this.buckets.forEach(async bucket => { + for (const worker of bucket.workers) { + + for (const id of worker.queue) { + await this.connect(id); + } + + } + }); + } + + /** Invokes the bucket to prepare the connection to the shard */ + private async connect(id: number): Promise { + const { gateway } = this.options; + + let shard = this.shards.get(id); + + if (!shard) { + shard = new Shard({ + id, + + gateway: this.options.gateway, + + shards: this.options.shards, + + config: this.options.config, + + handlePayloads: async (shard, payload) => { + await this.options.handleDiscordPayload(shard, payload); // remove await? + }, + + handleIdentify: async (id: number) => { + await this.buckets.get(id % gateway.session_start_limit.max_concurrency)!.leak.acquire(1); // remove await? + } + }); + + this.shards.set(id, shard); + } + + await shard.connect(); + + return shard; + } +} diff --git a/packages/ws/src/services/shard.ts b/packages/ws/src/services/shard.ts index 0d80bea..322bd3a 100644 --- a/packages/ws/src/services/shard.ts +++ b/packages/ws/src/services/shard.ts @@ -1,789 +1,444 @@ -import type { - DiscordGatewayPayload, - DiscordHello, - DiscordReady, - PickPartial, -} from '@biscuitland/api-types'; -import type { LeakyBucket } from '../utils/bucket-util'; +import type { DiscordGatewayPayload } from '@biscuitland/api-types'; +import type { ShardOptions, SO, ShardStatus } from '../types'; -import { createLeakyBucket } from '../utils/bucket-util'; +import type { LeakyBucket } from '../utils/bucket'; -import { - Constants, - GatewayOpcodes, - GatewayCloseEventCodes, -} from '@biscuitland/api-types'; +import { GatewayOpcodes } from '@biscuitland/api-types'; +import { createLeakyBucket } from '../utils/bucket'; -import WebSocket from 'ws'; -import { inflateSync } from 'node:zlib'; +import { WebSocket } from 'ws'; +import { Options } from '../utils/options'; -export const DEFAULT_HEARTBEAT_INTERVAL = 45000; - -export const MAX_GATEWAY_REQUESTS_PER_INTERVAL = 120; -export const GATEWAY_RATE_LIMIT_RESET_INTERVAL = 60_000; - -export type PickOptions = Pick< - ShardOptions, - Exclude -> & - Partial; - -const decoder = new TextDecoder(); - - -export interface ShardOptions { - /** Id of the shard which should be created. */ - id: number; - - /** Gateway configuration for the shard. */ - gatewayConfig: PickPartial; - - /** The total amount of shards which are used to communicate with Discord. */ - totalShards: number; - - /** The maximum of requests which can be send to discord per rate limit tick. */ - maxRequestsPerRateLimitTick: number; - - /** The previous payload sequence number. */ - previousSequenceNumber: number; - - /** In which interval (in milliseconds) the gateway resets it's rate limit. */ - rateLimitResetInterval: number; - - /** Sends the discord payload to another guild. */ - handleMessage(shard: Shard, message: MessageEvent): unknown; - - /** This function communicates with the management process. */ - handleIdentify: (shardId: number) => Promise; -} - -export interface ShardGatewayConfig { - /** Whether incoming payloads are compressed using zlib. */ - compress: boolean; - - /** The calculated intent value of the events which the shard should receive. */ - intents: number; - - /** Identify properties to use */ - properties: { - /** Operating system the shard runs on. */ - os: 'darwin' | 'linux' | 'windows'; - - /** The "browser" where this shard is running on. */ - browser: string; - - /** The device on which the shard is running. */ - device: string; - }; - - /** Bot token which is used to connect to Discord */ - token: string; - - /** The URL of the gateway which should be connected to. */ - url: string; - - /** The gateway version which should be used.*/ - version: number; -} - -export interface ShardSocketRequest { - /** The OP-Code for the payload to send. */ - op: GatewayOpcodes; - - /** Payload data. */ - d: unknown; -} - -export interface ShardEvents { - /** A heartbeat has been send. */ - heartbeat?(shard: Shard): unknown; - - /** A heartbeat ACK was received. */ - heartbeatAck?(shard: Shard): unknown; - - /** Shard has received a Hello payload. */ - hello?(shard: Shard): unknown; - - /** The Shards session has been invalidated. */ - - invalidSession?(shard: Shard, resumable: boolean): unknown; - /** The shard has started a resume action. */ - resuming?(shard: Shard): unknown; - - /** The shard has successfully resumed an old session. */ - resumed?(shard: Shard): unknown; - - /** Discord has requested the Shard to reconnect. */ - requestedReconnect?(shard: Shard): unknown; - /** The shard started to connect to Discord's gateway. */ - connecting?(shard: Shard): unknown; - - /** The shard is connected with Discord's gateway. */ - connected?(shard: Shard): unknown; - - /** The shard has been disconnected from Discord's gateway. */ - disconnected?(shard: Shard): unknown; - - /** The shard has started to identify itself to Discord. */ - identifying?(shard: Shard): unknown; - - /** The shard has successfully been identified itself with Discord. */ - identified?(shard: Shard): unknown; - - /** The shard has received a message from Discord. */ - message?(shard: Shard, payload: DiscordGatewayPayload): unknown; -} - -export interface ShardHeart { - /** Whether or not the heartbeat was acknowledged by Discord in time. */ - acknowledged: boolean; - - /** Interval between heartbeats requested by Discord. */ - interval: number; - - /** Id of the interval, which is used for sending the heartbeats. */ - intervalId?: number; - - /** Unix (in milliseconds) timestamp when the last heartbeat ACK was received from Discord. */ - lastAck?: number; - - /** Unix timestamp (in milliseconds) when the last heartbeat was sent. */ - lastBeat?: number; - - /** Round trip time (in milliseconds) from Shard to Discord and back. */ - rtt?: number; - - /** Id of the timeout which is used for sending the first heartbeat to Discord since it's "special". */ - timeoutId?: number; -} - -export enum ShardSocketCloseCodes { - /** A regular Shard shutdown. */ - Shutdown = 3000, - - /** A resume has been requested and therefore the old connection needs to be closed. */ - ResumeClosingOldConnection = 3024, - - /** Did not receive a heartbeat ACK in time. */ - ZombiedConnection = 3010, - - /** Discordeno's gateway tests hae been finished, therefore the Shard can be turned off. */ - TestingFinished = 3064, - - /** Special close code reserved for Discordeno's zero-downtime resharding system. */ - Resharded = 3065, - - /** Shard is re-identifying therefore the old connection needs to be closed. */ - ReIdentifying = 3066, -} - -export enum ShardState { - /** Shard is fully connected to the gateway and receiving events from Discord. */ - Connected = 0, - - /** Shard started to connect to the gateway. */ - Connecting = 1, - - /** Shard got disconnected and reconnection actions have been started. */ - Disconnected = 2, - - /** The shard is connected to the gateway but only heartbeating. */ - Unidentified = 3, - - /** Shard is trying to identify with the gateway to create a new session. */ - Identifying = 4, - - /** Shard is trying to resume a session with the gateway. */ - Resuming = 5, - - /** Shard got shut down studied or due to a not (self) fixable error and may not attempt to reconnect on its own. */ - Offline = 6, -} +const textDecoder = new TextDecoder(); export class Shard { static readonly DEFAULTS = { - /** The maximum of requests which can be send to discord per rate limit tick. */ - maxRequestsPerRateLimitTick: MAX_GATEWAY_REQUESTS_PER_INTERVAL, - - /** The previous payload sequence number. */ - previousSequenceNumber: null, - - /** In which interval (in milliseconds) the gateway resets it's rate limit. */ - rateLimitResetInterval: GATEWAY_RATE_LIMIT_RESET_INTERVAL, + // }; - options: ShardOptions; + readonly options: SO; - offlineSendQueue: any[]; + heartbeatInterval: any | null = null; + heartbeatAck = false; - totalShards: number; + heartbeatAt = -1; + interval = 45000; - sessionId!: string; + resumeURL: string | null = null; + sessionID: string | null = null; - resolves: Map< - 'READY' | 'RESUMED' | 'INVALID_SESSION', - (payload: DiscordGatewayPayload) => void - >; + sequence = 0 ; - socket: WebSocket | undefined; + resolves: Map void> = new Map(); + + status: ShardStatus = 'Disconnected'; bucket: LeakyBucket; - events: ShardEvents; + trace: any = null; - state: ShardState; + ws: WebSocket | null = null; - heart: ShardHeart; - - id: number; - - constructor(options: PickOptions) { - this.options = Object.assign(options, Shard.DEFAULTS); - - if (!options.gatewayConfig) { - this.options.gatewayConfig = { - properties: { - os: 'linux', - device: 'Biscuit', - browser: 'Biscuit', - }, - - compress: false, - version: Constants.API_VERSION, - intents: 0, - - token: this.options.gatewayConfig.token, - - url: 'wss://gateway.discord.gg', - }; - } - - this.options.gatewayConfig = { - compress: this.options.gatewayConfig.compress ?? false, - intents: this.options.gatewayConfig.intents ?? 0, - properties: { - os: this.options.gatewayConfig?.properties?.os ?? 'linux', - device: - this.options.gatewayConfig?.properties?.device ?? 'Biscuit', - browser: - this.options.gatewayConfig?.properties?.browser ?? - 'Biscuit', - }, - url: this.options.gatewayConfig.url ?? 'wss://gateway.discord.gg', - - token: this.options.gatewayConfig.token, - version: - this.options.gatewayConfig.version ?? Constants.API_VERSION, - }; - - this.offlineSendQueue = []; - - this.resolves = new Map< - 'READY' | 'RESUMED' | 'INVALID_SESSION', - (payload: DiscordGatewayPayload) => void - >(); + constructor(options: ShardOptions) { + this.options = Options({}, Shard.DEFAULTS, options); this.bucket = createLeakyBucket({ - max: MAX_GATEWAY_REQUESTS_PER_INTERVAL, - refillInterval: GATEWAY_RATE_LIMIT_RESET_INTERVAL, - refillAmount: MAX_GATEWAY_REQUESTS_PER_INTERVAL, + max: 120, + refillInterval: 60000, + refillAmount: 120, + }); + } + + resume() { + this.status = 'Resuming'; + + this.send({ + op: GatewayOpcodes.Resume, + d: { + token: `Bot ${this.options.config.token}`, + session_id: this.sessionID, + seq: this.sequence, + } + }); + } + + destroy() { + this.ws = null; + + this.bucket = createLeakyBucket({ + max: 120, + refillInterval: 60000, + refillAmount: 120, }); - this.events = {} as ShardEvents; + this.sequence = 0; + this.resumeURL = null; + this.sessionID = null; - this.heart = { - acknowledged: false, - interval: DEFAULT_HEARTBEAT_INTERVAL, - }; - - this.totalShards = this.options.totalShards, - - this.state = ShardState.Offline; - - this.id = options.id; + this.heartbeatInterval = null; } - /** - * @inheritDoc - */ - - async startHeartbeating(interval: number) { - this.heart.interval = interval; - - if ( - [ShardState.Disconnected, ShardState.Offline].includes(this.state) - ) { - this.state = ShardState.Unidentified; - } - - const jitter = Math.ceil(this.heart.interval * (Math.random() || 0.5)); - - const it1: any = setTimeout(() => { - if (this.state === ShardState.Connected) { - this.socket?.send( - JSON.stringify({ - op: GatewayOpcodes.Heartbeat, - d: this.options.previousSequenceNumber, - }) - ); - - this.heart.lastBeat = Date.now(); - this.heart.acknowledged = false; - - const it: any = setInterval(async () => { - if (!this.heart.acknowledged) { - this.close( - ShardSocketCloseCodes.ZombiedConnection, - 'Zombied connection, did not receive an heartbeat ACK in time.' - ); - - return await this.identify(); - } - - this.heart.acknowledged = false; - - if (this.state === ShardState.Connected) { - this.socket?.send( - JSON.stringify({ - op: GatewayOpcodes.Heartbeat, - d: this.options.previousSequenceNumber, - }) - ); - - this.heart.lastBeat = Date.now(); - - this.events.heartbeat?.(this); - } - }, this.heart.interval); - - this.heart.intervalId = it; - } - }, jitter); - - this.heart.timeoutId = it1; - } - - /** - * @inheritDoc - */ - - async stopHeartbeating() { - clearInterval(this.heart.intervalId); - - clearTimeout(this.heart.timeoutId); - } - - /** - * @inheritDoc - */ - - async handleMessage(message: MessageEvent): Promise { - let data = message.data; - - if (this.options.gatewayConfig.compress && data instanceof Blob) { - // @ts-ignore - data = decoder.decode(inflateSync(new Uint8Array(await message.arrayBuffer()))); - } - - if (typeof data !== 'string') { + connect() { + if (this.ws && this.ws.readyState !== WebSocket.CLOSED) { return; } - const messageData = JSON.parse(data) as DiscordGatewayPayload; + this.status = 'Connecting'; - switch (messageData.op) { - case GatewayOpcodes.Heartbeat: { - if (!this.isOpen()) { - return; + if (this.sessionID && this.resumeURL) { + this.ws = new WebSocket(this.resumeURL); + } else { + this.ws = new WebSocket('wss://gateway.discord.gg/?v=10&encoding=json'); + } + + this.ws.on('message', this.onMessage.bind(this)); + this.ws.on('close', this.onClose.bind(this)); + this.ws.on('error', this.onError.bind(this)); + this.ws.on('open', this.onOpen.bind(this)); + + return new Promise(resolve => { + this.resolves.set('READY', () => { + setTimeout(() => resolve(true), this.options.shards.timeout); + }); + }); + } + + identify() { + this.status = 'Identifying'; + + this.send({ + op: GatewayOpcodes.Identify, + d: { + token: `Bot ${this.options.config.token}`, + compress: false, + properties: { + os: 'linux', + device: 'Biscuit', + browser: 'Biscuit' + }, + intents: this.options.config.intents, + shard: [this.options.id, this.options.gateway.shards], + } + }); + } + + heartbeat(requested = false) { + if (this.status === 'Resuming' || this.status === 'Identifying') { + return; + } + + if (!requested) { + if (!this.heartbeatAt) { + // eslint-disable-next-line no-console + console.log(JSON.stringify({ + heartbeatInterval: this.heartbeatInterval, + heartbeatAck: this.heartbeatAck, + timestamp: Date.now(), + status: this.status + })); + + this.disconnect(); + return; + } + + this.heartbeatAck = false; + } + + this.heartbeatAt = Date.now(); + + this.send({ + op: GatewayOpcodes.Heartbeat, + d: this.sequence, + }, true); + } + + disconnect(reconnect = false) { + if (!this.ws) { + return; + } + + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + + if (this.ws.readyState !== WebSocket.CLOSED) { + this.ws.removeAllListeners(); + + if (this.sessionID && reconnect) { + if (this.ws.readyState !== WebSocket.OPEN) { + this.ws.close(4999, 'Reconnect'); + } else { + this.ws.terminate(); } + } else { + this.ws.close(1000, 'Normal Close'); + } + } - this.heart.lastBeat = Date.now(); + this.ws = null; - if (this.state === ShardState.Connected) { - this.socket?.send( - JSON.stringify({ - op: GatewayOpcodes.Heartbeat, - d: this.options.previousSequenceNumber, - }), - ); - this.events.heartbeat?.(this); + this.status = 'Disconnected'; + + this.resolves = new Map(); + this.heartbeatAck = true; + + if (reconnect) { + if (this.sessionID) { + this.connect(); + } else { + // this.connect(); + } + } else { + this.destroy(); + } + } + + async send(payload: Partial, priority = false) { + if (this.ws && this.ws.readyState === WebSocket.OPEN) { + await this.bucket.acquire(1, priority); + + this.ws.send(JSON.stringify(payload)); + } + } + + private async onMessage(data: any, isBinary: boolean) { + const payload = this.pack(data as Buffer | ArrayBuffer, isBinary); + + if (payload.s != null) { + this.sequence = payload.s; + } + + switch (payload.op) { + case GatewayOpcodes.Dispatch: + + switch (payload.t) { + case 'READY': + this.debug([`[READY] shard id: ${this.options.id}`]); + + this.status = 'Ready'; + + // @ts-ignore + this.resumeURL = `${payload.d.resume_gateway_url}/?v=10&encoding=json`; + + // @ts-ignore + this.sessionID = payload.d.session_id; + + // @ts-ignore + this.sequence = 0; + + this.resolves.get('READY')?.(payload); + this.resolves.delete('READY'); + + break; + + case 'RESUMED': + this.status = 'Ready'; + + this.resolves.get('RESUMED')?.(payload); + this.resolves.delete('RESUMED'); + + break; } break; - } - case GatewayOpcodes.Hello: { - const interval = (messageData.d as DiscordHello).heartbeat_interval; - this.startHeartbeating(interval); + case GatewayOpcodes.Heartbeat: + this.heartbeat(true); - if (this.state !== ShardState.Resuming) { + break; + case GatewayOpcodes.Reconnect: + this.disconnect(true); + break; + + case GatewayOpcodes.InvalidSession: + + if (payload.d) { + this.resume(); + } else { + this.sessionID = null; + this.sequence = 0; + + this.identify(); + } + + break; + + case GatewayOpcodes.Hello: + + // @ts-ignore + if (payload.d.heartbeat_interval > 0) { + if (this.heartbeatInterval) { + clearInterval(this.heartbeatInterval); + } + + // @ts-ignore + this.heartbeatInterval = setInterval(() => this.heartbeat(), payload.d.heartbeat_interval); + + // @ts-ignore + this.interval = payload.d.heartbeat_interval; + } + + if (this.status !== 'Resuming') { this.bucket = createLeakyBucket({ max: this.safe(), - refillInterval: GATEWAY_RATE_LIMIT_RESET_INTERVAL, + refillInterval: 60000, refillAmount: this.safe(), waiting: this.bucket.waiting, }); } - this.events.hello?.(this); - - break; - } - case GatewayOpcodes.HeartbeatACK: { - this.heart.acknowledged = true; - this.heart.lastAck = Date.now(); - - if (this.heart.lastBeat) { - this.heart.rtt = this.heart.lastAck - this.heart.lastBeat; - } - - this.events.heartbeatAck?.(this); - - break; - } - case GatewayOpcodes.Reconnect: { - this.events.requestedReconnect?.(this); - - await this.resume(); - - break; - } - case GatewayOpcodes.InvalidSession: { - const resumable = messageData.d as boolean; - - this.events.invalidSession?.(this, resumable); - - await this.delay(Math.floor((Math.random() * 4 + 1) * 1000)); - - this.resolves.get('INVALID_SESSION')?.(messageData); - this.resolves.delete('INVALID_SESSION'); - - if (!resumable) { - await this.identify(); - - break; - } - - await this.resume(); - - break; - } - } - - if (messageData.t === 'RESUMED') { - - this.state = ShardState.Connected; - this.events.resumed?.(this); - - this.offlineSendQueue.map(resolve => resolve()); - - this.resolves.get('RESUMED')?.(messageData); - this.resolves.delete('RESUMED'); - } else if (messageData.t === 'READY') { - const payload = messageData.d as DiscordReady; - - this.sessionId = payload.session_id; - this.state = ShardState.Connected; - - this.offlineSendQueue.map(resolve => resolve()); - - this.resolves.get('READY')?.(messageData); - this.resolves.delete('READY'); - } - - if (messageData.s !== null) { - this.options.previousSequenceNumber = messageData.s; - } - - this.events.message?.(this, messageData); - this.options.handleMessage(this, messageData as any); - } - - /** - * @inheritDoc - */ - - async shutdown(): Promise { - this.close(ShardSocketCloseCodes.Shutdown, 'Shard shutting down.'); - - this.state = ShardState.Offline; - } - - /** - * @inheritDoc - */ - async handleIdentify(): Promise { - return await this.options.handleIdentify(this.id); - } - - /** - * @inheritDoc - */ - - async identify(): Promise { - if (this.state === ShardState.Connected) { - this.close( - ShardSocketCloseCodes.ReIdentifying, - 'Re-identifying closure of old connection.' - ); - } - - this.state = ShardState.Identifying; - this.events.identifying?.(this); - - if (!this.isOpen()) { - await this.connect(); - } - - await this.handleIdentify(); - - this.send( - { - op: GatewayOpcodes.Identify, - d: { - token: `Bot ${this.options.gatewayConfig.token}`, - compress: this.options.gatewayConfig.compress, - properties: this.options.gatewayConfig.properties, - intents: this.options.gatewayConfig.intents, - shard: [this.id, this.totalShards] - }, - }, - true - ); - - return new Promise(resolve => { - this.resolves.set('READY', () => { - this.events.identified?.(this); - resolve(); - }); - - this.resolves.set('INVALID_SESSION', () => { - this.resolves.delete('READY'); - resolve(); - }); - }); - } - - /** - * @inheritDoc - */ - - async connect(): Promise { - if ( - ![ShardState.Identifying, ShardState.Resuming].includes(this.state!) - ) { - this.state = ShardState.Connecting; - } - - this.events.connecting?.(this); - - const socket = new WebSocket( - `${this.options.gatewayConfig.url}/?v=${this.options.gatewayConfig.version}&encoding=json` - ); - - this.socket = socket; - - socket.onerror = (event: any) => console.log(event); - - socket.onclose = (event: any) => this.handleClose(event); - - socket.onmessage = (message: any) => { - this.handleMessage(message); - }; - - return new Promise(resolve => { - socket.onopen = () => { - if (![ShardState.Identifying, ShardState.Resuming].includes(this.state)) { - this.state = ShardState.Unidentified; - } - - this.events.connected?.(this); - - resolve(); - }; - }); - } - - /** - * @inheritDoc - */ - - async resume(): Promise { - if (this.isOpen()) { - this.close( - ShardSocketCloseCodes.ResumeClosingOldConnection, - 'Reconnecting the shard, closing old connection.' - ); - } - - if (!this.sessionId) { - return await this.identify(); - } - - this.state = ShardState.Resuming; - - await this.connect(); - - this.send( - { - op: GatewayOpcodes.Resume, - d: { - token: `Bot ${this.options.gatewayConfig.token}`, - session_id: this.sessionId, - seq: this.options.previousSequenceNumber ?? 0, - }, - }, - true - ); - - return new Promise(resolve => { - this.resolves.set('RESUMED', () => resolve()); - - this.resolves.set('INVALID_SESSION', () => { - this.resolves.delete('RESUMED'); - resolve(); - }); - }); - } - - /** - * @inheritDoc - */ - - async send(message: ShardSocketRequest, highPriority: boolean) { - await this.checkOffline(highPriority); - - await this.bucket.acquire(1, highPriority); - - await this.checkOffline(highPriority); - - this.socket?.send(JSON.stringify(message)); - } - - /** - * @inheritDoc - */ - - async handleClose(close: CloseEvent): Promise { - this.stopHeartbeating(); - - switch (close.code) { - case ShardSocketCloseCodes.TestingFinished: { - this.state = ShardState.Offline; - this.events.disconnected?.(this); - - return; - } - - case ShardSocketCloseCodes.Shutdown: - case ShardSocketCloseCodes.ReIdentifying: - case ShardSocketCloseCodes.Resharded: - case ShardSocketCloseCodes.ResumeClosingOldConnection: - case ShardSocketCloseCodes.ZombiedConnection: { - this.state = ShardState.Disconnected; - this.events.disconnected?.(this); - - return; - } - - case GatewayCloseEventCodes.UnknownOpcode: - case GatewayCloseEventCodes.NotAuthenticated: - case GatewayCloseEventCodes.InvalidSeq: - case GatewayCloseEventCodes.RateLimited: - case GatewayCloseEventCodes.SessionTimedOut: { - this.state = ShardState.Identifying; - this.events.disconnected?.(this); - - return await this.identify(); - } - - case GatewayCloseEventCodes.AuthenticationFailed: - case GatewayCloseEventCodes.InvalidShard: - case GatewayCloseEventCodes.ShardingRequired: - case GatewayCloseEventCodes.InvalidApiVersion: - case GatewayCloseEventCodes.InvalidIntents: - case GatewayCloseEventCodes.DisallowedIntents: { - this.state = ShardState.Offline; - this.events.disconnected?.(this); - - throw new Error( - close.reason || - 'Discord gave no reason! GG! You broke Discord!' - ); - } - - case GatewayCloseEventCodes.UnknownError: - case GatewayCloseEventCodes.DecodeError: - case GatewayCloseEventCodes.AlreadyAuthenticated: - default: { - this.state = ShardState.Resuming; - this.events.disconnected?.(this); - - return await this.resume(); - } - } - } - - /** - * @inheritDoc - */ - - async checkOffline(highPriority: boolean): Promise { - if (!this.isOpen()) { - await new Promise(resolve => { - if (highPriority) { - this.offlineSendQueue.unshift(resolve); + if (this.sessionID) { + this.resume(); } else { - this.offlineSendQueue.push(resolve); + this.identify(); + this.heartbeat(); } - }); + + break; + case GatewayOpcodes.HeartbeatACK: + this.heartbeatAck = true; + + break; + + } + + // @ts-ignore + if (payload?.d?._trace) { + // @ts-ignore + this.trace = JSON.parse(payload.d._trace); + } + + this.options.handlePayloads(this, payload); + } + + private async onClose(code: number) { + this.debug([`[onClose] shard id: ${this.options.id}`, code]); + + switch (code) { + case 1001: + // Discord WebSocket requesting client reconnect + this.disconnect(true); + break; + + case 1006: + // problems with connections + this.disconnect(true); + break; + + case 4000: + // Unknown error + this.disconnect(); + break; + + case 4001: + // Unknown opcode + this.disconnect(); + break; + + case 4002: + // Decode error + this.disconnect(); + break; + + case 4003: + // Not authenticated + this.sessionID = null; + this.disconnect(); + break; + + case 4004: + // Authentication failed + this.sessionID = null; + this.disconnect(); + break; + + case 4005: + // Already authenticated + this.sessionID = null; + this.disconnect(); + break; + + case 4007: + // Invalid sequence + this.sequence = 0; + this.disconnect(); + break; + + case 4008: + // Rate limited + this.disconnect(); + break; + + case 4009: + // Session timed out + this.disconnect(); + break; + + case 4010: + // Invalid shard + this.sessionID = null; + this.disconnect(); + break; + + case 4011: + // Sharding required + this.sessionID = null; + this.disconnect(); + break; + + case 4012: + // Invalid API version + this.sessionID = null; + this.disconnect(); + break; + + case 4013: + // Invalid intent(s) + this.sessionID = null; + this.disconnect(); + break; + + case 4014: + // Disallowed intent(s) + this.sessionID = null; + this.disconnect(); + break; + + default: + this.disconnect(); + break; } } - /** - * @inheritDoc - */ - - close(code: number, reason: string): void { - if (this.socket?.readyState !== WebSocket.OPEN) { - return; - } - - return this.socket?.close(code, reason); + private async onError(error: Error) { + this.debug([`[onError] shard id: ${this.options.id}`, error]); } - /** - * @inheritDoc - */ - - isOpen(): boolean { - return this.socket?.readyState === WebSocket.OPEN; + private async onOpen() { + this.status = 'Handshaking'; + this.heartbeatAck = true; } - /** - * @inheritDoc - */ + /** temporal */ + debug(_messages: unknown[]) { + // for (let index = 0; index < messages.length; index++) { + // const message = messages[index]; - async delay(ms: number): Promise { - return new Promise((res): any => - setTimeout((): void => { - res(); - }, ms) - ); + // // eslint-disable-next-line no-console + // console.log(message); + // } } - /** - * @inheritDoc - */ + /** temporal */ + pack(data: Buffer | ArrayBuffer, _isBinary: boolean): DiscordGatewayPayload { + return JSON.parse(textDecoder.decode(new Uint8Array(data))) as DiscordGatewayPayload; + } - safe(): number { - const requests = - this.options.maxRequestsPerRateLimitTick - - Math.ceil( - this.options.rateLimitResetInterval / this.heart.interval - ) * - 2; + /** temporal */ + safe() { + const requests = 120 - Math.ceil(60000 / this.interval) * 2; return requests < 0 ? 0 : requests; } diff --git a/packages/ws/src/types.ts b/packages/ws/src/types.ts new file mode 100644 index 0000000..fca384b --- /dev/null +++ b/packages/ws/src/types.ts @@ -0,0 +1,91 @@ +import type { DiscordGatewayPayload, DiscordGetGatewayBot, GatewayIntents } from '@biscuitland/api-types'; + +import type { ShardManager } from './services/shard-manager'; +import type { Shard } from './services/shard'; + +/** ShardManager */ + +export type ShardManagerOptions = Pick> & Partial; + +export interface SMO { + /** Function for interpretation of messages from discord */ + handleDiscordPayload: (shard: Shard, payload: DiscordGatewayPayload) => unknown; + + /** Based on the information in Get Gateway */ + gateway: DiscordGetGatewayBot; + + /** Workers options */ + workers: ShardManagerWorkersOptions; + + /** Authentication */ + config: { + intents?: GatewayIntents; + token: string; + }; + + /** Options shards */ + shards: ShardManagerShardsOptions; +} + +export interface ShardManagerWorkersOptions { + /** + * Number of shards per worker + * @default 25 + */ + shards: number; + + /** + * Number of workers + * @default 5 + */ + amount: number; + + /** + * Waiting time between workers + * @default 5000 + */ + delay: number; +} + +export interface ShardManagerShardsOptions { + /** + * Waiting time to receive the ready event. + * @default 15000 + */ + timeout: number; + + /** + * Waiting time between shards + * @default 5000 + */ + delay: number; +} + +/** Shard */ + +export type ShardOptions = Pick> & Partial; + +export interface SO { + /** Shard Id */ + id: number; + + /** Based on the information in Get Gateway */ + gateway: DiscordGetGatewayBot; + + /** Options shards */ + shards: ShardManagerShardsOptions; + + /** Authentication */ + config: { + intents?: GatewayIntents; + token: string; + }; + + /** Function for interpretation of messages from discord */ + handlePayloads: (shard: Shard, data: DiscordGatewayPayload) => Promise; + + /** Notify the manager if the shard is ready. */ + handleIdentify: (id: number) => Promise; +} + +export type ShardStatus = 'Disconnected' | 'Handshaking' | 'Connecting' | 'Heartbeating' | 'Identifying' | 'Resuming' | 'Ready'; diff --git a/packages/ws/src/utils/bucket-util.ts b/packages/ws/src/utils/bucket.ts similarity index 99% rename from packages/ws/src/utils/bucket-util.ts rename to packages/ws/src/utils/bucket.ts index 3a7b97c..2c44fb2 100644 --- a/packages/ws/src/utils/bucket-util.ts +++ b/packages/ws/src/utils/bucket.ts @@ -1,4 +1,4 @@ -/** unnecessary */ +/** Create from scratch */ import type { PickPartial } from '@biscuitland/api-types'; diff --git a/packages/ws/src/utils/options.ts b/packages/ws/src/utils/options.ts new file mode 100644 index 0000000..bf43dbf --- /dev/null +++ b/packages/ws/src/utils/options.ts @@ -0,0 +1,42 @@ +const isPlainObject = (value: any) => { + return ( + value !== null + && typeof value === 'object' + && typeof value.constructor === 'function' + // eslint-disable-next-line no-prototype-builtins + && (value.constructor.prototype.hasOwnProperty('isPrototypeOf') || Object.getPrototypeOf(value.constructor.prototype) === null) + ) + || (value && Object.getPrototypeOf(value) === null); +}; + +const isObject = (o: any) => { + return !!o && typeof o === 'object' && !Array.isArray(o); +}; + +export const Options = (defaults: any, ...options: any[]): any => { + if (!options.length) { + return defaults; + } + + const source = options.shift(); + + if (isObject(defaults) && isPlainObject(source)) { + Object.entries(source).forEach(([key, value]) => { + if (typeof value === 'undefined') { + return; + } + + if (isPlainObject(value)) { + if (!(key in defaults)) { + Object.assign(defaults, { [key]: {} }); + } + + Options(defaults[key], value); + } else { + Object.assign(defaults, { [key]: value }); + } + }); + } + + return Options(defaults, ...options); +}; diff --git a/packages/ws/src/utils/shard-util.ts b/packages/ws/src/utils/shard-util.ts deleted file mode 100644 index 43dd934..0000000 --- a/packages/ws/src/utils/shard-util.ts +++ /dev/null @@ -1,18 +0,0 @@ -/** unnecessary */ - -import type { Shard } from '../services/shard'; - -export async function checkOffline( - shard: Shard, - highPriority: boolean -): Promise { - if (!shard.isOpen()) { - await new Promise(resolve => { - if (highPriority) { - shard.offlineSendQueue.unshift(resolve); - } else { - shard.offlineSendQueue.push(resolve); - } - }); - } -}