fix(ws): omg websocket start

This commit is contained in:
socram03 2023-12-15 14:45:50 -04:00
parent ca116f85ee
commit e7777c8591
11 changed files with 330 additions and 441 deletions

View File

@ -7,7 +7,9 @@ export function isObject(o: any) {
export function Options<T>(defaults: any, ...options: any[]): T { export function Options<T>(defaults: any, ...options: any[]): T {
const option = options.shift(); const option = options.shift();
if (!option) return defaults; if (!option) {
return defaults;
}
return Options( return Options(
{ {
@ -15,7 +17,7 @@ export function Options<T>(defaults: any, ...options: any[]): T {
...Object.fromEntries( ...Object.fromEntries(
Object.entries(defaults).map(([key, value]) => [ Object.entries(defaults).map(([key, value]) => [
key, key,
isObject(value) ? Options(value, option?.[key] || {}) : option?.[key] || value isObject(value) ? Options(value, option?.[key] || {}) : option?.[key] ?? value
]) ])
) )
}, },

View File

@ -1,10 +1,10 @@
import { GatewayIntentBits, Identify, When } from "@biscuitland/common"; import { GatewayIntentBits, Identify, When } from '@biscuitland/common';
import type { BiscuitRESTOptions, CDNRoutes, Routes } from "@biscuitland/rest"; import type { BiscuitRESTOptions, CDNRoutes, Routes } from '@biscuitland/rest';
import { BiscuitREST, CDN, Router } from "@biscuitland/rest"; import { BiscuitREST, CDN, Router } from '@biscuitland/rest';
import { GatewayEvents, ShardManager, ShardManagerOptions } from "@biscuitland/ws"; import { GatewayEvents, ShardManager, ShardManagerOptions } from '@biscuitland/ws';
import EventEmitter2 from "eventemitter2"; import EventEmitter2 from 'eventemitter2';
import { MainManager, getBotIdFromToken } from "."; import { MainManager, getBotIdFromToken } from '.';
import { Handler, actionHandler } from "./events/handler"; import { Handler, actionHandler } from './events/handler';
export class Session<On extends boolean = boolean> extends EventEmitter2 { export class Session<On extends boolean = boolean> extends EventEmitter2 {
constructor(public options: BiscuitOptions) { constructor(public options: BiscuitOptions) {
@ -67,7 +67,7 @@ export class Session<On extends boolean = boolean> extends EventEmitter2 {
if (!rest) { if (!rest) {
return new BiscuitREST({ return new BiscuitREST({
token: this.options.token, token: this.options.token,
...this.options.defaultRestOptions, ...this.options.defaultRestOptions
}); });
} }
@ -75,7 +75,7 @@ export class Session<On extends boolean = boolean> extends EventEmitter2 {
return rest; return rest;
} }
throw new Error("[CORE] REST not found"); throw new Error('[CORE] REST not found');
} }
async start() { async start() {
@ -92,10 +92,10 @@ export class Session<On extends boolean = boolean> extends EventEmitter2 {
// @ts-expect-error // @ts-expect-error
actionHandler([ctx, { t, d }, shard]); actionHandler([ctx, { t, d }, shard]);
}, },
...this.options.defaultGatewayOptions, ...this.options.defaultGatewayOptions
}); });
ctx.once("READY", (payload) => { ctx.once('READY', (payload) => {
const { user, application } = payload; const { user, application } = payload;
this.botId = user.id; this.botId = user.id;
this.applicationId = application.id; this.applicationId = application.id;
@ -110,12 +110,12 @@ export class Session<On extends boolean = boolean> extends EventEmitter2 {
} }
} }
export type HandlePayload = Pick<ShardManagerOptions, "handlePayload">["handlePayload"]; export type HandlePayload = Pick<ShardManagerOptions, 'handlePayload'>['handlePayload'];
export interface BiscuitOptions { export interface BiscuitOptions {
token: string; token: string;
intents: number | GatewayIntentBits; intents: number | GatewayIntentBits;
rest?: BiscuitREST; rest?: BiscuitREST;
defaultRestOptions?: Partial<BiscuitRESTOptions>; defaultRestOptions?: Partial<BiscuitRESTOptions>;
defaultGatewayOptions?: Identify<Partial<Omit<ShardManagerOptions, "token" | "intents">>>; defaultGatewayOptions?: Identify<Partial<Omit<ShardManagerOptions, 'token' | 'intents'>>>;
} }

View File

@ -9,10 +9,10 @@ import {
APIUserSelectComponent, APIUserSelectComponent,
ChannelType, ChannelType,
ComponentType, ComponentType,
TypeArray, TypeArray
} from "@biscuitland/common"; } from '@biscuitland/common';
import { OptionValuesLength } from ".."; import { OptionValuesLength } from '..';
import { BaseComponent } from "./BaseComponent"; import { BaseComponent } from './BaseComponent';
class SelectMenu<Select extends APISelectMenuComponent = APISelectMenuComponent,> extends BaseComponent<Select> { class SelectMenu<Select extends APISelectMenuComponent = APISelectMenuComponent,> extends BaseComponent<Select> {
setCustomId(id: string): this { setCustomId(id: string): this {

View File

@ -11,6 +11,7 @@ import type {
GatewayAutoModerationActionExecutionDispatchData, GatewayAutoModerationActionExecutionDispatchData,
GatewayChannelPinsUpdateDispatchData, GatewayChannelPinsUpdateDispatchData,
GatewayChannelUpdateDispatchData, GatewayChannelUpdateDispatchData,
GatewayDispatchEvents,
GatewayGuildBanAddDispatchData, GatewayGuildBanAddDispatchData,
GatewayGuildBanRemoveDispatchData, GatewayGuildBanRemoveDispatchData,
GatewayGuildCreateDispatchData, GatewayGuildCreateDispatchData,
@ -58,8 +59,6 @@ import type {
RestToKeys RestToKeys
} from '@biscuitland/common'; } from '@biscuitland/common';
import { GatewayDispatchEvents } from '@biscuitland/common';
/** https://discord.com/developers/docs/topics/gateway-events#update-presence */ /** https://discord.com/developers/docs/topics/gateway-events#update-presence */
export interface StatusUpdate { export interface StatusUpdate {
/** The user's activities */ /** The user's activities */

View File

@ -37,21 +37,4 @@ export interface IdentifyProperties {
device: string; device: string;
} }
enum ShardState { export { COMPRESS, ShardManagerDefaults, properties };
/** Shard is fully connected to the gateway and receiving events from Discord. */
Connected = 0,
/** Shard started to connect to the gateway. This is only used if the shard is not currently trying to identify or resume. */
Connecting = 1,
/** Shard got disconnected and reconnection actions have been started. */
Disconnected = 2,
/** The shard is connected to the gateway but only heartbeating. At this state the shard has not been identified with discord. */
Unidentified = 3,
/** Shard is trying to identify with the gateway to create a new session. */
Identifying = 4,
/** Shard is trying to resume a session with the gateway. */
Resuming = 5,
/** Shard got shut down studied or due to a not (self) fixable error and may not attempt to reconnect on its own. */
Offline = 6
}
export { COMPRESS, ShardManagerDefaults, ShardState, properties };

View File

@ -1,128 +0,0 @@
import { GatewayHeartbeatRequest, GatewayHello, GatewayOpcodes, GatewayReceivePayload } from '@biscuitland/common';
import { Shard } from './shard.js';
import { ShardSocketCloseCodes } from './shared.js';
export interface ShardHeart {
/** Whether or not the heartbeat was acknowledged by Discord in time. */
ack: boolean;
/** Interval between heartbeats requested by Discord. */
interval: number;
/** Id of the interval, which is used for sending the heartbeats. */
intervalId?: NodeJS.Timeout;
/** Unix (in milliseconds) timestamp when the last heartbeat ACK was received from Discord. */
lastAck?: number;
/** Unix timestamp (in milliseconds) when the last heartbeat was sent. */
lastBeat?: number;
/** Round trip time (in milliseconds) from Shard to Discord and back.
* Calculated using the heartbeat system.
* Note: this value is undefined until the first heartbeat to Discord has happened.
*/
rtt?: number;
/** Id of the timeout which is used for sending the first heartbeat to Discord since it's "special". */
timeoutId?: NodeJS.Timeout;
/** internal value */
toString(): string;
}
export class ShardHeartBeater {
heart: ShardHeart = {
ack: false,
interval: 30_000
};
// biome-ignore lint/nursery/noEmptyBlockStatements: <explanation>
constructor(public shard: Shard) { }
acknowledge(ack = true) {
this.heart.ack = ack;
}
handleHeartbeat(_packet: Extract<GatewayReceivePayload, GatewayHeartbeatRequest>) {
this.shard.logger.debug(`[Shard #${this.shard.id}] received hearbeat event`);
this.heartbeat(false);
}
/**
* sends a heartbeat whenever its needed
* fails if heart.interval is null
*/
heartbeat(acknowledgeAck: boolean) {
if (acknowledgeAck) {
if (!this.heart.lastAck) {
this.shard.logger.debug(`[Shard #${this.shard.id}] Heartbeat not acknowledged.`);
this.shard.close(ShardSocketCloseCodes.ZombiedConnection, 'Zombied connection, did not receive an heartbeat ACK in time.');
this.shard.identify(true);
}
this.heart.lastAck = undefined;
}
this.heart.lastBeat = Date.now();
// avoid creating a bucket here
this.shard.websocket?.send(
JSON.stringify({
op: GatewayOpcodes.Heartbeat,
d: this.shard.data.resumeSeq
})
);
}
stopHeartbeating() {
clearInterval(this.heart.intervalId);
clearTimeout(this.heart.timeoutId);
}
startHeartBeating() {
this.shard.logger.debug(`[Shard #${this.shard.id}] scheduling heartbeat!`);
if (!this.shard.isOpen()) return;
// The first heartbeat needs to be send with a random delay between `0` and `interval`
// Using a `setTimeout(_, jitter)` here to accomplish that.
// `Math.random()` can be `0` so we use `0.5` if this happens
// Reference: https://discord.com/developers/docs/topics/gateway#heartbeating
const jitter = Math.ceil(this.heart.interval * (Math.random() || 0.5));
this.heart.timeoutId = setTimeout(() => {
// send a heartbeat
this.heartbeat(false);
this.heart.intervalId = setInterval(() => {
this.acknowledge(false);
this.heartbeat(false);
}, this.heart.interval);
}, jitter);
}
handleHello(packet: GatewayHello) {
if (packet.d.heartbeat_interval > 0) {
if (this.heart.interval != null) {
this.stopHeartbeating();
}
this.heart.interval = packet.d.heartbeat_interval;
this.heart.intervalId = setInterval(() => {
this.acknowledge(false);
this.heartbeat(false);
}, this.heart.interval);
}
this.startHeartBeating();
if (this.shard.data.session_id) {
this.shard.resume();
} else {
this.shard.identify()
}
}
onpacket(packet: GatewayReceivePayload) {
switch (packet.op) {
case GatewayOpcodes.Heartbeat:
return this.handleHeartbeat(packet);
case GatewayOpcodes.Hello:
return this.handleHello(packet);
case GatewayOpcodes.HeartbeatAck:
this.acknowledge();
return (this.heart.lastAck = Date.now());
}
}
}

View File

@ -1,76 +1,77 @@
import { import { inflateSync } from 'node:zlib';
GATEWAY_BASE_URL, import type { GatewayReceivePayload, GatewaySendPayload, Logger } from '@biscuitland/common';
GatewayCloseCodes, import { GatewayCloseCodes, GatewayDispatchEvents, GatewayOpcodes } from '@biscuitland/common';
GatewayDispatchEvents, import type WS from 'ws';
GatewayDispatchPayload, import { type CloseEvent, WebSocket } from 'ws';
GatewayOpcodes, import { properties } from '../constants';
GatewayReadyDispatchData, import { ConnectTimeout, DynamicBucket, PriorityQueue } from '../structures';
GatewayReceivePayload, import type { ShardData, ShardOptions } from './shared';
GatewaySendPayload, import { ShardSocketCloseCodes } from './shared';
type Logger,
} from "@biscuitland/common";
import { setTimeout as delay } from "node:timers/promises";
import { inflateSync } from "node:zlib";
import WS, { WebSocket, type CloseEvent } from "ws";
import { ShardState, properties } from "../constants";
import { DynamicBucket, PriorityQueue } from "../structures";
import { ShardHeartBeater } from "./heartbeater.js";
import { ShardData, ShardOptions, ShardSocketCloseCodes } from "./shared.js";
export class Shard { export class Shard {
logger: Logger; logger: Logger;
data: Partial<ShardData> | ShardData; data: Partial<ShardData> | ShardData = {
resumeSeq: null
};
websocket: WebSocket | null = null; websocket: WebSocket | null = null;
heartbeater: ShardHeartBeater; connectTimeout = new ConnectTimeout();
heart: {
interval: number;
nodeInterval?: NodeJS.Timeout;
lastAck?: number;
lastBeat?: number;
ack: boolean;
} = {
interval: 30e3,
ack: true
};
bucket: DynamicBucket; bucket: DynamicBucket;
offlineSendQueue = new PriorityQueue<(_?: unknown) => void>(); offlineSendQueue = new PriorityQueue<(_?: unknown) => void>();
constructor(public id: number, protected options: ShardOptions) { constructor(public id: number, protected options: ShardOptions) {
this.options.ratelimitOptions ??= { this.options.ratelimitOptions ??= {
rateLimitResetInterval: 60_000, rateLimitResetInterval: 60_000,
maxRequestsPerRateLimitTick: 120, maxRequestsPerRateLimitTick: 120
}; };
this.logger = options.logger; this.logger = options.logger;
this.data = {
resumeSeq: null,
resume_gateway_url: GATEWAY_BASE_URL,
};
this.heartbeater = new ShardHeartBeater(this);
const safe = this.calculateSafeRequests(); const safe = this.calculateSafeRequests();
this.bucket = new DynamicBucket({ this.bucket = new DynamicBucket({
limit: safe, limit: safe,
refillAmount: safe, refillAmount: safe,
refillInterval: 6e4, refillInterval: 6e4,
logger: this.logger, logger: this.logger
}); });
} }
isOpen() { get latency() {
return this.heart.lastAck && this.heart.lastBeat ? this.heart.lastAck - this.heart.lastBeat : Infinity;
}
get isOpen() {
return this.websocket?.readyState === WebSocket.OPEN; return this.websocket?.readyState === WebSocket.OPEN;
} }
/**
* the state of the current shard
*/
get state() {
return this.data.shardState ?? ShardState.Offline;
}
set state(st: ShardState) {
this.data.shardState = st;
}
get gatewayURL() { get gatewayURL() {
return this.data.resume_gateway_url ?? this.options.info.url; return this.options.info.url;
} }
connect() { get resumeGatewayURL() {
if (![ShardState.Resuming, ShardState.Identifying].includes(this.state)) { return this.data.resume_gateway_url;
this.state = ShardState.Connecting;
} }
this.websocket = new WebSocket(this.gatewayURL); get currentGatewayURL() {
return this.resumeGatewayURL ?? this.options.info.url;
}
async connect() {
await this.connectTimeout.wait();
this.logger.debug(`[Shard #${this.id}] Connecting to ${this.currentGatewayURL}`);
this.websocket = new WebSocket(this.currentGatewayURL);
this.websocket!.onmessage = (event) => this.handleMessage(event); this.websocket!.onmessage = (event) => this.handleMessage(event);
@ -78,34 +79,21 @@ export class Shard {
this.websocket!.onerror = (event) => this.logger.error(event); this.websocket!.onerror = (event) => this.logger.error(event);
return new Promise<Shard>((resolve, reject) => {
const timer = setTimeout(reject, 30_000);
this.websocket!.onopen = () => { this.websocket!.onopen = () => {
if (![ShardState.Resuming, ShardState.Identifying].includes(this.state)) { this.heart.ack = true;
this.state = ShardState.Unidentified;
}
clearTimeout(timer);
resolve(this);
}; };
this.heartbeater = new ShardHeartBeater(this);
});
} }
checkOffline(priority: number) { async send<T extends GatewaySendPayload = GatewaySendPayload>(priority: number, message: T) {
if (!this.isOpen()) { this.logger.info(`[Shard #${this.id}] Sending: ${GatewayOpcodes[message.op]} ${JSON.stringify(message.d, null, 1)}`);
return new Promise((resolve) => this.offlineSendQueue.push(resolve, priority)); await this.checkOffline(priority);
} await this.bucket.acquire(priority);
return Promise.resolve(); await this.checkOffline(priority);
this.websocket?.send(JSON.stringify(message));
} }
async identify() { async identify() {
this.logger.debug(`[Shard #${this.id}] on identify ${this.isOpen()}`); await this.send(0, {
this.state = ShardState.Identifying;
this.send(0, {
op: GatewayOpcodes.Identify, op: GatewayOpcodes.Identify,
d: { d: {
token: `Bot ${this.options.token}`, token: `Bot ${this.options.token}`,
@ -113,48 +101,168 @@ export class Shard {
properties, properties,
shard: [this.id, this.options.info.shards], shard: [this.id, this.options.info.shards],
intents: this.options.intents, intents: this.options.intents,
}, presence: this.options.presence
}
}); });
} }
reconnect() { get resumable() {
this.heartbeater.stopHeartbeating() return !!(this.data.resume_gateway_url && this.data.session_id && this.data.resumeSeq !== null);
this.disconnect();
return this.connect();
} }
resume() { async resume() {
this.state = ShardState.Resuming; await this.send(0, {
const data = { op: GatewayOpcodes.Resume,
d: {
seq: this.data.resumeSeq!, seq: this.data.resumeSeq!,
session_id: this.data.session_id!, session_id: this.data.session_id!,
token: `Bot ${this.options.token}`, token: `Bot ${this.options.token}`
}; }
return this.send(0, { d: data, op: GatewayOpcodes.Resume }); });
} }
/** async heartbeat(requested: boolean) {
* Send a message to Discord Gateway. this.logger.debug(`[Shard #${this.id}] Sending ${requested ? '' : 'un'}requested heartbeat (Ack=${this.heart.ack})`);
* sets up the buckets aswell for every path if (!requested) {
* these buckets are dynamic memory however a good practice is to use 'WebSocket.send' directly if (!this.heart.ack) {
* in simpler terms, do not use where we don't want buckets await this.close(ShardSocketCloseCodes.ZombiedConnection, 'Zombied connection');
*/ return;
async send<T extends GatewaySendPayload = GatewaySendPayload>(priority: number, message: T) { }
// Before acquiring a token from the bucket, check whether the shard is currently offline or not. this.heart.ack = false;
// Else bucket and token wait time just get wasted.
await this.checkOffline(priority);
// pause the function execution for the bucket to be acquired
await this.bucket.acquire(priority);
// It's possible, that the shard went offline after a token has been acquired from the bucket.
await this.checkOffline(priority);
// send the payload at last
this.websocket?.send(JSON.stringify(message));
} }
protected handleMessage({ data }: WS.MessageEvent) { this.heart.lastBeat = Date.now();
this.websocket!.send(
JSON.stringify({
op: GatewayOpcodes.Heartbeat,
d: this.data.resumeSeq ?? null
})
);
}
async disconnect() {
this.logger.info(`[Shard #${this.id}] Disconnecting`);
await this.close(ShardSocketCloseCodes.Shutdown, 'Shard down request');
}
async reconnect() {
this.logger.info(`[Shard #${this.id}] Reconnecting`);
await this.disconnect();
await this.connect();
}
async onpacket(packet: GatewayReceivePayload) {
if (packet.s !== null) {
this.data.resumeSeq = packet.s;
}
this.logger.debug(`[Shard #${this.id}]`, packet.t ? packet.t : GatewayOpcodes[packet.op], this.data.resumeSeq);
switch (packet.op) {
case GatewayOpcodes.Hello:
clearInterval(this.heart.nodeInterval);
this.heart.interval = packet.d.heartbeat_interval;
// await delay(Math.ceil(this.heart.interval * (Math.random() || 0.5)));
await this.heartbeat(false);
this.heart.nodeInterval = setInterval(() => this.heartbeat(false), this.heart.interval);
if (this.resumable) {
return this.resume();
}
await this.identify();
break;
case GatewayOpcodes.HeartbeatAck:
this.heart.ack = true;
this.heart.lastAck = Date.now();
break;
case GatewayOpcodes.Heartbeat:
this.heartbeat(true);
break;
case GatewayOpcodes.Reconnect:
await this.reconnect();
break;
case GatewayOpcodes.InvalidSession:
if (packet.d) {
if (!this.resumable) {
return this.logger.fatal(`[Shard #${this.id}] This is a completely unexpected error message.`);
}
await this.resume();
} else {
this.data.resumeSeq = 0;
this.data.session_id = undefined;
await this.identify();
}
break;
case GatewayOpcodes.Dispatch:
switch (packet.t) {
case GatewayDispatchEvents.Resumed:
this.offlineSendQueue.toArray().map((resolve: () => any) => resolve());
break;
case GatewayDispatchEvents.Ready:
this.data.resume_gateway_url = packet.d.resume_gateway_url;
this.data.session_id = packet.d.session_id;
this.offlineSendQueue.toArray().map((resolve: () => any) => resolve());
this.options.handlePayload(this.id, packet);
break;
default:
this.options.handlePayload(this.id, packet);
break;
}
break;
}
}
protected async handleClosed(close: CloseEvent) {
clearInterval(this.heart.nodeInterval);
this.logger.warn(`[Shard #${this.id}] ${GatewayCloseCodes[close.code] ?? close.code}`);
switch (close.code) {
case ShardSocketCloseCodes.Shutdown:
break;
case 1000:
case 1001:
case 1006:
case ShardSocketCloseCodes.ZombiedConnection:
case GatewayCloseCodes.UnknownError:
case GatewayCloseCodes.UnknownOpcode:
case GatewayCloseCodes.DecodeError:
case GatewayCloseCodes.NotAuthenticated:
case GatewayCloseCodes.AlreadyAuthenticated:
case GatewayCloseCodes.InvalidSeq:
case GatewayCloseCodes.RateLimited:
case GatewayCloseCodes.SessionTimedOut:
this.logger.info(`[Shard #${this.id}] Trying to reconnect`);
await this.reconnect();
break;
case GatewayCloseCodes.AuthenticationFailed:
case GatewayCloseCodes.DisallowedIntents:
case GatewayCloseCodes.InvalidAPIVersion:
case GatewayCloseCodes.InvalidIntents:
case GatewayCloseCodes.InvalidShard:
case GatewayCloseCodes.ShardingRequired:
this.logger.fatal(`[Shard #${this.id}] cannot reconnect`);
break;
default:
this.logger.warn(`[Shard #${this.id}] Unknown close code, trying to reconnect anyways`);
await this.reconnect();
break;
}
}
async close(code: number, reason: string) {
if (this.websocket?.readyState !== WebSocket.OPEN) {
return this.logger.warn(`${new Error('418').stack} [Shard #${this.id}] Is not open`);
}
this.logger.warn(`${new Error('418').stack} [Shard #${this.id}] Called close`);
this.websocket?.close(code, reason);
}
protected async handleMessage({ data }: WS.MessageEvent) {
if (data instanceof Buffer) { if (data instanceof Buffer) {
data = inflateSync(data); data = inflateSync(data);
} }
@ -165,124 +273,30 @@ export class Shard {
* data: "Already authenticated." * data: "Already authenticated."
* } * }
*/ */
if ((data as string).startsWith("{")) data = JSON.parse(data as string); if ((data as string).startsWith('{')) {
data = JSON.parse(data as string);
}
const packet = data as unknown as GatewayReceivePayload; const packet = data as unknown as GatewayReceivePayload;
// emit other events return this.onpacket(packet);
this.onpacket(packet);
} }
async onpacket(packet: GatewayReceivePayload | GatewayDispatchPayload) { checkOffline(priority: number) {
if (packet.s !== null) { if (!this.isOpen) {
this.data.resumeSeq = packet.s; return new Promise((resolve) => this.offlineSendQueue.push(resolve, priority));
}
return Promise.resolve();
} }
this.logger.debug(`[Shard #${this.id}]`, packet.t, packet.op);
this.heartbeater.onpacket(packet);
switch (packet.op) {
case GatewayOpcodes.Reconnect:
this.reconnect();
break;
case GatewayOpcodes.InvalidSession: {
const resumable = packet.d && this.data.session_id
// We need to wait for a random amount of time between 1 and 5
// Reference: https://discord.com/developers/docs/topics/gateway#resuming
await delay(Math.floor((Math.random() * 4 + 1) * 1000));
if (!resumable) {
this.data.resumeSeq = 0;
this.data.session_id = undefined;
await this.connect();
break;
}
await this.resume();
break;
}
}
switch (packet.t) {
case GatewayDispatchEvents.Resumed:
this.state = ShardState.Connected;
this.offlineSendQueue.toArray().map((resolve: () => any) => resolve());
break;
case GatewayDispatchEvents.Ready: {
const payload = packet.d as GatewayReadyDispatchData;
this.data.resume_gateway_url = payload.resume_gateway_url;
this.data.session_id = payload.session_id;
this.state = ShardState.Connected;
this.offlineSendQueue.toArray().map((resolve: () => any) => resolve());
this.options.handlePayload(this.id, packet);
break;
}
default:
this.options.handlePayload(this.id, packet as GatewayDispatchPayload);
break;
}
}
close(code: number, reason: string) {
if (this.websocket?.readyState !== WebSocket.OPEN) return;
this.websocket?.close(code, reason);
}
disconnect() {
this.logger.info(`[Shard #${this.id}]`, "Disconnect", ...arguments);
this.close(ShardSocketCloseCodes.Shutdown, "Shard down request");
this.state = ShardState.Offline;
}
protected async handleClosed(close: CloseEvent) {
this.heartbeater.stopHeartbeating();
switch (close.code) {
case ShardSocketCloseCodes.Shutdown:
case ShardSocketCloseCodes.ReIdentifying:
case ShardSocketCloseCodes.Resharded:
case ShardSocketCloseCodes.ResumeClosingOldConnection:
case ShardSocketCloseCodes.ZombiedConnection:
this.state = ShardState.Disconnected;
return;
case GatewayCloseCodes.UnknownOpcode:
case GatewayCloseCodes.NotAuthenticated:
case GatewayCloseCodes.InvalidSeq:
case GatewayCloseCodes.RateLimited:
case GatewayCloseCodes.SessionTimedOut:
this.logger.debug(`[Shard #${this.id}] Gateway connection closing requiring re-identify. Code: ${close.code}`);
this.state = ShardState.Identifying;
this.connect();
break;
case GatewayCloseCodes.AuthenticationFailed:
case GatewayCloseCodes.InvalidShard:
case GatewayCloseCodes.ShardingRequired:
case GatewayCloseCodes.InvalidAPIVersion:
case GatewayCloseCodes.InvalidIntents:
case GatewayCloseCodes.DisallowedIntents:
this.state = ShardState.Offline;
throw new Error(close.reason || "Discord gave no reason! GG! You broke Discord!");
// Gateway connection closes on which a resume is allowed.
default:
this.logger.info(`[Shard #${this.id}] (${close.code}) closed shard #${this.id}. Resuming...`);
this.state = ShardState.Resuming;
this.disconnect();
await this.connect();
}
}
/** Calculate the amount of requests which can safely be made per rate limit interval, before the gateway gets disconnected due to an exceeded rate limit. */
calculateSafeRequests(): number { calculateSafeRequests(): number {
// * 2 adds extra safety layer for discords OP 1 requests that we need to respond to
const safeRequests = const safeRequests =
this.options.ratelimitOptions!.maxRequestsPerRateLimitTick - this.options.ratelimitOptions!.maxRequestsPerRateLimitTick -
Math.ceil(this.options.ratelimitOptions!.rateLimitResetInterval / this.heartbeater!.heart.interval) * 2; Math.ceil(this.options.ratelimitOptions!.rateLimitResetInterval / this.heart.interval) * 2;
if (safeRequests < 0) return 0; if (safeRequests < 0) {
return 0;
}
return safeRequests; return safeRequests;
} }
} }

View File

@ -1,35 +1,30 @@
import { import type {
APIGatewayBotInfo, APIGatewayBotInfo,
Collection,
GatewayOpcodes,
GatewayUpdatePresence, GatewayUpdatePresence,
GatewayVoiceStateUpdate, GatewayVoiceStateUpdate,
LogLevels, // Logger,
Logger, ObjectToLower
ObjectToLower, } from '@biscuitland/common';
Options, import { Collection, GatewayOpcodes, LogLevels, Logger, Options, toSnakeCase } from '@biscuitland/common';
toSnakeCase, import { ShardManagerDefaults } from '../constants';
} from "@biscuitland/common"; import { SequentialBucket } from '../structures';
import { ShardManagerDefaults } from "../constants"; import { Shard } from './shard.js';
import { SequentialBucket } from "../structures"; import type { ShardManagerOptions } from './shared';
import { Shard } from "./shard.js";
import { ShardManagerOptions } from "./shared";
export class ShardManager extends Collection<number, Shard> { export class ShardManager extends Collection<number, Shard> {
connectQueue: SequentialBucket; connectQueue: SequentialBucket;
options: Required<ShardManagerOptions>; options: ShardManagerOptions;
logger: Logger; logger: Logger;
constructor(options: ShardManagerOptions) { constructor(options: ShardManagerOptions) {
super(); super();
this.options = Options<Required<ShardManagerOptions>>(ShardManagerDefaults, options, { info: { shards: options.totalShards } }); this.options = Options<Required<ShardManagerOptions>>(ShardManagerDefaults, options, { info: { shards: options.totalShards } });
this.connectQueue = new SequentialBucket(this.concurrency); this.connectQueue = new SequentialBucket(this.concurrency);
this.logger = new Logger({ this.logger = new Logger({
active: this.options.debug, active: this.options.debug,
name: "[ShardManager]", name: '[ShardManager]',
logLevel: LogLevels.Debug, logLevel: LogLevels.Debug
}); });
} }
@ -42,7 +37,7 @@ export class ShardManager extends Collection<number, Shard> {
} }
calculeShardId(guildId: string) { calculeShardId(guildId: string) {
return Number((BigInt(guildId) >> 22n) % BigInt(this.options.totalShards)); return Number((BigInt(guildId) >> 22n) % BigInt(this.options.totalShards ?? 1));
} }
spawn(shardId: number) { spawn(shardId: number) {
@ -57,6 +52,7 @@ export class ShardManager extends Collection<number, Shard> {
properties: this.options.properties, properties: this.options.properties,
logger: this.logger, logger: this.logger,
compress: false, compress: false,
presence: this.options.presence
}); });
this.set(shardId, shard); this.set(shardId, shard);
@ -67,10 +63,12 @@ export class ShardManager extends Collection<number, Shard> {
async spawnShards(): Promise<void> { async spawnShards(): Promise<void> {
const buckets = this.spawnBuckets(); const buckets = this.spawnBuckets();
this.logger.info("Spawn shards"); this.logger.info('Spawn shards');
for (const bucket of buckets) { for (const bucket of buckets) {
for (const shard of bucket) { for (const shard of bucket) {
if (!shard) break; if (!shard) {
break;
}
this.logger.info(`${shard.id} add to connect queue`); this.logger.info(`${shard.id} add to connect queue`);
await this.connectQueue.push(shard.connect.bind(shard)); await this.connectQueue.push(shard.connect.bind(shard));
} }
@ -82,10 +80,9 @@ export class ShardManager extends Collection<number, Shard> {
* https://discord.com/developers/docs/topics/gateway#sharding-max-concurrency * https://discord.com/developers/docs/topics/gateway#sharding-max-concurrency
*/ */
spawnBuckets(): Shard[][] { spawnBuckets(): Shard[][] {
this.logger.info("Preparing buckets"); this.logger.info('#0 Preparing buckets');
const chunks = SequentialBucket.chunk(new Array(this.options.totalShards), this.concurrency); const chunks = SequentialBucket.chunk(new Array(this.options.totalShards), this.concurrency);
// biome-ignore lint/complexity/noForEach: in maps its okay
// biome-ignore lint/complexity/noForEach: i mean is the same thing, but we need the index;
chunks.forEach((arr: any[], index: number) => { chunks.forEach((arr: any[], index: number) => {
for (let i = 0; i < arr.length; i++) { for (let i = 0; i < arr.length; i++) {
const id = i + (index > 0 ? index * this.concurrency : 0); const id = i + (index > 0 ? index * this.concurrency : 0);
@ -107,32 +104,33 @@ export class ShardManager extends Collection<number, Shard> {
} }
disconnectAll() { disconnectAll() {
this.logger.info("Disconnect all shards"); this.logger.info('Disconnect all shards');
return new Promise((resolve) => { return new Promise((_resolve) => {
// biome-ignore lint/complexity/noForEach: In maps, for each and for of have same performance // biome-ignore lint/complexity/noForEach: in maps its okay
this.forEach((shard) => shard.disconnect()); this.forEach((shard) => shard.disconnect());
resolve(null); _resolve(null);
}); });
} }
setShardPresence(shardId: number, payload: GatewayUpdatePresence["d"]) { setShardPresence(shardId: number, payload: GatewayUpdatePresence['d']) {
this.logger.info(`Shard #${shardId} update presence`); this.logger.info(`Shard #${shardId} update presence`);
return this.get(shardId)?.send<GatewayUpdatePresence>(1, { return this.get(shardId)?.send<GatewayUpdatePresence>(1, {
op: GatewayOpcodes.PresenceUpdate, op: GatewayOpcodes.PresenceUpdate,
d: payload, d: payload
});
}
setPresence(payload: GatewayUpdatePresence["d"]): Promise<void> | undefined {
return new Promise((resolve) => {
// biome-ignore lint/complexity/noForEach: In maps, for each and for of have same performance
this.forEach((shard) => {
this.setShardPresence(shard.id, payload);
}, this);
resolve();
}); });
} }
joinVoice(guild_id: string, channel_id: string, options: ObjectToLower<Pick<GatewayVoiceStateUpdate["d"], "self_deaf" | "self_mute">>) { setPresence(payload: GatewayUpdatePresence['d']): Promise<void> | undefined {
return new Promise((_resolve) => {
// biome-ignore lint/complexity/noForEach: in maps its okay
this.forEach((_shard) => {
this.setShardPresence(_shard.id, payload);
}, this);
_resolve();
});
}
joinVoice(guild_id: string, channel_id: string, options: ObjectToLower<Pick<GatewayVoiceStateUpdate['d'], 'self_deaf' | 'self_mute'>>) {
const shardId = this.calculeShardId(guild_id); const shardId = this.calculeShardId(guild_id);
this.logger.info(`Shard #${shardId} join voice ${channel_id} in ${guild_id}`); this.logger.info(`Shard #${shardId} join voice ${channel_id} in ${guild_id}`);
@ -141,8 +139,8 @@ export class ShardManager extends Collection<number, Shard> {
d: { d: {
guild_id, guild_id,
channel_id, channel_id,
...toSnakeCase(options), ...toSnakeCase(options)
}, }
}); });
} }
@ -156,8 +154,8 @@ export class ShardManager extends Collection<number, Shard> {
guild_id, guild_id,
channel_id: null, channel_id: null,
self_mute: false, self_mute: false,
self_deaf: false, self_deaf: false
}, }
}); });
} }
} }

View File

@ -1,5 +1,5 @@
import { APIGatewayBotInfo, GatewayDispatchPayload, GatewayIntentBits, Logger } from '@biscuitland/common'; import type { APIGatewayBotInfo, GatewayDispatchPayload, GatewayIntentBits, GatewayPresenceUpdateData, Logger } from '@biscuitland/common';
import { IdentifyProperties, ShardState } from '../constants'; import type { IdentifyProperties } from '../constants';
export interface ShardManagerOptions extends ShardDetails { export interface ShardManagerOptions extends ShardDetails {
/** Important data which is used by the manager to connect shards to the gateway. */ /** Important data which is used by the manager to connect shards to the gateway. */
@ -22,12 +22,10 @@ export interface ShardManagerOptions extends ShardDetails {
* wheter to send debug information to the console * wheter to send debug information to the console
*/ */
debug?: boolean; debug?: boolean;
presence?: GatewayPresenceUpdateData;
} }
export interface ShardData { export interface ShardData {
/** state */
shardState: ShardState;
/** resume seq to resume connections */ /** resume seq to resume connections */
resumeSeq: number | null; resumeSeq: number | null;
@ -76,21 +74,10 @@ export interface ShardOptions extends ShardDetails {
}; };
logger: Logger; logger: Logger;
compress: boolean; compress: boolean;
presence?: GatewayPresenceUpdateData;
} }
export enum ShardSocketCloseCodes { export enum ShardSocketCloseCodes {
/** A regular Shard shutdown. */
Shutdown = 3000, Shutdown = 3000,
/** A resume has been requested and therefore the old connection needs to be closed. */ ZombiedConnection = 3010
ResumeClosingOldConnection = 3024,
/** Did not receive a heartbeat ACK in time.
* Closing the shard and creating a new session.
*/
ZombiedConnection = 3010,
/** Discordeno's gateway tests hae been finished, therefore the Shard can be turned off. */
TestingFinished = 3064,
/** Special close code reserved for Discordeno's zero-downtime resharding system. */
Resharded = 3065,
/** Shard is re-identifying therefore the old connection needs to be closed. */
ReIdentifying = 3066
} }

View File

@ -1,4 +1,7 @@
import { Logger, delay } from '@biscuitland/common'; import type { Logger } from '@biscuitland/common';
import { delay } from '@biscuitland/common';
export * from './timeout';
/** /**
* just any kind of request to queue and resolve later * just any kind of request to queue and resolve later
@ -54,7 +57,9 @@ export class DynamicBucket {
} }
get remaining(): number { get remaining(): number {
if (this.limit < this.used) return 0; if (this.limit < this.used) {
return 0;
}
return this.limit - this.used; return this.limit - this.used;
} }
@ -119,8 +124,8 @@ export class DynamicBucket {
/** Pauses the execution until the request is available to be made. */ /** Pauses the execution until the request is available to be made. */
async acquire(priority: number): Promise<void> { async acquire(priority: number): Promise<void> {
return await new Promise((resolve) => { return await new Promise((_resolve) => {
this.queue.push(resolve, priority); this.queue.push(_resolve, priority);
// biome-ignore lint/complexity/noVoid: <explanation> // biome-ignore lint/complexity/noVoid: <explanation>
void this.processQueue(); void this.processQueue();
}); });
@ -233,6 +238,7 @@ export abstract class Queue<T> {
public toArray(): T[] { public toArray(): T[] {
return Array.from(this); return Array.from(this);
} }
public toString() { public toString() {
return this.head?.toString() || ''; return this.head?.toString() || '';
} }

View File

@ -0,0 +1,28 @@
export class ConnectTimeout {
promises: { promise: Promise<boolean>; resolve: (x: boolean) => any }[] = [];
interval?: NodeJS.Timeout = undefined;
// biome-ignore lint/nursery/noEmptyBlockStatements: <explanation>
constructor(readonly intervalTime = 5000) {}
wait() {
let resolve = (_x: boolean) => {
//
};
const promise = new Promise<boolean>((r) => (resolve = r));
if (!this.promises.length) {
this.interval = setInterval(() => {
this.shift();
}, this.intervalTime);
}
this.promises.push({ resolve, promise });
return promise;
}
shift() {
this.promises.shift()?.resolve(true);
if (!this.promises.length) {
clearInterval(this.interval);
this.interval = undefined;
}
}
}