From e7777c8591aeb611eb1583931db1e9623418eac3 Mon Sep 17 00:00:00 2001 From: socram03 Date: Fri, 15 Dec 2023 14:45:50 -0400 Subject: [PATCH] fix(ws): omg websocket start --- packages/common/src/Util.ts | 6 +- packages/core/src/session.ts | 26 +- packages/helpers/src/components/SelectMenu.ts | 8 +- packages/ws/src/SharedTypes.ts | 3 +- packages/ws/src/constants/index.ts | 19 +- packages/ws/src/discord/heartbeater.ts | 128 ------ packages/ws/src/discord/shard.ts | 426 +++++++++--------- packages/ws/src/discord/sharder.ts | 90 ++-- packages/ws/src/discord/shared.ts | 23 +- packages/ws/src/structures/index.ts | 14 +- packages/ws/src/structures/timeout.ts | 28 ++ 11 files changed, 330 insertions(+), 441 deletions(-) delete mode 100644 packages/ws/src/discord/heartbeater.ts create mode 100644 packages/ws/src/structures/timeout.ts diff --git a/packages/common/src/Util.ts b/packages/common/src/Util.ts index 042620d..fa851e6 100644 --- a/packages/common/src/Util.ts +++ b/packages/common/src/Util.ts @@ -7,7 +7,9 @@ export function isObject(o: any) { export function Options(defaults: any, ...options: any[]): T { const option = options.shift(); - if (!option) return defaults; + if (!option) { + return defaults; + } return Options( { @@ -15,7 +17,7 @@ export function Options(defaults: any, ...options: any[]): T { ...Object.fromEntries( Object.entries(defaults).map(([key, value]) => [ key, - isObject(value) ? Options(value, option?.[key] || {}) : option?.[key] || value + isObject(value) ? Options(value, option?.[key] || {}) : option?.[key] ?? value ]) ) }, diff --git a/packages/core/src/session.ts b/packages/core/src/session.ts index 66e4130..6f2e7fc 100644 --- a/packages/core/src/session.ts +++ b/packages/core/src/session.ts @@ -1,10 +1,10 @@ -import { GatewayIntentBits, Identify, When } from "@biscuitland/common"; -import type { BiscuitRESTOptions, CDNRoutes, Routes } from "@biscuitland/rest"; -import { BiscuitREST, CDN, Router } from "@biscuitland/rest"; -import { GatewayEvents, ShardManager, ShardManagerOptions } from "@biscuitland/ws"; -import EventEmitter2 from "eventemitter2"; -import { MainManager, getBotIdFromToken } from "."; -import { Handler, actionHandler } from "./events/handler"; +import { GatewayIntentBits, Identify, When } from '@biscuitland/common'; +import type { BiscuitRESTOptions, CDNRoutes, Routes } from '@biscuitland/rest'; +import { BiscuitREST, CDN, Router } from '@biscuitland/rest'; +import { GatewayEvents, ShardManager, ShardManagerOptions } from '@biscuitland/ws'; +import EventEmitter2 from 'eventemitter2'; +import { MainManager, getBotIdFromToken } from '.'; +import { Handler, actionHandler } from './events/handler'; export class Session extends EventEmitter2 { constructor(public options: BiscuitOptions) { @@ -67,7 +67,7 @@ export class Session extends EventEmitter2 { if (!rest) { return new BiscuitREST({ token: this.options.token, - ...this.options.defaultRestOptions, + ...this.options.defaultRestOptions }); } @@ -75,7 +75,7 @@ export class Session extends EventEmitter2 { return rest; } - throw new Error("[CORE] REST not found"); + throw new Error('[CORE] REST not found'); } async start() { @@ -92,10 +92,10 @@ export class Session extends EventEmitter2 { // @ts-expect-error actionHandler([ctx, { t, d }, shard]); }, - ...this.options.defaultGatewayOptions, + ...this.options.defaultGatewayOptions }); - ctx.once("READY", (payload) => { + ctx.once('READY', (payload) => { const { user, application } = payload; this.botId = user.id; this.applicationId = application.id; @@ -110,12 +110,12 @@ export class Session extends EventEmitter2 { } } -export type HandlePayload = Pick["handlePayload"]; +export type HandlePayload = Pick['handlePayload']; export interface BiscuitOptions { token: string; intents: number | GatewayIntentBits; rest?: BiscuitREST; defaultRestOptions?: Partial; - defaultGatewayOptions?: Identify>>; + defaultGatewayOptions?: Identify>>; } diff --git a/packages/helpers/src/components/SelectMenu.ts b/packages/helpers/src/components/SelectMenu.ts index 2267dbc..6edcdec 100644 --- a/packages/helpers/src/components/SelectMenu.ts +++ b/packages/helpers/src/components/SelectMenu.ts @@ -9,10 +9,10 @@ import { APIUserSelectComponent, ChannelType, ComponentType, - TypeArray, -} from "@biscuitland/common"; -import { OptionValuesLength } from ".."; -import { BaseComponent } from "./BaseComponent"; + TypeArray +} from '@biscuitland/common'; +import { OptionValuesLength } from '..'; +import { BaseComponent } from './BaseComponent'; class SelectMenu { setCustomId(id: string): this { diff --git a/packages/ws/src/SharedTypes.ts b/packages/ws/src/SharedTypes.ts index e26a653..4742283 100644 --- a/packages/ws/src/SharedTypes.ts +++ b/packages/ws/src/SharedTypes.ts @@ -11,6 +11,7 @@ import type { GatewayAutoModerationActionExecutionDispatchData, GatewayChannelPinsUpdateDispatchData, GatewayChannelUpdateDispatchData, + GatewayDispatchEvents, GatewayGuildBanAddDispatchData, GatewayGuildBanRemoveDispatchData, GatewayGuildCreateDispatchData, @@ -58,8 +59,6 @@ import type { RestToKeys } from '@biscuitland/common'; -import { GatewayDispatchEvents } from '@biscuitland/common'; - /** https://discord.com/developers/docs/topics/gateway-events#update-presence */ export interface StatusUpdate { /** The user's activities */ diff --git a/packages/ws/src/constants/index.ts b/packages/ws/src/constants/index.ts index 4501f98..761dc25 100644 --- a/packages/ws/src/constants/index.ts +++ b/packages/ws/src/constants/index.ts @@ -37,21 +37,4 @@ export interface IdentifyProperties { device: string; } -enum ShardState { - /** Shard is fully connected to the gateway and receiving events from Discord. */ - Connected = 0, - /** Shard started to connect to the gateway. This is only used if the shard is not currently trying to identify or resume. */ - Connecting = 1, - /** Shard got disconnected and reconnection actions have been started. */ - Disconnected = 2, - /** The shard is connected to the gateway but only heartbeating. At this state the shard has not been identified with discord. */ - Unidentified = 3, - /** Shard is trying to identify with the gateway to create a new session. */ - Identifying = 4, - /** Shard is trying to resume a session with the gateway. */ - Resuming = 5, - /** Shard got shut down studied or due to a not (self) fixable error and may not attempt to reconnect on its own. */ - Offline = 6 -} - -export { COMPRESS, ShardManagerDefaults, ShardState, properties }; +export { COMPRESS, ShardManagerDefaults, properties }; diff --git a/packages/ws/src/discord/heartbeater.ts b/packages/ws/src/discord/heartbeater.ts deleted file mode 100644 index aafde7b..0000000 --- a/packages/ws/src/discord/heartbeater.ts +++ /dev/null @@ -1,128 +0,0 @@ -import { GatewayHeartbeatRequest, GatewayHello, GatewayOpcodes, GatewayReceivePayload } from '@biscuitland/common'; -import { Shard } from './shard.js'; -import { ShardSocketCloseCodes } from './shared.js'; - -export interface ShardHeart { - /** Whether or not the heartbeat was acknowledged by Discord in time. */ - ack: boolean; - /** Interval between heartbeats requested by Discord. */ - interval: number; - /** Id of the interval, which is used for sending the heartbeats. */ - intervalId?: NodeJS.Timeout; - /** Unix (in milliseconds) timestamp when the last heartbeat ACK was received from Discord. */ - lastAck?: number; - /** Unix timestamp (in milliseconds) when the last heartbeat was sent. */ - lastBeat?: number; - /** Round trip time (in milliseconds) from Shard to Discord and back. - * Calculated using the heartbeat system. - * Note: this value is undefined until the first heartbeat to Discord has happened. - */ - rtt?: number; - /** Id of the timeout which is used for sending the first heartbeat to Discord since it's "special". */ - timeoutId?: NodeJS.Timeout; - /** internal value */ - toString(): string; -} - -export class ShardHeartBeater { - heart: ShardHeart = { - ack: false, - interval: 30_000 - }; - // biome-ignore lint/nursery/noEmptyBlockStatements: - constructor(public shard: Shard) { } - - acknowledge(ack = true) { - this.heart.ack = ack; - } - - handleHeartbeat(_packet: Extract) { - this.shard.logger.debug(`[Shard #${this.shard.id}] received hearbeat event`); - this.heartbeat(false); - } - - /** - * sends a heartbeat whenever its needed - * fails if heart.interval is null - */ - heartbeat(acknowledgeAck: boolean) { - if (acknowledgeAck) { - if (!this.heart.lastAck) { - this.shard.logger.debug(`[Shard #${this.shard.id}] Heartbeat not acknowledged.`); - this.shard.close(ShardSocketCloseCodes.ZombiedConnection, 'Zombied connection, did not receive an heartbeat ACK in time.'); - this.shard.identify(true); - } - this.heart.lastAck = undefined; - } - - this.heart.lastBeat = Date.now(); - - // avoid creating a bucket here - this.shard.websocket?.send( - JSON.stringify({ - op: GatewayOpcodes.Heartbeat, - d: this.shard.data.resumeSeq - }) - ); - } - - stopHeartbeating() { - clearInterval(this.heart.intervalId); - clearTimeout(this.heart.timeoutId); - } - - startHeartBeating() { - this.shard.logger.debug(`[Shard #${this.shard.id}] scheduling heartbeat!`); - - if (!this.shard.isOpen()) return; - - // The first heartbeat needs to be send with a random delay between `0` and `interval` - // Using a `setTimeout(_, jitter)` here to accomplish that. - // `Math.random()` can be `0` so we use `0.5` if this happens - // Reference: https://discord.com/developers/docs/topics/gateway#heartbeating - const jitter = Math.ceil(this.heart.interval * (Math.random() || 0.5)); - - this.heart.timeoutId = setTimeout(() => { - // send a heartbeat - this.heartbeat(false); - this.heart.intervalId = setInterval(() => { - this.acknowledge(false); - this.heartbeat(false); - }, this.heart.interval); - }, jitter); - } - - handleHello(packet: GatewayHello) { - if (packet.d.heartbeat_interval > 0) { - if (this.heart.interval != null) { - this.stopHeartbeating(); - } - - this.heart.interval = packet.d.heartbeat_interval; - this.heart.intervalId = setInterval(() => { - this.acknowledge(false); - this.heartbeat(false); - }, this.heart.interval); - } - - this.startHeartBeating(); - - if (this.shard.data.session_id) { - this.shard.resume(); - } else { - this.shard.identify() - } - } - - onpacket(packet: GatewayReceivePayload) { - switch (packet.op) { - case GatewayOpcodes.Heartbeat: - return this.handleHeartbeat(packet); - case GatewayOpcodes.Hello: - return this.handleHello(packet); - case GatewayOpcodes.HeartbeatAck: - this.acknowledge(); - return (this.heart.lastAck = Date.now()); - } - } -} diff --git a/packages/ws/src/discord/shard.ts b/packages/ws/src/discord/shard.ts index fbfc143..bc33af3 100644 --- a/packages/ws/src/discord/shard.ts +++ b/packages/ws/src/discord/shard.ts @@ -1,76 +1,77 @@ -import { - GATEWAY_BASE_URL, - GatewayCloseCodes, - GatewayDispatchEvents, - GatewayDispatchPayload, - GatewayOpcodes, - GatewayReadyDispatchData, - GatewayReceivePayload, - GatewaySendPayload, - type Logger, -} from "@biscuitland/common"; -import { setTimeout as delay } from "node:timers/promises"; -import { inflateSync } from "node:zlib"; -import WS, { WebSocket, type CloseEvent } from "ws"; -import { ShardState, properties } from "../constants"; -import { DynamicBucket, PriorityQueue } from "../structures"; -import { ShardHeartBeater } from "./heartbeater.js"; -import { ShardData, ShardOptions, ShardSocketCloseCodes } from "./shared.js"; +import { inflateSync } from 'node:zlib'; +import type { GatewayReceivePayload, GatewaySendPayload, Logger } from '@biscuitland/common'; +import { GatewayCloseCodes, GatewayDispatchEvents, GatewayOpcodes } from '@biscuitland/common'; +import type WS from 'ws'; +import { type CloseEvent, WebSocket } from 'ws'; +import { properties } from '../constants'; +import { ConnectTimeout, DynamicBucket, PriorityQueue } from '../structures'; +import type { ShardData, ShardOptions } from './shared'; +import { ShardSocketCloseCodes } from './shared'; export class Shard { logger: Logger; - data: Partial | ShardData; + data: Partial | ShardData = { + resumeSeq: null + }; + websocket: WebSocket | null = null; - heartbeater: ShardHeartBeater; + connectTimeout = new ConnectTimeout(); + heart: { + interval: number; + nodeInterval?: NodeJS.Timeout; + lastAck?: number; + lastBeat?: number; + ack: boolean; + } = { + interval: 30e3, + ack: true + }; + bucket: DynamicBucket; offlineSendQueue = new PriorityQueue<(_?: unknown) => void>(); + constructor(public id: number, protected options: ShardOptions) { this.options.ratelimitOptions ??= { rateLimitResetInterval: 60_000, - maxRequestsPerRateLimitTick: 120, + maxRequestsPerRateLimitTick: 120 }; this.logger = options.logger; - this.data = { - resumeSeq: null, - resume_gateway_url: GATEWAY_BASE_URL, - }; - - this.heartbeater = new ShardHeartBeater(this); const safe = this.calculateSafeRequests(); this.bucket = new DynamicBucket({ limit: safe, refillAmount: safe, refillInterval: 6e4, - logger: this.logger, + logger: this.logger }); } - isOpen() { + get latency() { + return this.heart.lastAck && this.heart.lastBeat ? this.heart.lastAck - this.heart.lastBeat : Infinity; + } + + get isOpen() { return this.websocket?.readyState === WebSocket.OPEN; } - /** - * the state of the current shard - */ - get state() { - return this.data.shardState ?? ShardState.Offline; - } - - set state(st: ShardState) { - this.data.shardState = st; - } - get gatewayURL() { - return this.data.resume_gateway_url ?? this.options.info.url; + return this.options.info.url; } - connect() { - if (![ShardState.Resuming, ShardState.Identifying].includes(this.state)) { - this.state = ShardState.Connecting; - } + get resumeGatewayURL() { + return this.data.resume_gateway_url; + } - this.websocket = new WebSocket(this.gatewayURL); + get currentGatewayURL() { + return this.resumeGatewayURL ?? this.options.info.url; + } + + async connect() { + await this.connectTimeout.wait(); + + this.logger.debug(`[Shard #${this.id}] Connecting to ${this.currentGatewayURL}`); + + this.websocket = new WebSocket(this.currentGatewayURL); this.websocket!.onmessage = (event) => this.handleMessage(event); @@ -78,34 +79,21 @@ export class Shard { this.websocket!.onerror = (event) => this.logger.error(event); - return new Promise((resolve, reject) => { - const timer = setTimeout(reject, 30_000); - this.websocket!.onopen = () => { - if (![ShardState.Resuming, ShardState.Identifying].includes(this.state)) { - this.state = ShardState.Unidentified; - } - - clearTimeout(timer); - resolve(this); - }; - - this.heartbeater = new ShardHeartBeater(this); - }); + this.websocket!.onopen = () => { + this.heart.ack = true; + }; } - checkOffline(priority: number) { - if (!this.isOpen()) { - return new Promise((resolve) => this.offlineSendQueue.push(resolve, priority)); - } - return Promise.resolve(); + async send(priority: number, message: T) { + this.logger.info(`[Shard #${this.id}] Sending: ${GatewayOpcodes[message.op]} ${JSON.stringify(message.d, null, 1)}`); + await this.checkOffline(priority); + await this.bucket.acquire(priority); + await this.checkOffline(priority); + this.websocket?.send(JSON.stringify(message)); } async identify() { - this.logger.debug(`[Shard #${this.id}] on identify ${this.isOpen()}`); - - this.state = ShardState.Identifying; - - this.send(0, { + await this.send(0, { op: GatewayOpcodes.Identify, d: { token: `Bot ${this.options.token}`, @@ -113,48 +101,168 @@ export class Shard { properties, shard: [this.id, this.options.info.shards], intents: this.options.intents, - }, + presence: this.options.presence + } }); } - reconnect() { - this.heartbeater.stopHeartbeating() - this.disconnect(); - return this.connect(); + get resumable() { + return !!(this.data.resume_gateway_url && this.data.session_id && this.data.resumeSeq !== null); } - resume() { - this.state = ShardState.Resuming; - const data = { - seq: this.data.resumeSeq!, - session_id: this.data.session_id!, - token: `Bot ${this.options.token}`, - }; - return this.send(0, { d: data, op: GatewayOpcodes.Resume }); + async resume() { + await this.send(0, { + op: GatewayOpcodes.Resume, + d: { + seq: this.data.resumeSeq!, + session_id: this.data.session_id!, + token: `Bot ${this.options.token}` + } + }); } - /** - * Send a message to Discord Gateway. - * sets up the buckets aswell for every path - * these buckets are dynamic memory however a good practice is to use 'WebSocket.send' directly - * in simpler terms, do not use where we don't want buckets - */ - async send(priority: number, message: T) { - // Before acquiring a token from the bucket, check whether the shard is currently offline or not. - // Else bucket and token wait time just get wasted. - await this.checkOffline(priority); + async heartbeat(requested: boolean) { + this.logger.debug(`[Shard #${this.id}] Sending ${requested ? '' : 'un'}requested heartbeat (Ack=${this.heart.ack})`); + if (!requested) { + if (!this.heart.ack) { + await this.close(ShardSocketCloseCodes.ZombiedConnection, 'Zombied connection'); + return; + } + this.heart.ack = false; + } - // pause the function execution for the bucket to be acquired - await this.bucket.acquire(priority); + this.heart.lastBeat = Date.now(); - // It's possible, that the shard went offline after a token has been acquired from the bucket. - await this.checkOffline(priority); - - // send the payload at last - this.websocket?.send(JSON.stringify(message)); + this.websocket!.send( + JSON.stringify({ + op: GatewayOpcodes.Heartbeat, + d: this.data.resumeSeq ?? null + }) + ); } - protected handleMessage({ data }: WS.MessageEvent) { + async disconnect() { + this.logger.info(`[Shard #${this.id}] Disconnecting`); + await this.close(ShardSocketCloseCodes.Shutdown, 'Shard down request'); + } + + async reconnect() { + this.logger.info(`[Shard #${this.id}] Reconnecting`); + await this.disconnect(); + await this.connect(); + } + + async onpacket(packet: GatewayReceivePayload) { + if (packet.s !== null) { + this.data.resumeSeq = packet.s; + } + + this.logger.debug(`[Shard #${this.id}]`, packet.t ? packet.t : GatewayOpcodes[packet.op], this.data.resumeSeq); + + switch (packet.op) { + case GatewayOpcodes.Hello: + clearInterval(this.heart.nodeInterval); + + this.heart.interval = packet.d.heartbeat_interval; + + // await delay(Math.ceil(this.heart.interval * (Math.random() || 0.5))); + await this.heartbeat(false); + this.heart.nodeInterval = setInterval(() => this.heartbeat(false), this.heart.interval); + + if (this.resumable) { + return this.resume(); + } + await this.identify(); + break; + case GatewayOpcodes.HeartbeatAck: + this.heart.ack = true; + this.heart.lastAck = Date.now(); + break; + case GatewayOpcodes.Heartbeat: + this.heartbeat(true); + break; + case GatewayOpcodes.Reconnect: + await this.reconnect(); + break; + case GatewayOpcodes.InvalidSession: + if (packet.d) { + if (!this.resumable) { + return this.logger.fatal(`[Shard #${this.id}] This is a completely unexpected error message.`); + } + await this.resume(); + } else { + this.data.resumeSeq = 0; + this.data.session_id = undefined; + await this.identify(); + } + break; + case GatewayOpcodes.Dispatch: + switch (packet.t) { + case GatewayDispatchEvents.Resumed: + this.offlineSendQueue.toArray().map((resolve: () => any) => resolve()); + break; + case GatewayDispatchEvents.Ready: + this.data.resume_gateway_url = packet.d.resume_gateway_url; + this.data.session_id = packet.d.session_id; + this.offlineSendQueue.toArray().map((resolve: () => any) => resolve()); + this.options.handlePayload(this.id, packet); + break; + default: + this.options.handlePayload(this.id, packet); + break; + } + break; + } + } + + protected async handleClosed(close: CloseEvent) { + clearInterval(this.heart.nodeInterval); + this.logger.warn(`[Shard #${this.id}] ${GatewayCloseCodes[close.code] ?? close.code}`); + + switch (close.code) { + case ShardSocketCloseCodes.Shutdown: + break; + case 1000: + case 1001: + case 1006: + case ShardSocketCloseCodes.ZombiedConnection: + case GatewayCloseCodes.UnknownError: + case GatewayCloseCodes.UnknownOpcode: + case GatewayCloseCodes.DecodeError: + case GatewayCloseCodes.NotAuthenticated: + case GatewayCloseCodes.AlreadyAuthenticated: + case GatewayCloseCodes.InvalidSeq: + case GatewayCloseCodes.RateLimited: + case GatewayCloseCodes.SessionTimedOut: + this.logger.info(`[Shard #${this.id}] Trying to reconnect`); + await this.reconnect(); + break; + + case GatewayCloseCodes.AuthenticationFailed: + case GatewayCloseCodes.DisallowedIntents: + case GatewayCloseCodes.InvalidAPIVersion: + case GatewayCloseCodes.InvalidIntents: + case GatewayCloseCodes.InvalidShard: + case GatewayCloseCodes.ShardingRequired: + this.logger.fatal(`[Shard #${this.id}] cannot reconnect`); + break; + + default: + this.logger.warn(`[Shard #${this.id}] Unknown close code, trying to reconnect anyways`); + await this.reconnect(); + break; + } + } + + async close(code: number, reason: string) { + if (this.websocket?.readyState !== WebSocket.OPEN) { + return this.logger.warn(`${new Error('418').stack} [Shard #${this.id}] Is not open`); + } + this.logger.warn(`${new Error('418').stack} [Shard #${this.id}] Called close`); + this.websocket?.close(code, reason); + } + + protected async handleMessage({ data }: WS.MessageEvent) { if (data instanceof Buffer) { data = inflateSync(data); } @@ -165,124 +273,30 @@ export class Shard { * data: "Already authenticated." * } */ - if ((data as string).startsWith("{")) data = JSON.parse(data as string); + if ((data as string).startsWith('{')) { + data = JSON.parse(data as string); + } const packet = data as unknown as GatewayReceivePayload; - // emit other events - this.onpacket(packet); + return this.onpacket(packet); } - async onpacket(packet: GatewayReceivePayload | GatewayDispatchPayload) { - if (packet.s !== null) { - this.data.resumeSeq = packet.s; - } - - this.logger.debug(`[Shard #${this.id}]`, packet.t, packet.op); - - this.heartbeater.onpacket(packet); - - switch (packet.op) { - case GatewayOpcodes.Reconnect: - this.reconnect(); - break; - case GatewayOpcodes.InvalidSession: { - const resumable = packet.d && this.data.session_id - // We need to wait for a random amount of time between 1 and 5 - // Reference: https://discord.com/developers/docs/topics/gateway#resuming - await delay(Math.floor((Math.random() * 4 + 1) * 1000)); - - if (!resumable) { - this.data.resumeSeq = 0; - this.data.session_id = undefined; - await this.connect(); - break; - } - await this.resume(); - break; - } - } - - switch (packet.t) { - case GatewayDispatchEvents.Resumed: - this.state = ShardState.Connected; - this.offlineSendQueue.toArray().map((resolve: () => any) => resolve()); - break; - case GatewayDispatchEvents.Ready: { - const payload = packet.d as GatewayReadyDispatchData; - this.data.resume_gateway_url = payload.resume_gateway_url; - this.data.session_id = payload.session_id; - this.state = ShardState.Connected; - this.offlineSendQueue.toArray().map((resolve: () => any) => resolve()); - this.options.handlePayload(this.id, packet); - break; - } - default: - this.options.handlePayload(this.id, packet as GatewayDispatchPayload); - break; + checkOffline(priority: number) { + if (!this.isOpen) { + return new Promise((resolve) => this.offlineSendQueue.push(resolve, priority)); } + return Promise.resolve(); } - close(code: number, reason: string) { - if (this.websocket?.readyState !== WebSocket.OPEN) return; - this.websocket?.close(code, reason); - } - - disconnect() { - this.logger.info(`[Shard #${this.id}]`, "Disconnect", ...arguments); - this.close(ShardSocketCloseCodes.Shutdown, "Shard down request"); - this.state = ShardState.Offline; - } - - protected async handleClosed(close: CloseEvent) { - this.heartbeater.stopHeartbeating(); - - switch (close.code) { - case ShardSocketCloseCodes.Shutdown: - case ShardSocketCloseCodes.ReIdentifying: - case ShardSocketCloseCodes.Resharded: - case ShardSocketCloseCodes.ResumeClosingOldConnection: - case ShardSocketCloseCodes.ZombiedConnection: - this.state = ShardState.Disconnected; - return; - - case GatewayCloseCodes.UnknownOpcode: - case GatewayCloseCodes.NotAuthenticated: - case GatewayCloseCodes.InvalidSeq: - case GatewayCloseCodes.RateLimited: - case GatewayCloseCodes.SessionTimedOut: - this.logger.debug(`[Shard #${this.id}] Gateway connection closing requiring re-identify. Code: ${close.code}`); - this.state = ShardState.Identifying; - - this.connect(); - break; - case GatewayCloseCodes.AuthenticationFailed: - case GatewayCloseCodes.InvalidShard: - case GatewayCloseCodes.ShardingRequired: - case GatewayCloseCodes.InvalidAPIVersion: - case GatewayCloseCodes.InvalidIntents: - case GatewayCloseCodes.DisallowedIntents: - this.state = ShardState.Offline; - - throw new Error(close.reason || "Discord gave no reason! GG! You broke Discord!"); - // Gateway connection closes on which a resume is allowed. - default: - this.logger.info(`[Shard #${this.id}] (${close.code}) closed shard #${this.id}. Resuming...`); - this.state = ShardState.Resuming; - - this.disconnect(); - await this.connect(); - } - } - - /** Calculate the amount of requests which can safely be made per rate limit interval, before the gateway gets disconnected due to an exceeded rate limit. */ calculateSafeRequests(): number { - // * 2 adds extra safety layer for discords OP 1 requests that we need to respond to const safeRequests = this.options.ratelimitOptions!.maxRequestsPerRateLimitTick - - Math.ceil(this.options.ratelimitOptions!.rateLimitResetInterval / this.heartbeater!.heart.interval) * 2; + Math.ceil(this.options.ratelimitOptions!.rateLimitResetInterval / this.heart.interval) * 2; - if (safeRequests < 0) return 0; + if (safeRequests < 0) { + return 0; + } return safeRequests; } } diff --git a/packages/ws/src/discord/sharder.ts b/packages/ws/src/discord/sharder.ts index 0acf244..029655f 100644 --- a/packages/ws/src/discord/sharder.ts +++ b/packages/ws/src/discord/sharder.ts @@ -1,35 +1,30 @@ -import { - APIGatewayBotInfo, - Collection, - GatewayOpcodes, - GatewayUpdatePresence, - GatewayVoiceStateUpdate, - LogLevels, - Logger, - ObjectToLower, - Options, - toSnakeCase, -} from "@biscuitland/common"; -import { ShardManagerDefaults } from "../constants"; -import { SequentialBucket } from "../structures"; -import { Shard } from "./shard.js"; -import { ShardManagerOptions } from "./shared"; +import type { + APIGatewayBotInfo, + GatewayUpdatePresence, + GatewayVoiceStateUpdate, + // Logger, + ObjectToLower +} from '@biscuitland/common'; +import { Collection, GatewayOpcodes, LogLevels, Logger, Options, toSnakeCase } from '@biscuitland/common'; +import { ShardManagerDefaults } from '../constants'; +import { SequentialBucket } from '../structures'; +import { Shard } from './shard.js'; +import type { ShardManagerOptions } from './shared'; export class ShardManager extends Collection { connectQueue: SequentialBucket; - options: Required; + options: ShardManagerOptions; logger: Logger; constructor(options: ShardManagerOptions) { super(); this.options = Options>(ShardManagerDefaults, options, { info: { shards: options.totalShards } }); - this.connectQueue = new SequentialBucket(this.concurrency); this.logger = new Logger({ active: this.options.debug, - name: "[ShardManager]", - logLevel: LogLevels.Debug, + name: '[ShardManager]', + logLevel: LogLevels.Debug }); } @@ -42,7 +37,7 @@ export class ShardManager extends Collection { } calculeShardId(guildId: string) { - return Number((BigInt(guildId) >> 22n) % BigInt(this.options.totalShards)); + return Number((BigInt(guildId) >> 22n) % BigInt(this.options.totalShards ?? 1)); } spawn(shardId: number) { @@ -57,6 +52,7 @@ export class ShardManager extends Collection { properties: this.options.properties, logger: this.logger, compress: false, + presence: this.options.presence }); this.set(shardId, shard); @@ -67,10 +63,12 @@ export class ShardManager extends Collection { async spawnShards(): Promise { const buckets = this.spawnBuckets(); - this.logger.info("Spawn shards"); + this.logger.info('Spawn shards'); for (const bucket of buckets) { for (const shard of bucket) { - if (!shard) break; + if (!shard) { + break; + } this.logger.info(`${shard.id} add to connect queue`); await this.connectQueue.push(shard.connect.bind(shard)); } @@ -82,10 +80,9 @@ export class ShardManager extends Collection { * https://discord.com/developers/docs/topics/gateway#sharding-max-concurrency */ spawnBuckets(): Shard[][] { - this.logger.info("Preparing buckets"); + this.logger.info('#0 Preparing buckets'); const chunks = SequentialBucket.chunk(new Array(this.options.totalShards), this.concurrency); - - // biome-ignore lint/complexity/noForEach: i mean is the same thing, but we need the index; + // biome-ignore lint/complexity/noForEach: in maps its okay chunks.forEach((arr: any[], index: number) => { for (let i = 0; i < arr.length; i++) { const id = i + (index > 0 ? index * this.concurrency : 0); @@ -107,32 +104,33 @@ export class ShardManager extends Collection { } disconnectAll() { - this.logger.info("Disconnect all shards"); - return new Promise((resolve) => { - // biome-ignore lint/complexity/noForEach: In maps, for each and for of have same performance + this.logger.info('Disconnect all shards'); + return new Promise((_resolve) => { + // biome-ignore lint/complexity/noForEach: in maps its okay this.forEach((shard) => shard.disconnect()); - resolve(null); + _resolve(null); }); } - setShardPresence(shardId: number, payload: GatewayUpdatePresence["d"]) { + setShardPresence(shardId: number, payload: GatewayUpdatePresence['d']) { this.logger.info(`Shard #${shardId} update presence`); return this.get(shardId)?.send(1, { op: GatewayOpcodes.PresenceUpdate, - d: payload, - }); - } - setPresence(payload: GatewayUpdatePresence["d"]): Promise | undefined { - return new Promise((resolve) => { - // biome-ignore lint/complexity/noForEach: In maps, for each and for of have same performance - this.forEach((shard) => { - this.setShardPresence(shard.id, payload); - }, this); - resolve(); + d: payload }); } - joinVoice(guild_id: string, channel_id: string, options: ObjectToLower>) { + setPresence(payload: GatewayUpdatePresence['d']): Promise | undefined { + return new Promise((_resolve) => { + // biome-ignore lint/complexity/noForEach: in maps its okay + this.forEach((_shard) => { + this.setShardPresence(_shard.id, payload); + }, this); + _resolve(); + }); + } + + joinVoice(guild_id: string, channel_id: string, options: ObjectToLower>) { const shardId = this.calculeShardId(guild_id); this.logger.info(`Shard #${shardId} join voice ${channel_id} in ${guild_id}`); @@ -141,8 +139,8 @@ export class ShardManager extends Collection { d: { guild_id, channel_id, - ...toSnakeCase(options), - }, + ...toSnakeCase(options) + } }); } @@ -156,8 +154,8 @@ export class ShardManager extends Collection { guild_id, channel_id: null, self_mute: false, - self_deaf: false, - }, + self_deaf: false + } }); } } diff --git a/packages/ws/src/discord/shared.ts b/packages/ws/src/discord/shared.ts index 220a1f4..bd846ee 100644 --- a/packages/ws/src/discord/shared.ts +++ b/packages/ws/src/discord/shared.ts @@ -1,5 +1,5 @@ -import { APIGatewayBotInfo, GatewayDispatchPayload, GatewayIntentBits, Logger } from '@biscuitland/common'; -import { IdentifyProperties, ShardState } from '../constants'; +import type { APIGatewayBotInfo, GatewayDispatchPayload, GatewayIntentBits, GatewayPresenceUpdateData, Logger } from '@biscuitland/common'; +import type { IdentifyProperties } from '../constants'; export interface ShardManagerOptions extends ShardDetails { /** Important data which is used by the manager to connect shards to the gateway. */ @@ -22,12 +22,10 @@ export interface ShardManagerOptions extends ShardDetails { * wheter to send debug information to the console */ debug?: boolean; + presence?: GatewayPresenceUpdateData; } export interface ShardData { - /** state */ - shardState: ShardState; - /** resume seq to resume connections */ resumeSeq: number | null; @@ -76,21 +74,10 @@ export interface ShardOptions extends ShardDetails { }; logger: Logger; compress: boolean; + presence?: GatewayPresenceUpdateData; } export enum ShardSocketCloseCodes { - /** A regular Shard shutdown. */ Shutdown = 3000, - /** A resume has been requested and therefore the old connection needs to be closed. */ - ResumeClosingOldConnection = 3024, - /** Did not receive a heartbeat ACK in time. - * Closing the shard and creating a new session. - */ - ZombiedConnection = 3010, - /** Discordeno's gateway tests hae been finished, therefore the Shard can be turned off. */ - TestingFinished = 3064, - /** Special close code reserved for Discordeno's zero-downtime resharding system. */ - Resharded = 3065, - /** Shard is re-identifying therefore the old connection needs to be closed. */ - ReIdentifying = 3066 + ZombiedConnection = 3010 } diff --git a/packages/ws/src/structures/index.ts b/packages/ws/src/structures/index.ts index c7bda7b..bd0413b 100644 --- a/packages/ws/src/structures/index.ts +++ b/packages/ws/src/structures/index.ts @@ -1,4 +1,7 @@ -import { Logger, delay } from '@biscuitland/common'; +import type { Logger } from '@biscuitland/common'; +import { delay } from '@biscuitland/common'; + +export * from './timeout'; /** * just any kind of request to queue and resolve later @@ -54,7 +57,9 @@ export class DynamicBucket { } get remaining(): number { - if (this.limit < this.used) return 0; + if (this.limit < this.used) { + return 0; + } return this.limit - this.used; } @@ -119,8 +124,8 @@ export class DynamicBucket { /** Pauses the execution until the request is available to be made. */ async acquire(priority: number): Promise { - return await new Promise((resolve) => { - this.queue.push(resolve, priority); + return await new Promise((_resolve) => { + this.queue.push(_resolve, priority); // biome-ignore lint/complexity/noVoid: void this.processQueue(); }); @@ -233,6 +238,7 @@ export abstract class Queue { public toArray(): T[] { return Array.from(this); } + public toString() { return this.head?.toString() || ''; } diff --git a/packages/ws/src/structures/timeout.ts b/packages/ws/src/structures/timeout.ts new file mode 100644 index 0000000..384060b --- /dev/null +++ b/packages/ws/src/structures/timeout.ts @@ -0,0 +1,28 @@ +export class ConnectTimeout { + promises: { promise: Promise; resolve: (x: boolean) => any }[] = []; + interval?: NodeJS.Timeout = undefined; + // biome-ignore lint/nursery/noEmptyBlockStatements: + constructor(readonly intervalTime = 5000) {} + + wait() { + let resolve = (_x: boolean) => { + // + }; + const promise = new Promise((r) => (resolve = r)); + if (!this.promises.length) { + this.interval = setInterval(() => { + this.shift(); + }, this.intervalTime); + } + this.promises.push({ resolve, promise }); + return promise; + } + + shift() { + this.promises.shift()?.resolve(true); + if (!this.promises.length) { + clearInterval(this.interval); + this.interval = undefined; + } + } +}