mirror of
https://github.com/tiramisulabs/seyfert.git
synced 2025-07-04 14:06:07 +00:00
Apply our custom gateway on biscuit (#145)
* feat(ws): new custom ws * fix(ws): shard reconnect only if property exist
This commit is contained in:
parent
d6ebe07ee7
commit
20dc37d9e3
@ -1,10 +1,10 @@
|
|||||||
import { GatewayIntentBits, Identify, When } from '@biscuitland/common';
|
import { GatewayIntentBits, Identify, When } from "@biscuitland/common";
|
||||||
import type { BiscuitRESTOptions, CDNRoutes, Routes } from '@biscuitland/rest';
|
import type { BiscuitRESTOptions, CDNRoutes, Routes } from "@biscuitland/rest";
|
||||||
import { BiscuitREST, CDN, Router } from '@biscuitland/rest';
|
import { BiscuitREST, CDN, Router } from "@biscuitland/rest";
|
||||||
import { CreateGatewayManagerOptions, GatewayEvents, GatewayManager } from '@biscuitland/ws';
|
import { GatewayEvents, GatewayManager, GatewayManagerOptions } from "@biscuitland/ws";
|
||||||
import EventEmitter2 from 'eventemitter2';
|
import EventEmitter2 from "eventemitter2";
|
||||||
import { MainManager, getBotIdFromToken } from '.';
|
import { MainManager, getBotIdFromToken } from ".";
|
||||||
import { Handler, actionHandler } from './events/handler';
|
import { Handler, actionHandler } from "./events/handler";
|
||||||
|
|
||||||
export class Session<On extends boolean = boolean> extends EventEmitter2 {
|
export class Session<On extends boolean = boolean> extends EventEmitter2 {
|
||||||
constructor(public options: BiscuitOptions) {
|
constructor(public options: BiscuitOptions) {
|
||||||
@ -67,7 +67,7 @@ export class Session<On extends boolean = boolean> extends EventEmitter2 {
|
|||||||
if (!rest) {
|
if (!rest) {
|
||||||
return new BiscuitREST({
|
return new BiscuitREST({
|
||||||
token: this.options.token,
|
token: this.options.token,
|
||||||
...this.options.defaultRestOptions
|
...this.options.defaultRestOptions,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,7 +75,7 @@ export class Session<On extends boolean = boolean> extends EventEmitter2 {
|
|||||||
return rest;
|
return rest;
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new Error('[CORE] REST not found');
|
throw new Error("[CORE] REST not found");
|
||||||
}
|
}
|
||||||
|
|
||||||
async start() {
|
async start() {
|
||||||
@ -85,16 +85,17 @@ export class Session<On extends boolean = boolean> extends EventEmitter2 {
|
|||||||
ctx.gateway = new GatewayManager({
|
ctx.gateway = new GatewayManager({
|
||||||
token: this.options.token,
|
token: this.options.token,
|
||||||
intents: this.options.intents ?? 0,
|
intents: this.options.intents ?? 0,
|
||||||
connection: this.options.defaultGatewayOptions?.connection ?? (await this.api.gateway.bot.get()),
|
info: this.options.defaultGatewayOptions?.info ?? (await this.api.gateway.bot.get()),
|
||||||
async handlePayload(shard, data) {
|
async handlePayload(shard, data) {
|
||||||
const { t, d } = data;
|
const { t, d } = data;
|
||||||
if (!(t && d)) return;
|
if (!(t && d)) return;
|
||||||
|
// @ts-expect-error
|
||||||
actionHandler([ctx, { t, d }, shard]);
|
actionHandler([ctx, { t, d }, shard]);
|
||||||
},
|
},
|
||||||
...this.options.defaultGatewayOptions
|
...this.options.defaultGatewayOptions,
|
||||||
});
|
});
|
||||||
|
|
||||||
ctx.once('READY', (payload) => {
|
ctx.once("READY", (payload) => {
|
||||||
const { user, application } = payload;
|
const { user, application } = payload;
|
||||||
this.botId = user.id;
|
this.botId = user.id;
|
||||||
this.applicationId = application.id;
|
this.applicationId = application.id;
|
||||||
@ -103,18 +104,18 @@ export class Session<On extends boolean = boolean> extends EventEmitter2 {
|
|||||||
await ctx.gateway.spawnShards();
|
await ctx.gateway.spawnShards();
|
||||||
}
|
}
|
||||||
|
|
||||||
async stop(code = 1000, error = 'Stopped') {
|
async stop() {
|
||||||
this.removeAllListeners();
|
this.removeAllListeners();
|
||||||
await this.gateway.shutdown(code, error);
|
await this.gateway.explode();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export type HandlePayload = Pick<CreateGatewayManagerOptions, 'handlePayload'>['handlePayload'];
|
export type HandlePayload = Pick<GatewayManagerOptions, "handlePayload">["handlePayload"];
|
||||||
|
|
||||||
export interface BiscuitOptions {
|
export interface BiscuitOptions {
|
||||||
token: string;
|
token: string;
|
||||||
intents: number | GatewayIntentBits;
|
intents: number | GatewayIntentBits;
|
||||||
rest?: BiscuitREST;
|
rest?: BiscuitREST;
|
||||||
defaultRestOptions?: Partial<BiscuitRESTOptions>;
|
defaultRestOptions?: Partial<BiscuitRESTOptions>;
|
||||||
defaultGatewayOptions?: Identify<Partial<Omit<CreateGatewayManagerOptions, 'token' | 'intents'>>>;
|
defaultGatewayOptions?: Identify<Partial<Omit<GatewayManagerOptions, "token" | "intents">>>;
|
||||||
}
|
}
|
||||||
|
@ -59,40 +59,6 @@ import type {
|
|||||||
RestToKeys,
|
RestToKeys,
|
||||||
} from "@biscuitland/common";
|
} from "@biscuitland/common";
|
||||||
|
|
||||||
export enum ShardState {
|
|
||||||
/** Shard is fully connected to the gateway and receiving events from Discord. */
|
|
||||||
Connected = 0,
|
|
||||||
/** Shard started to connect to the gateway. This is only used if the shard is not currently trying to identify or resume. */
|
|
||||||
Connecting = 1,
|
|
||||||
/** Shard got disconnected and reconnection actions have been started. */
|
|
||||||
Disconnected = 2,
|
|
||||||
/** The shard is connected to the gateway but only heartbeating. At this state the shard has not been identified with discord. */
|
|
||||||
Unidentified = 3,
|
|
||||||
/** Shard is trying to identify with the gateway to create a new session. */
|
|
||||||
Identifying = 4,
|
|
||||||
/** Shard is trying to resume a session with the gateway. */
|
|
||||||
Resuming = 5,
|
|
||||||
/** Shard got shut down studied or due to a not (self) fixable error and may not attempt to reconnect on its own. */
|
|
||||||
Offline = 6,
|
|
||||||
}
|
|
||||||
|
|
||||||
export enum ShardSocketCloseCodes {
|
|
||||||
/** A regular Shard shutdown. */
|
|
||||||
Shutdown = 3000,
|
|
||||||
/** A resume has been requested and therefore the old connection needs to be closed. */
|
|
||||||
ResumeClosingOldConnection = 3024,
|
|
||||||
/** Did not receive a heartbeat ACK in time.
|
|
||||||
* Closing the shard and creating a new session.
|
|
||||||
*/
|
|
||||||
ZombiedConnection = 3010,
|
|
||||||
/** Discordeno's gateway tests hae been finished, therefore the Shard can be turned off. */
|
|
||||||
TestingFinished = 3064,
|
|
||||||
/** Special close code reserved for Discordeno's zero-downtime resharding system. */
|
|
||||||
Resharded = 3065,
|
|
||||||
/** Shard is re-identifying therefore the old connection needs to be closed. */
|
|
||||||
ReIdentifying = 3066,
|
|
||||||
}
|
|
||||||
|
|
||||||
/** https://discord.com/developers/docs/topics/gateway-events#update-presence */
|
/** https://discord.com/developers/docs/topics/gateway-events#update-presence */
|
||||||
export interface StatusUpdate {
|
export interface StatusUpdate {
|
||||||
/** The user's activities */
|
/** The user's activities */
|
||||||
|
91
packages/ws/src/constants/index.ts
Normal file
91
packages/ws/src/constants/index.ts
Normal file
@ -0,0 +1,91 @@
|
|||||||
|
import type { GatewayDispatchPayload } from "@biscuitland/common";
|
||||||
|
import type { GatewayManagerOptions } from "../gateway";
|
||||||
|
|
||||||
|
const COMPRESS = false;
|
||||||
|
|
||||||
|
const properties = {
|
||||||
|
os: process.platform,
|
||||||
|
browser: "Biscuit",
|
||||||
|
device: "Biscuit",
|
||||||
|
};
|
||||||
|
|
||||||
|
const GatewayManagerDefaults: Partial<GatewayManagerOptions> = {
|
||||||
|
totalShards: 1,
|
||||||
|
spawnShardDelay: 5300,
|
||||||
|
debug: false,
|
||||||
|
intents: 0,
|
||||||
|
properties: properties,
|
||||||
|
version: 10,
|
||||||
|
handlePayload: function (shardId: number, packet: GatewayDispatchPayload): void {
|
||||||
|
console.info(`Packet ${packet.t} on shard ${shardId}`);
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
export interface IdentifyProperties {
|
||||||
|
/**
|
||||||
|
* Operating system the shard runs on.
|
||||||
|
* @default "darwin" | "linux" | "windows"
|
||||||
|
*/
|
||||||
|
os: string;
|
||||||
|
/**
|
||||||
|
* The "browser" where this shard is running on.
|
||||||
|
*/
|
||||||
|
browser: string;
|
||||||
|
/**
|
||||||
|
* The device on which the shard is running.
|
||||||
|
*/
|
||||||
|
device: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
enum ShardState {
|
||||||
|
/** Shard is fully connected to the gateway and receiving events from Discord. */
|
||||||
|
Connected = 0,
|
||||||
|
/** Shard started to connect to the gateway. This is only used if the shard is not currently trying to identify or resume. */
|
||||||
|
Connecting = 1,
|
||||||
|
/** Shard got disconnected and reconnection actions have been started. */
|
||||||
|
Disconnected = 2,
|
||||||
|
/** The shard is connected to the gateway but only heartbeating. At this state the shard has not been identified with discord. */
|
||||||
|
Unidentified = 3,
|
||||||
|
/** Shard is trying to identify with the gateway to create a new session. */
|
||||||
|
Identifying = 4,
|
||||||
|
/** Shard is trying to resume a session with the gateway. */
|
||||||
|
Resuming = 5,
|
||||||
|
/** Shard got shut down studied or due to a not (self) fixable error and may not attempt to reconnect on its own. */
|
||||||
|
Offline = 6,
|
||||||
|
}
|
||||||
|
|
||||||
|
function isObject(o: any) {
|
||||||
|
return o && typeof o === "object" && !Array.isArray(o);
|
||||||
|
}
|
||||||
|
|
||||||
|
function Options<T>(defaults: any, ...options: any[]): T {
|
||||||
|
const option = options.shift();
|
||||||
|
if (!option) return defaults;
|
||||||
|
|
||||||
|
return Options(
|
||||||
|
{
|
||||||
|
...option,
|
||||||
|
...Object.fromEntries(
|
||||||
|
Object.entries(defaults).map(([key, value]) => [
|
||||||
|
key,
|
||||||
|
isObject(value) ? Options(value, option?.[key] || {}) : option?.[key] || value,
|
||||||
|
]),
|
||||||
|
),
|
||||||
|
},
|
||||||
|
...options,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// nutella truco
|
||||||
|
function OptionsD<O extends Record<any, any>>(o: O) {
|
||||||
|
return function (target: { new (...args: any[]): any }) {
|
||||||
|
return class extends target {
|
||||||
|
constructor(...args: any[]) {
|
||||||
|
super();
|
||||||
|
this.options = Options(o, ...args.filter(isObject));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export { COMPRESS, GatewayManagerDefaults, Options, OptionsD, ShardState, isObject, properties };
|
@ -1,17 +0,0 @@
|
|||||||
export const GatewayManagerDefaultOptions = {
|
|
||||||
intents: 0,
|
|
||||||
properties: {
|
|
||||||
os: process.platform,
|
|
||||||
browser: 'Biscuit',
|
|
||||||
device: 'Biscuit'
|
|
||||||
},
|
|
||||||
version: 10,
|
|
||||||
totalShards: 1,
|
|
||||||
lastShardId: 0,
|
|
||||||
firstShardId: 0,
|
|
||||||
totalWorkers: 4,
|
|
||||||
shardsPerWorker: 25,
|
|
||||||
spawnShardDelay: 5300,
|
|
||||||
debug: false,
|
|
||||||
cache: false
|
|
||||||
};
|
|
3
packages/ws/src/gateway/index.ts
Normal file
3
packages/ws/src/gateway/index.ts
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
export * from "./shard";
|
||||||
|
export * from "./sharder";
|
||||||
|
export * from "./shared";
|
477
packages/ws/src/gateway/shard.ts
Normal file
477
packages/ws/src/gateway/shard.ts
Normal file
@ -0,0 +1,477 @@
|
|||||||
|
import type {
|
||||||
|
GatewayDispatchPayload,
|
||||||
|
GatewayHeartbeatRequest,
|
||||||
|
GatewayHello,
|
||||||
|
GatewayReceivePayload,
|
||||||
|
GatewaySendPayload,
|
||||||
|
When,
|
||||||
|
} from "@biscuitland/common";
|
||||||
|
|
||||||
|
import {
|
||||||
|
GATEWAY_BASE_URL,
|
||||||
|
GatewayVersion as GATEWAY_VERSION,
|
||||||
|
GatewayCloseCodes,
|
||||||
|
GatewayDispatchEvents,
|
||||||
|
GatewayOpcodes,
|
||||||
|
Logger,
|
||||||
|
} from "@biscuitland/common";
|
||||||
|
import { inflateSync } from "node:zlib";
|
||||||
|
import { WebSocket, type MessageEvent } from "ws";
|
||||||
|
import { COMPRESS, ShardState, properties } from "../constants/index";
|
||||||
|
import { DynamicBucket, PriorityQueue } from "../structures/index";
|
||||||
|
import type { ShardData, ShardOptions } from "./shared";
|
||||||
|
|
||||||
|
export interface ShardHeart {
|
||||||
|
/** Whether or not the heartbeat was acknowledged by Discord in time. */
|
||||||
|
ack: boolean;
|
||||||
|
/** Interval between heartbeats requested by Discord. */
|
||||||
|
interval: number;
|
||||||
|
/** Id of the interval, which is used for sending the heartbeats. */
|
||||||
|
intervalId?: NodeJS.Timer;
|
||||||
|
/** Unix (in milliseconds) timestamp when the last heartbeat ACK was received from Discord. */
|
||||||
|
lastAck?: number;
|
||||||
|
/** Unix timestamp (in milliseconds) when the last heartbeat was sent. */
|
||||||
|
lastBeat?: number;
|
||||||
|
/** Round trip time (in milliseconds) from Shard to Discord and back.
|
||||||
|
* Calculated using the heartbeat system.
|
||||||
|
* Note: this value is undefined until the first heartbeat to Discord has happened.
|
||||||
|
*/
|
||||||
|
rtt?: number;
|
||||||
|
/** Id of the timeout which is used for sending the first heartbeat to Discord since it's "special". */
|
||||||
|
timeoutId?: NodeJS.Timeout;
|
||||||
|
/** internal value */
|
||||||
|
toString(): string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class ShardHeartbeater {
|
||||||
|
shard: Shard<true>;
|
||||||
|
heart: ShardHeart;
|
||||||
|
|
||||||
|
constructor(shard: Shard<true>) {
|
||||||
|
this.shard = shard;
|
||||||
|
this.heart = {
|
||||||
|
ack: false,
|
||||||
|
interval: 30_000,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
acknowledge(ack = true) {
|
||||||
|
this.heart.ack = ack;
|
||||||
|
}
|
||||||
|
|
||||||
|
async handleHeartbeat(_packet: Extract<GatewayReceivePayload, GatewayHeartbeatRequest>) {
|
||||||
|
this.shard.logger.debug("received hearbeat event");
|
||||||
|
this.heartbeat(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
async handleHello(packet: GatewayHello) {
|
||||||
|
if (packet.d.heartbeat_interval > 0) {
|
||||||
|
if (this.heart.interval != null) {
|
||||||
|
this.stopHeartbeating();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.heart.interval = packet.d.heartbeat_interval;
|
||||||
|
this.heart.intervalId = setInterval(() => {
|
||||||
|
this.acknowledge(false);
|
||||||
|
this.heartbeat(false);
|
||||||
|
}, this.heart.interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
const interval: number = packet.d.heartbeat_interval;
|
||||||
|
return this.startHeartbeating(interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
async onpacket(packet: GatewayReceivePayload) {
|
||||||
|
switch (packet.op) {
|
||||||
|
case GatewayOpcodes.Hello: {
|
||||||
|
return this.handleHello(packet);
|
||||||
|
}
|
||||||
|
case GatewayOpcodes.Heartbeat: {
|
||||||
|
return this.handleHeartbeat(packet);
|
||||||
|
}
|
||||||
|
case GatewayOpcodes.HeartbeatAck: {
|
||||||
|
this.acknowledge();
|
||||||
|
return (this.heart.lastAck = Date.now());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* sends a heartbeat whenever its needed
|
||||||
|
* fails if heart.interval is null
|
||||||
|
*/
|
||||||
|
heartbeat(acknowledgeAck: boolean) {
|
||||||
|
if (acknowledgeAck) {
|
||||||
|
if (!this.heart.lastAck) this.shard.disconnect({ reconnect: () => this.shard.resume() });
|
||||||
|
this.heart.lastAck = undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.heart.lastBeat = Date.now();
|
||||||
|
|
||||||
|
// avoid creating a bucket here
|
||||||
|
this.shard.websocket.send(
|
||||||
|
JSON.stringify({
|
||||||
|
op: GatewayOpcodes.Heartbeat,
|
||||||
|
d: this.shard.data.resumeSeq,
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* wait the first interval upon receiving the hello event, then start heartbiting
|
||||||
|
* interval * jitter
|
||||||
|
* @param interval the packet interval
|
||||||
|
*/
|
||||||
|
async startHeartbeating(interval: number) {
|
||||||
|
this.heart.interval = interval;
|
||||||
|
this.shard.logger.debug("scheduling heartbeat!");
|
||||||
|
|
||||||
|
// The first heartbeat needs to be send with a random delay between `0` and `interval`
|
||||||
|
// Using a `setTimeout(_, jitter)` here to accomplish that.
|
||||||
|
// `Math.random()` can be `0` so we use `0.5` if this happens
|
||||||
|
// Reference: https://discord.com/developers/docs/topics/gateway#heartbeating
|
||||||
|
const jitter = Math.ceil(this.heart.interval * (Math.random() || 0.5));
|
||||||
|
|
||||||
|
return (this.heart.timeoutId = setTimeout(() => {
|
||||||
|
// send a heartbeat
|
||||||
|
this.heartbeat(false);
|
||||||
|
this.heart.intervalId = setInterval(() => {
|
||||||
|
this.acknowledge(false);
|
||||||
|
this.heartbeat(false);
|
||||||
|
}, this.heart.interval);
|
||||||
|
}, jitter));
|
||||||
|
}
|
||||||
|
|
||||||
|
stopHeartbeating() {
|
||||||
|
clearTimeout(this.heart.intervalId);
|
||||||
|
return clearTimeout(this.heart.timeoutId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class Shard<Connected extends boolean = true> {
|
||||||
|
constructor(id: number, protected options: ShardOptions) {
|
||||||
|
this.options.ratelimitOptions ??= {
|
||||||
|
rateLimitResetInterval: 60_000,
|
||||||
|
maxRequestsPerRateLimitTick: 120,
|
||||||
|
};
|
||||||
|
this.data = {
|
||||||
|
id,
|
||||||
|
resume_gateway_url: GATEWAY_BASE_URL,
|
||||||
|
resumeSeq: null,
|
||||||
|
} as never;
|
||||||
|
this.websocket = null as never;
|
||||||
|
this.ready = false;
|
||||||
|
this.handlePayload = (packet) => {
|
||||||
|
// do not ->
|
||||||
|
// decorate this function
|
||||||
|
if (packet.t === "READY") {
|
||||||
|
this.logger.debug("received ready event");
|
||||||
|
this.data.resume_gateway_url = `${packet.d.resume_gateway_url}?v=${GATEWAY_VERSION}&encoding=json`;
|
||||||
|
this.logger.debug("switch resume url");
|
||||||
|
this.data.session_id = packet.d.session_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.options.handlePayload(this.data.id ?? id, packet);
|
||||||
|
};
|
||||||
|
|
||||||
|
// we create an empty heartbeater just to get the interval variable
|
||||||
|
this.heartbeater = new ShardHeartbeater(this as Shard<true>) as never;
|
||||||
|
|
||||||
|
const safe = this.calculateSafeRequests();
|
||||||
|
|
||||||
|
this.bucket = new DynamicBucket({
|
||||||
|
limit: safe,
|
||||||
|
refillAmount: safe,
|
||||||
|
refillInterval: 6e4,
|
||||||
|
debug: this.options.debug,
|
||||||
|
});
|
||||||
|
this.logger = new Logger({
|
||||||
|
name: "[Shard]",
|
||||||
|
active: this.options.debug,
|
||||||
|
logLevel: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
websocket: When<Connected, WebSocket, null>;
|
||||||
|
heartbeater: When<Connected, ShardHeartbeater, null>;
|
||||||
|
data: When<Connected, ShardData, Partial<ShardData>>;
|
||||||
|
handlePayload: (packet: GatewayDispatchPayload) => unknown;
|
||||||
|
offlineSendQueue: PriorityQueue<(_?: unknown) => void> = new PriorityQueue();
|
||||||
|
bucket: DynamicBucket;
|
||||||
|
logger: Logger;
|
||||||
|
resolves: Map<"READY" | "RESUMED" | "INVALID_SESSION", (payload: GatewayReceivePayload) => void> = new Map();
|
||||||
|
|
||||||
|
/** wheter the shard was already identified */
|
||||||
|
ready: boolean;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* a guard of wheter is connected or not
|
||||||
|
*/
|
||||||
|
isConnected(): this is Shard<true> {
|
||||||
|
{
|
||||||
|
return this.websocket?.readyState === WebSocket.OPEN;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* the state of the current shard
|
||||||
|
*/
|
||||||
|
get state() {
|
||||||
|
return this.data.shardState ?? ShardState.Offline;
|
||||||
|
}
|
||||||
|
|
||||||
|
set state(st: ShardState) {
|
||||||
|
this.data.shardState = st;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* pushes dead requests for the bot to resolve later on send payload
|
||||||
|
* this is a remanecent of the old gateway but I (Yuzu) decided to let it untouched
|
||||||
|
*/
|
||||||
|
async checkOffline(priority: number) {
|
||||||
|
if (!this.isConnected()) {
|
||||||
|
return new Promise((resolve) => this.offlineSendQueue.push(resolve, priority));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send a message to Discord Gateway.
|
||||||
|
* sets up the buckets aswell for every path
|
||||||
|
* these buckets are dynamic memory however a good practice is to use 'WebSocket.send' directly
|
||||||
|
* in simpler terms, do not use where we don't want buckets
|
||||||
|
*/
|
||||||
|
async send(this: Shard<true>, priority: number, message: GatewaySendPayload) {
|
||||||
|
// Before acquiring a token from the bucket, check whether the shard is currently offline or not.
|
||||||
|
// Else bucket and token wait time just get wasted.
|
||||||
|
await this.checkOffline(priority);
|
||||||
|
|
||||||
|
// pause the function execution for the bucket to be acquired
|
||||||
|
await this.bucket.acquire(priority);
|
||||||
|
|
||||||
|
// It's possible, that the shard went offline after a token has been acquired from the bucket.
|
||||||
|
await this.checkOffline(priority);
|
||||||
|
|
||||||
|
// send the payload at last
|
||||||
|
this.websocket.send(JSON.stringify(message));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** starts the ws connection */
|
||||||
|
async connect(): Promise<Shard<true>> {
|
||||||
|
if (this.state === ShardState.Resuming) {
|
||||||
|
this.state = ShardState.Connecting;
|
||||||
|
}
|
||||||
|
if (this.state === ShardState.Identifying) {
|
||||||
|
this.state = ShardState.Connecting;
|
||||||
|
}
|
||||||
|
|
||||||
|
// set client
|
||||||
|
const websocket = new WebSocket(this.data.session_id ? this.gatewayURL : this.options.info.url);
|
||||||
|
this.websocket = websocket as When<Connected, WebSocket>;
|
||||||
|
|
||||||
|
this.websocket.onmessage = async (event) => {
|
||||||
|
return await (this as Shard<true>).handleMessage(event);
|
||||||
|
};
|
||||||
|
|
||||||
|
this.websocket.onerror = (event) => {
|
||||||
|
return this.logger.error({ error: event, shardId: this.data.id });
|
||||||
|
};
|
||||||
|
|
||||||
|
this.websocket.onclose = async (event) => {
|
||||||
|
return await (this as Shard<true>).handleClose(event.code, event.reason);
|
||||||
|
};
|
||||||
|
|
||||||
|
return new Promise<Shard<true>>((resolve, _reject) => {
|
||||||
|
this.websocket!.onopen = () => {
|
||||||
|
if (this.state === ShardState.Resuming) {
|
||||||
|
this.state = ShardState.Unidentified;
|
||||||
|
}
|
||||||
|
if (this.state === ShardState.Identifying) {
|
||||||
|
this.state = ShardState.Unidentified;
|
||||||
|
}
|
||||||
|
resolve(this as Shard<true>);
|
||||||
|
};
|
||||||
|
|
||||||
|
// set hearbeater
|
||||||
|
const heartbeater = new ShardHeartbeater(this as Shard<true>);
|
||||||
|
this.heartbeater = heartbeater as When<Connected, ShardHeartbeater, null>;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Handle an incoming gateway message. */
|
||||||
|
async handleMessage(this: Shard<true>, event: MessageEvent) {
|
||||||
|
let preProcessMessage = event.data;
|
||||||
|
|
||||||
|
if (preProcessMessage instanceof Buffer) {
|
||||||
|
preProcessMessage = inflateSync(preProcessMessage);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (typeof preProcessMessage === "string") {
|
||||||
|
const packet = JSON.parse(preProcessMessage);
|
||||||
|
|
||||||
|
// emit heartbeat events
|
||||||
|
this.heartbeater.onpacket(packet);
|
||||||
|
|
||||||
|
// try to identify
|
||||||
|
if (packet.op === GatewayOpcodes.Hello) {
|
||||||
|
this.identify();
|
||||||
|
}
|
||||||
|
|
||||||
|
// emit other events
|
||||||
|
this.onpacket(packet);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
async handleClose(this: Shard<true>, code: number, reason: string) {
|
||||||
|
this.heartbeater.stopHeartbeating();
|
||||||
|
|
||||||
|
switch (code) {
|
||||||
|
case 1000: {
|
||||||
|
this.logger.info("Uknown error code trying to resume");
|
||||||
|
return this.resume();
|
||||||
|
}
|
||||||
|
case GatewayCloseCodes.UnknownOpcode:
|
||||||
|
case GatewayCloseCodes.NotAuthenticated:
|
||||||
|
case GatewayCloseCodes.InvalidSeq:
|
||||||
|
case GatewayCloseCodes.RateLimited:
|
||||||
|
case GatewayCloseCodes.SessionTimedOut:
|
||||||
|
this.logger.info(`Gateway connection closing requiring re-identify. Code: ${code}`);
|
||||||
|
this.state = ShardState.Identifying;
|
||||||
|
return this.identify();
|
||||||
|
case GatewayCloseCodes.AuthenticationFailed:
|
||||||
|
case GatewayCloseCodes.InvalidShard:
|
||||||
|
case GatewayCloseCodes.ShardingRequired:
|
||||||
|
case GatewayCloseCodes.InvalidAPIVersion:
|
||||||
|
case GatewayCloseCodes.InvalidIntents:
|
||||||
|
case GatewayCloseCodes.DisallowedIntents:
|
||||||
|
this.state = ShardState.Offline;
|
||||||
|
// disconnected event
|
||||||
|
throw new Error(reason || "Discord gave no reason! GG! You broke Discord!");
|
||||||
|
default:
|
||||||
|
return this.disconnect({ reconnect: () => this.resume() });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async resume(this: Shard<true>) {
|
||||||
|
this.state = ShardState.Resuming;
|
||||||
|
|
||||||
|
if (!this.ready) return;
|
||||||
|
|
||||||
|
return await this.send(0, {
|
||||||
|
op: GatewayOpcodes.Resume,
|
||||||
|
d: {
|
||||||
|
token: this.options.token,
|
||||||
|
session_id: this.data.session_id!,
|
||||||
|
seq: this.data.resumeSeq!,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
resetState() {
|
||||||
|
this.ready = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
disconnect(options?: { reconnect?: () => unknown }) {
|
||||||
|
this.logger.error("closing connection");
|
||||||
|
|
||||||
|
if (!this.websocket) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!options?.reconnect) {
|
||||||
|
this.websocket.close(1012, "BiscuitWS: close");
|
||||||
|
} else {
|
||||||
|
this.logger.error("trying to reconnect");
|
||||||
|
options.reconnect();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.websocket.terminate();
|
||||||
|
|
||||||
|
this.resetState();
|
||||||
|
}
|
||||||
|
|
||||||
|
async onpacket(this: Shard<true>, packet: GatewayDispatchPayload | GatewayReceivePayload) {
|
||||||
|
if (packet.s) this.data.resumeSeq = packet.s;
|
||||||
|
|
||||||
|
switch (packet.op) {
|
||||||
|
case GatewayOpcodes.InvalidSession:
|
||||||
|
this.logger.debug("got invalid session, trying to identify back");
|
||||||
|
this.data.resumeSeq = null;
|
||||||
|
this.data.session_id = undefined;
|
||||||
|
this.data.resume_gateway_url = undefined;
|
||||||
|
this.resetState();
|
||||||
|
await this.identify();
|
||||||
|
break;
|
||||||
|
case GatewayOpcodes.Reconnect:
|
||||||
|
this.disconnect({
|
||||||
|
reconnect: () => {
|
||||||
|
this.resume();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (packet.t) {
|
||||||
|
case GatewayDispatchEvents.Resumed:
|
||||||
|
this.heartbeater.heartbeat(false);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
this.handlePayload(packet as any);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** do the handshake with discord */
|
||||||
|
async identify() {
|
||||||
|
if (!this.isConnected()) {
|
||||||
|
await this.connect().then((shardThis: Shard<true>) => {
|
||||||
|
this.identify.call(shardThis);
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!this.ready) {
|
||||||
|
this.logger.debug(`identifying shard ${this.data.id} with a total of ${this.options.info.shards}`);
|
||||||
|
this.data.shardState = ShardState.Identifying;
|
||||||
|
await this.send(0, {
|
||||||
|
op: GatewayOpcodes.Identify,
|
||||||
|
d: {
|
||||||
|
token: this.options.token,
|
||||||
|
intents: this.options.intents,
|
||||||
|
properties: properties,
|
||||||
|
shard: [this.data.id, this.options.info.shards],
|
||||||
|
compress: COMPRESS,
|
||||||
|
},
|
||||||
|
}).then(() => {
|
||||||
|
this.logger.debug("finished identifying");
|
||||||
|
});
|
||||||
|
// ^if we don't get this message start preocupating
|
||||||
|
|
||||||
|
this.ready = true;
|
||||||
|
}
|
||||||
|
// ^we make sure we can no longer identify unless invalid session
|
||||||
|
|
||||||
|
return new Promise<void>((resolve) => {
|
||||||
|
return this.shardIsReady?.().then(() => {
|
||||||
|
resolve();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
get gatewayURL() {
|
||||||
|
return this.data.resume_gateway_url ?? this.options.info.url;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Calculate the amount of requests which can safely be made per rate limit interval, before the gateway gets disconnected due to an exceeded rate limit. */
|
||||||
|
calculateSafeRequests(): number {
|
||||||
|
// * 2 adds extra safety layer for discords OP 1 requests that we need to respond to
|
||||||
|
const safeRequests =
|
||||||
|
this.options.ratelimitOptions!.maxRequestsPerRateLimitTick -
|
||||||
|
Math.ceil(this.options.ratelimitOptions!.rateLimitResetInterval / this.heartbeater!.heart.interval) * 2;
|
||||||
|
|
||||||
|
if (safeRequests < 0) return 0;
|
||||||
|
else return safeRequests;
|
||||||
|
}
|
||||||
|
|
||||||
|
shardIsReady?: () => Promise<void>;
|
||||||
|
}
|
86
packages/ws/src/gateway/sharder.ts
Normal file
86
packages/ws/src/gateway/sharder.ts
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
import type { APIGatewayBotInfo } from "@biscuitland/common";
|
||||||
|
import { Collection, Logger } from "@biscuitland/common";
|
||||||
|
import { GatewayManagerDefaults, Options } from "../constants/index";
|
||||||
|
import { SequentialBucket } from "../structures/index";
|
||||||
|
import { Shard } from "./shard";
|
||||||
|
import type { GatewayManagerOptions } from "./shared";
|
||||||
|
|
||||||
|
export class GatewayManager extends Collection<number, Shard> {
|
||||||
|
connectQueue: SequentialBucket;
|
||||||
|
options: Required<GatewayManagerOptions>;
|
||||||
|
logger: Logger;
|
||||||
|
|
||||||
|
constructor(_options: GatewayManagerOptions) {
|
||||||
|
super();
|
||||||
|
this.options = Options<Required<GatewayManagerOptions>>(GatewayManagerDefaults, { info: { shards: _options.totalShards } }, _options);
|
||||||
|
this.connectQueue = new SequentialBucket(this.concurrency);
|
||||||
|
this.logger = new Logger({
|
||||||
|
name: "[GatewayManager]",
|
||||||
|
active: this.options.debug,
|
||||||
|
logLevel: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
get concurrency(): number {
|
||||||
|
return this.options.info.session_start_limit.max_concurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
get remaining(): number {
|
||||||
|
return this.options.info.session_start_limit.remaining;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* spawns buckets in order
|
||||||
|
* https://discord.com/developers/docs/topics/gateway#sharding-max-concurrency
|
||||||
|
*/
|
||||||
|
spawnBuckets(): Shard[][] {
|
||||||
|
this.logger.info("Preparing buckets");
|
||||||
|
const chunks = SequentialBucket.chunk(new Array(this.options.totalShards), this.concurrency);
|
||||||
|
|
||||||
|
chunks.forEach((arr: any[], index: number) => {
|
||||||
|
for (let i = 0; i < arr.length; i++) {
|
||||||
|
const id = i + (index > 0 ? index * this.concurrency : 0);
|
||||||
|
chunks[index][i] = this.spawn(id);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return chunks;
|
||||||
|
}
|
||||||
|
|
||||||
|
async spawnShards(): Promise<void> {
|
||||||
|
const buckets = this.spawnBuckets();
|
||||||
|
|
||||||
|
this.logger.info("Spawn shards");
|
||||||
|
for (const bucket of buckets) {
|
||||||
|
for (const shard of bucket) {
|
||||||
|
if (!shard) break;
|
||||||
|
await this.connectQueue.push(shard.identify.bind(shard));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spawn(shardId: number) {
|
||||||
|
let shard = this.get(shardId);
|
||||||
|
|
||||||
|
shard ??= new Shard<true>(shardId, {
|
||||||
|
token: this.options.token,
|
||||||
|
intents: this.options.intents,
|
||||||
|
info: Options<APIGatewayBotInfo>(this.options.info, { shards: this.options.totalShards }),
|
||||||
|
handlePayload: this.options.handlePayload,
|
||||||
|
properties: this.options.properties,
|
||||||
|
debug: this.options.debug,
|
||||||
|
});
|
||||||
|
|
||||||
|
this.set(shardId, shard);
|
||||||
|
|
||||||
|
return shard;
|
||||||
|
}
|
||||||
|
|
||||||
|
async forceIdentify(shardId: number) {
|
||||||
|
await this.spawn(shardId).identify();
|
||||||
|
}
|
||||||
|
|
||||||
|
explode() {
|
||||||
|
return this.forEach(($) => $.disconnect());
|
||||||
|
}
|
||||||
|
}
|
98
packages/ws/src/gateway/shared.ts
Normal file
98
packages/ws/src/gateway/shared.ts
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
import type { APIGatewayBotInfo, GatewayDispatchPayload, GatewayIntentBits } from "@biscuitland/common";
|
||||||
|
import type { IdentifyProperties, ShardState } from "../constants/index";
|
||||||
|
|
||||||
|
export interface ShardDetails {
|
||||||
|
/** Bot token which is used to connect to Discord */
|
||||||
|
token: string;
|
||||||
|
/**
|
||||||
|
* The URL of the gateway which should be connected to.
|
||||||
|
* @default "wss://gateway.discord.gg"
|
||||||
|
*/
|
||||||
|
url?: string;
|
||||||
|
/**
|
||||||
|
* The gateway version which should be used.
|
||||||
|
* @default 10
|
||||||
|
*/
|
||||||
|
version?: number;
|
||||||
|
/**
|
||||||
|
* The calculated intent value of the events which the shard should receive.
|
||||||
|
*/
|
||||||
|
intents: GatewayIntentBits | number;
|
||||||
|
/**
|
||||||
|
* Identify properties to use
|
||||||
|
*/
|
||||||
|
properties?: IdentifyProperties;
|
||||||
|
}
|
||||||
|
|
||||||
|
export enum ShardSocketCloseCodes {
|
||||||
|
/** A regular Shard shutdown. */
|
||||||
|
Shutdown = 3000,
|
||||||
|
/** A resume has been requested and therefore the old connection needs to be closed. */
|
||||||
|
ResumeClosingOldConnection = 3024,
|
||||||
|
/** Did not receive a heartbeat ACK in time.
|
||||||
|
* Closing the shard and creating a new session.
|
||||||
|
*/
|
||||||
|
ZombiedConnection = 3010,
|
||||||
|
/** Discordeno's gateway tests hae been finished, therefore the Shard can be turned off. */
|
||||||
|
TestingFinished = 3064,
|
||||||
|
/** Special close code reserved for Discordeno's zero-downtime resharding system. */
|
||||||
|
Resharded = 3065,
|
||||||
|
/** Shard is re-identifying therefore the old connection needs to be closed. */
|
||||||
|
ReIdentifying = 3066,
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ShardOptions extends ShardDetails {
|
||||||
|
info: APIGatewayBotInfo;
|
||||||
|
handlePayload(shardId: number, packet: GatewayDispatchPayload): unknown;
|
||||||
|
debug?: boolean;
|
||||||
|
ratelimitOptions?: {
|
||||||
|
maxRequestsPerRateLimitTick: number;
|
||||||
|
rateLimitResetInterval: number;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ShardData {
|
||||||
|
/** the id of the current shard */
|
||||||
|
id: number;
|
||||||
|
|
||||||
|
/** state */
|
||||||
|
shardState: ShardState;
|
||||||
|
|
||||||
|
/** resume seq to resume connections */
|
||||||
|
resumeSeq: number | null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* resume_gateway_url is the url to resume the connection
|
||||||
|
* @link https://discord.com/developers/docs/topics/gateway#ready-event
|
||||||
|
*/
|
||||||
|
resume_gateway_url?: string;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* session_id is the unique session id of the gateway
|
||||||
|
* do not mistake with the biscuit client which is named Session
|
||||||
|
*/
|
||||||
|
session_id?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface GatewayManagerOptions extends ShardDetails {
|
||||||
|
/** Important data which is used by the manager to connect shards to the gateway. */
|
||||||
|
info: APIGatewayBotInfo;
|
||||||
|
/**
|
||||||
|
* Delay in milliseconds to wait before spawning next shard. OPTIMAL IS ABOVE 5100. YOU DON'T WANT TO HIT THE RATE LIMIT!!!
|
||||||
|
* @default 5300
|
||||||
|
*/
|
||||||
|
spawnShardDelay?: number;
|
||||||
|
/**
|
||||||
|
* Total amount of shards your bot uses. Useful for zero-downtime updates or resharding.
|
||||||
|
* @default 1
|
||||||
|
*/
|
||||||
|
totalShards?: number;
|
||||||
|
/**
|
||||||
|
* The payload handlers for messages on the shard.
|
||||||
|
*/
|
||||||
|
handlePayload(shardId: number, packet: GatewayDispatchPayload): unknown;
|
||||||
|
/**
|
||||||
|
* wheter to send debug information to the console
|
||||||
|
*/
|
||||||
|
debug?: boolean;
|
||||||
|
}
|
@ -1,7 +1,3 @@
|
|||||||
export * from './manager/GatewayManager';
|
export * from "./SharedTypes";
|
||||||
export * from './manager/GatewayManagerTypes';
|
export * from "./constants";
|
||||||
export * from './shard/Shard';
|
export * from "./gateway";
|
||||||
export * from './shard/ShardTypes';
|
|
||||||
export * from './defaults';
|
|
||||||
export * from './SharedTypes';
|
|
||||||
export * from './utils/Bucket';
|
|
||||||
|
@ -1,183 +0,0 @@
|
|||||||
import { Collection, Logger, Options, delay } from '@biscuitland/common';
|
|
||||||
import {
|
|
||||||
BucketData,
|
|
||||||
CreateGatewayManagerOptions,
|
|
||||||
GatewayManagerDefaultOptions,
|
|
||||||
GatewayMemberRequest,
|
|
||||||
JoinVoiceOptions,
|
|
||||||
Shard
|
|
||||||
} from '../index';
|
|
||||||
export class GatewayManager {
|
|
||||||
buckets = new Map<number, BucketData>();
|
|
||||||
shards = new Map<number, Shard>();
|
|
||||||
cache: Collection<string, GatewayMemberRequest> | null = null;
|
|
||||||
options: Required<CreateGatewayManagerOptions>;
|
|
||||||
logger: Logger;
|
|
||||||
constructor({ connection, ...options }: CreateGatewayManagerOptions) {
|
|
||||||
this.options = Options<Required<CreateGatewayManagerOptions>>(GatewayManagerDefaultOptions, {
|
|
||||||
...options,
|
|
||||||
lastShardId: options.lastShardId ?? (options.totalShards ? options.totalShards - 1 : connection ? connection.shards - 1 : 0)
|
|
||||||
});
|
|
||||||
|
|
||||||
this.options.connection = connection;
|
|
||||||
if (this.options.cache) this.cache = new Collection();
|
|
||||||
this.logger = new Logger({
|
|
||||||
name: '[GatewayManager]',
|
|
||||||
active: this.options.debug
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
calculateTotalShards(): number {
|
|
||||||
if (this.options.totalShards < 100) {
|
|
||||||
this.logger.info(`Calculating total shards: ${this.options.totalShards}`);
|
|
||||||
return this.options.totalShards;
|
|
||||||
}
|
|
||||||
this.logger.info('Calculating total shards', this.options.totalShards, this.options.connection.session_start_limit.max_concurrency);
|
|
||||||
|
|
||||||
// Calculate a multiple of `maxConcurrency` which can be used to connect to the gateway.
|
|
||||||
return (
|
|
||||||
Math.ceil(
|
|
||||||
this.options.totalShards /
|
|
||||||
// If `maxConcurrency` is 1 we can safely use 16.
|
|
||||||
(this.options.connection.session_start_limit.max_concurrency === 1
|
|
||||||
? 16
|
|
||||||
: this.options.connection.session_start_limit.max_concurrency)
|
|
||||||
) * this.options.connection.session_start_limit.max_concurrency
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
calculateWorekId(shardId: number): number {
|
|
||||||
const workerId = shardId % this.options.shardsPerWorker;
|
|
||||||
this.logger.info(
|
|
||||||
`Calculating workerId: Shard: ${shardId} -> Worker: ${workerId} -> Per Worker: ${this.options.shardsPerWorker} -> Total: ${this.options.totalWorkers}`
|
|
||||||
);
|
|
||||||
return workerId;
|
|
||||||
}
|
|
||||||
|
|
||||||
prepareBuckets(): void {
|
|
||||||
for (let i = 0; i < this.options.connection.session_start_limit.max_concurrency; ++i) {
|
|
||||||
this.logger.info(`Preparing buckets for concurrency: ${i}`);
|
|
||||||
this.buckets.set(i, { workers: [], identifyRequest: [] });
|
|
||||||
}
|
|
||||||
|
|
||||||
for (let shardId = this.options.firstShardId; shardId <= this.options.lastShardId; ++shardId) {
|
|
||||||
this.logger.info(`Preparing bucket for shard: ${shardId}`);
|
|
||||||
if (shardId >= this.options.totalShards) {
|
|
||||||
throw new Error(`Shard (id: ${shardId}) is bigger or equal to the used amount of used shards which is ${this.options.totalShards}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
const bucketId = shardId % this.options.connection.session_start_limit.max_concurrency;
|
|
||||||
const bucket = this.buckets.get(bucketId);
|
|
||||||
if (!bucket)
|
|
||||||
throw new Error(
|
|
||||||
`Shard (id: ${shardId}) got assigned to an illegal bucket id: ${bucketId}, expected a bucket id between 0 and ${
|
|
||||||
this.options.connection.session_start_limit.max_concurrency - 1
|
|
||||||
}`
|
|
||||||
);
|
|
||||||
|
|
||||||
const workerId = this.calculateWorekId(shardId);
|
|
||||||
const worker = bucket.workers.find((w) => w.id === workerId);
|
|
||||||
|
|
||||||
// IF THE QUEUE HAS SPACE JUST ADD IT TO THIS QUEUE
|
|
||||||
worker ? worker.queue.push(shardId) : bucket.workers.push({ id: workerId, queue: [shardId] });
|
|
||||||
|
|
||||||
for (const bucket of this.buckets.values()) {
|
|
||||||
for (const worker of bucket.workers.values()) {
|
|
||||||
worker.queue = worker.queue.sort();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async spawnShards() {
|
|
||||||
this.prepareBuckets();
|
|
||||||
|
|
||||||
await Promise.all(
|
|
||||||
[...this.buckets.entries()].map(async ([bucketId, bucket]) => {
|
|
||||||
for (const worker of bucket.workers) {
|
|
||||||
worker.queue.forEach(async (shardId) => await this.tellWorkerIdentify(worker.id, shardId, bucketId));
|
|
||||||
}
|
|
||||||
})
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
async tellWorkerIdentify(...[workerId, shardId, bucketId]: number[]) {
|
|
||||||
this.logger.info(`tell worker to identify (${workerId}, ${shardId}, ${bucketId})`);
|
|
||||||
await this.identify(shardId);
|
|
||||||
}
|
|
||||||
|
|
||||||
async identify(shardId: number, bId?: number) {
|
|
||||||
const bucketId = bId ?? shardId % this.options.connection.session_start_limit.max_concurrency;
|
|
||||||
let shard = this.shards.get(shardId);
|
|
||||||
this.logger.info(`identifying ${shard ? 'existing' : 'new'} shard (${shardId})`);
|
|
||||||
if (!shard) {
|
|
||||||
shard = new Shard({
|
|
||||||
id: shardId,
|
|
||||||
connection: {
|
|
||||||
intents: this.options.intents,
|
|
||||||
url: this.options.connection.url,
|
|
||||||
version: this.options.version,
|
|
||||||
token: this.options.token,
|
|
||||||
totalShards: this.options.totalShards,
|
|
||||||
properties: this.options.properties
|
|
||||||
},
|
|
||||||
logger: this.logger,
|
|
||||||
// @ts-ignore
|
|
||||||
handlePayload: this.options.handlePayload,
|
|
||||||
requestIdentify: async () => await this.identify(shardId),
|
|
||||||
shardIsReady: async () => {
|
|
||||||
this.logger.info(`<Shard> Shard #${shardId} is ready`);
|
|
||||||
await delay(this.options.spawnShardDelay);
|
|
||||||
this.logger.info('<Shard> Resolving shard identify request');
|
|
||||||
this.buckets.get(bucketId)?.identifyRequest.shift()?.();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
this.shards.set(shardId, shard);
|
|
||||||
}
|
|
||||||
|
|
||||||
const bucket = this.buckets.get(bucketId);
|
|
||||||
if (!bucket) return;
|
|
||||||
|
|
||||||
return new Promise<void>((resolve) => {
|
|
||||||
// Mark that we are making an identify request so another is not made.
|
|
||||||
bucket.identifyRequest.push(resolve);
|
|
||||||
this.logger.info(`identifiying shard #${shardId}`);
|
|
||||||
shard!.identify();
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async shutdown(code: number, reason: string) {
|
|
||||||
this.shards.forEach((shard) => shard.close(code, reason));
|
|
||||||
await delay(5000);
|
|
||||||
}
|
|
||||||
|
|
||||||
async kill(shardId: number) {
|
|
||||||
const shard = this.shards.get(shardId);
|
|
||||||
if (!shard) {
|
|
||||||
this.logger.info(`kill shard but not found ${shardId}`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
this.logger.info(`kill shard ${shardId}`);
|
|
||||||
this.shards.delete(shardId);
|
|
||||||
await shard.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
calculateShardId(guildId: string, totalShards = this.options.totalShards) {
|
|
||||||
if (totalShards === 1) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.logger.info(`calculateShardId (guildId: ${guildId}, totalShards: ${totalShards})`);
|
|
||||||
return Number((BigInt(guildId) >> 22n) % BigInt(totalShards));
|
|
||||||
}
|
|
||||||
|
|
||||||
async joinVoice(options: JoinVoiceOptions) {
|
|
||||||
const shardId = this.calculateShardId(options.guild_id);
|
|
||||||
const shard = this.shards.get(shardId);
|
|
||||||
if (!shard) throw new Error(`Shard #${shardId}`);
|
|
||||||
|
|
||||||
this.logger.info(`joinVoice guildId ${options.guild_id} channelId ${options.channel_id}`);
|
|
||||||
shard.joinVoiceChannel(options);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,88 +0,0 @@
|
|||||||
import type { APIGatewayBotInfo, Identify, MakeRequired } from '@biscuitland/common';
|
|
||||||
import type { GatewayEvents, UpdateVoiceState } from '../SharedTypes';
|
|
||||||
|
|
||||||
export interface CreateGatewayManagerOptions {
|
|
||||||
/** Important data which is used by the manager to connect shards to the gateway. */
|
|
||||||
connection: APIGatewayBotInfo;
|
|
||||||
/**
|
|
||||||
* Id of the first Shard which should get controlled by this manager.
|
|
||||||
* @default 0
|
|
||||||
*/
|
|
||||||
firstShardId?: number;
|
|
||||||
/**
|
|
||||||
* Id of the last Shard which should get controlled by this manager.
|
|
||||||
* @default 0
|
|
||||||
*/
|
|
||||||
lastShardId?: number;
|
|
||||||
/**
|
|
||||||
* Delay in milliseconds to wait before spawning next shard. OPTIMAL IS ABOVE 5100. YOU DON'T WANT TO HIT THE RATE LIMIT!!!
|
|
||||||
* @default 5300
|
|
||||||
*/
|
|
||||||
spawnShardDelay?: number;
|
|
||||||
/**
|
|
||||||
* Total amount of shards your bot uses. Useful for zero-downtime updates or resharding.
|
|
||||||
* @default 1
|
|
||||||
*/
|
|
||||||
totalShards?: number;
|
|
||||||
/**
|
|
||||||
* The amount of shards to load per worker.
|
|
||||||
* @default 25
|
|
||||||
*/
|
|
||||||
shardsPerWorker?: number;
|
|
||||||
/**
|
|
||||||
* The total amount of workers to use for your bot.
|
|
||||||
* @default 4
|
|
||||||
*/
|
|
||||||
totalWorkers?: number;
|
|
||||||
/** The calculated intent value of the events which the shard should receive.
|
|
||||||
*
|
|
||||||
* @default 0
|
|
||||||
*/
|
|
||||||
intents?: number;
|
|
||||||
/** Identify properties to use */
|
|
||||||
properties?: {
|
|
||||||
/** Operating system the shard runs on.
|
|
||||||
*
|
|
||||||
* @default "darwin" | "linux" | "windows"
|
|
||||||
*/
|
|
||||||
os: string;
|
|
||||||
/** The "browser" where this shard is running on.
|
|
||||||
*
|
|
||||||
* @default "Discordeno"
|
|
||||||
*/
|
|
||||||
browser: string;
|
|
||||||
/** The device on which the shard is running.
|
|
||||||
*
|
|
||||||
* @default "Discordeno"
|
|
||||||
*/
|
|
||||||
device: string;
|
|
||||||
};
|
|
||||||
/** Bot token which is used to connect to Discord */
|
|
||||||
token: string;
|
|
||||||
/** The URL of the gateway which should be connected to.
|
|
||||||
*
|
|
||||||
* @default "wss://gateway.discord.gg"
|
|
||||||
*/
|
|
||||||
url?: string;
|
|
||||||
/** The gateway version which should be used.
|
|
||||||
*
|
|
||||||
* @default 10
|
|
||||||
*/
|
|
||||||
version?: number;
|
|
||||||
/** The payload handlers for messages on the shard. */
|
|
||||||
handlePayload: <K extends keyof GatewayEvents>(shardId: number, data: { t: K; d: GatewayEvents[K] }) => Promise<unknown>;
|
|
||||||
/** This managers cache related settings. */
|
|
||||||
cache?: boolean;
|
|
||||||
debug?: boolean;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface BucketData {
|
|
||||||
workers: { id: number; queue: number[] }[];
|
|
||||||
identifyRequest: ((value: void | PromiseLike<void>) => void)[];
|
|
||||||
}
|
|
||||||
|
|
||||||
export type JoinVoiceOptions = Identify<
|
|
||||||
Omit<MakeRequired<Partial<UpdateVoiceState>, 'guild_id'>, 'channel_id'> & {
|
|
||||||
channel_id: string;
|
|
||||||
}
|
|
||||||
>;
|
|
@ -1,583 +0,0 @@
|
|||||||
import { LeakyBucket } from '../index';
|
|
||||||
import { ShardGatewayConfig, ShardHeart, ShardCreateOptions } from './ShardTypes';
|
|
||||||
import { GatewayMemberRequest, RequestGuildMembersOptions, ShardSocketCloseCodes, ShardState, StatusUpdate } from '../SharedTypes';
|
|
||||||
import {
|
|
||||||
APIGuildMember,
|
|
||||||
GatewayCloseCodes,
|
|
||||||
GatewayDispatchPayload,
|
|
||||||
GatewayIntentBits,
|
|
||||||
GatewayOpcodes,
|
|
||||||
GatewayReceivePayload,
|
|
||||||
GatewaySendPayload,
|
|
||||||
Collection,
|
|
||||||
Logger
|
|
||||||
} from '@biscuitland/common';
|
|
||||||
import { WebSocket, CloseEvent, MessageEvent } from 'ws';
|
|
||||||
import { JoinVoiceOptions } from '../manager/GatewayManagerTypes';
|
|
||||||
|
|
||||||
export class Shard {
|
|
||||||
/** The id of the shard */
|
|
||||||
id: number;
|
|
||||||
/** The connection config details that this shard will used to connect to discord. */
|
|
||||||
connection: ShardGatewayConfig;
|
|
||||||
/** This contains all the heartbeat information */
|
|
||||||
heart: ShardHeart;
|
|
||||||
/** The maximum of requests which can be send to discord per rate limit tick. Typically this value should not be changed. */
|
|
||||||
maxRequestsPerRateLimitTick = 120;
|
|
||||||
/** The previous payload sequence number. */
|
|
||||||
previousSequenceNumber: number | null = null;
|
|
||||||
/** In which interval (in milliseconds) the gateway resets it's rate limit. */
|
|
||||||
rateLimitResetInterval = 60000;
|
|
||||||
/** Current session id of the shard if present. */
|
|
||||||
sessionId?: string;
|
|
||||||
/** This contains the WebSocket connection to Discord, if currently connected. */
|
|
||||||
socket?: WebSocket;
|
|
||||||
/** Current internal state of the this. */
|
|
||||||
state = ShardState.Offline;
|
|
||||||
/** The url provided by discord to use when resuming a connection for this this. */
|
|
||||||
resumeGatewayUrl = '';
|
|
||||||
/** Cache for pending gateway requests which should have been send while the gateway went offline. */
|
|
||||||
offlineSendQueue: ((_?: unknown) => void)[] = [];
|
|
||||||
/** Resolve internal waiting states. Mapped by SelectedEvents => ResolveFunction */
|
|
||||||
resolves = new Map<'READY' | 'RESUMED' | 'INVALID_SESSION', (payload: GatewayReceivePayload) => void>();
|
|
||||||
/** Shard bucket. Only access this if you know what you are doing. Bucket for handling shard request rate limits. */
|
|
||||||
bucket: LeakyBucket;
|
|
||||||
logger: Logger;
|
|
||||||
/** The payload handlers for messages on the shard. */
|
|
||||||
handlePayload: (shardId: number, data: GatewayReceivePayload) => Promise<unknown>;
|
|
||||||
cache = {
|
|
||||||
requestMembers: {
|
|
||||||
/**
|
|
||||||
* Whether or not request member requests should be cached.
|
|
||||||
* @default false
|
|
||||||
*/
|
|
||||||
enabled: false,
|
|
||||||
/** The pending requests. */
|
|
||||||
pending: new Collection<string, GatewayMemberRequest>()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
constructor(options: ShardCreateOptions) {
|
|
||||||
this.id = options.id;
|
|
||||||
this.connection = options.connection;
|
|
||||||
this.logger = options.logger;
|
|
||||||
this.heart = {
|
|
||||||
acknowledged: false,
|
|
||||||
interval: 45e3
|
|
||||||
};
|
|
||||||
|
|
||||||
if (options.requestIdentify) this.requestIdentify = options.requestIdentify;
|
|
||||||
if (options.shardIsReady) this.shardIsReady = options.shardIsReady;
|
|
||||||
|
|
||||||
this.handlePayload = options.handlePayload;
|
|
||||||
|
|
||||||
const safe = this.calculateSafeRequests();
|
|
||||||
|
|
||||||
this.bucket = new LeakyBucket({
|
|
||||||
max: safe,
|
|
||||||
refillAmount: safe,
|
|
||||||
refillInterval: 6e4
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/** The url to connect to. Intially this is the discord gateway url, and then is switched to resume gateway url once a READY is received. */
|
|
||||||
get connectionUrl(): string {
|
|
||||||
return this.resumeGatewayUrl || this.connection.url;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Check whether the connection to Discord is currently open. */
|
|
||||||
get isOpen(): boolean {
|
|
||||||
return this.socket?.readyState === WebSocket.OPEN;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Calculate the amount of requests which can safely be made per rate limit interval, before the gateway gets disconnected due to an exceeded rate limit. */
|
|
||||||
calculateSafeRequests(): number {
|
|
||||||
// * 2 adds extra safety layer for discords OP 1 requests that we need to respond to
|
|
||||||
const safeRequests = this.maxRequestsPerRateLimitTick - Math.ceil(this.rateLimitResetInterval / this.heart.interval) * 2;
|
|
||||||
|
|
||||||
return safeRequests < 0 ? 0 : safeRequests;
|
|
||||||
}
|
|
||||||
|
|
||||||
async checkOffline(highPriority: boolean) {
|
|
||||||
if (!this.isOpen) {
|
|
||||||
return new Promise((resolve) => {
|
|
||||||
// Higher priority requests get added at the beginning of the array.
|
|
||||||
if (highPriority) this.offlineSendQueue.unshift(resolve);
|
|
||||||
else this.offlineSendQueue.push(resolve);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Close the socket connection to discord if present. */
|
|
||||||
close(code: number, reason: string): void {
|
|
||||||
if (this.socket?.readyState !== WebSocket.OPEN) return;
|
|
||||||
|
|
||||||
this.socket?.close(code, reason);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Connect the shard with the gateway and start heartbeating. This will not identify the shard to the gateway. */
|
|
||||||
async connect(): Promise<Shard> {
|
|
||||||
// Only set the shard to `Connecting` state,
|
|
||||||
// if the connection request does not come from an identify or resume action.
|
|
||||||
if (![ShardState.Identifying, ShardState.Resuming].includes(this.state)) {
|
|
||||||
this.state = ShardState.Connecting;
|
|
||||||
}
|
|
||||||
|
|
||||||
const url = new URL(this.connectionUrl);
|
|
||||||
url.searchParams.set('v', this.connection.version.toString());
|
|
||||||
url.searchParams.set('enconding', 'json');
|
|
||||||
|
|
||||||
this.socket = new WebSocket(url.toString());
|
|
||||||
|
|
||||||
this.socket.onerror = (event) => this.logger.info({ error: event, shardId: this.id });
|
|
||||||
this.socket.onclose = async (event) => await this.handleClose(event);
|
|
||||||
this.socket.onmessage = async (message) => await this.handleMessage(message);
|
|
||||||
|
|
||||||
return new Promise((resolve) => {
|
|
||||||
this.socket!.onopen = () => {
|
|
||||||
// Only set the shard to `Unidentified` state,
|
|
||||||
// if the connection request does not come from an identify or resume action.
|
|
||||||
if (![ShardState.Identifying, ShardState.Resuming].includes(this.state)) {
|
|
||||||
this.state = ShardState.Unidentified;
|
|
||||||
}
|
|
||||||
// event connected
|
|
||||||
resolve(this);
|
|
||||||
};
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Identify the shard to the gateway. If not connected, this will also connect the shard to the gateway. */
|
|
||||||
async identify() {
|
|
||||||
// A new identify has been requested even though there is already a connection open.
|
|
||||||
// Therefore we need to close the old connection and heartbeating before creating a new one.
|
|
||||||
if (this.isOpen) {
|
|
||||||
this.logger.info(`Closing existing shard: #${this.id}`);
|
|
||||||
this.close(ShardSocketCloseCodes.ReIdentifying, 'Re-identifying closure of old connection');
|
|
||||||
}
|
|
||||||
this.state = ShardState.Identifying;
|
|
||||||
// identifying
|
|
||||||
// It is possible that the shard is in Heartbeating state but not identified,
|
|
||||||
// so check whether there is already a gateway connection existing.
|
|
||||||
// If not we need to create one before we identify.
|
|
||||||
if (!this.isOpen) await this.connect();
|
|
||||||
|
|
||||||
this.send(
|
|
||||||
{
|
|
||||||
op: GatewayOpcodes.Identify,
|
|
||||||
d: {
|
|
||||||
token: `Bot ${this.connection.token}`,
|
|
||||||
properties: this.connection.properties,
|
|
||||||
intents: this.connection.intents,
|
|
||||||
shard: [this.id, this.connection.totalShards],
|
|
||||||
presence: this.connection.presence
|
|
||||||
}
|
|
||||||
},
|
|
||||||
true
|
|
||||||
);
|
|
||||||
|
|
||||||
return new Promise<void>((resolve) => {
|
|
||||||
this.resolves.set('READY', () => {
|
|
||||||
// event idenfity
|
|
||||||
this.shardIsReady();
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
|
|
||||||
// When identifying too fast,
|
|
||||||
// Discord sends an invalid session payload.
|
|
||||||
// This can safely be ignored though and the shard starts a new identify action.
|
|
||||||
this.resolves.set('INVALID_SESSION', () => {
|
|
||||||
this.resolves.delete('READY');
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
async resume() {
|
|
||||||
this.logger.info(`Resuming Shard #${this.id}`);
|
|
||||||
// It has been requested to resume the Shards session.
|
|
||||||
// It's possible that the shard is still connected with Discord's gateway therefore we need to forcefully close it.
|
|
||||||
if (this.isOpen) {
|
|
||||||
this.logger.info(`Resuming Shard #${this.id} in isOpen`);
|
|
||||||
this.close(ShardSocketCloseCodes.ResumeClosingOldConnection, 'Reconnecting the shard, closing old connection.');
|
|
||||||
}
|
|
||||||
|
|
||||||
// Shard has never identified, so we cannot resume.
|
|
||||||
if (!this.sessionId) {
|
|
||||||
this.logger.info(`Trying to resume a shard #${this.id} that was NOT first identified. (No session id found)`);
|
|
||||||
|
|
||||||
return await this.identify();
|
|
||||||
}
|
|
||||||
|
|
||||||
this.state = ShardState.Resuming;
|
|
||||||
|
|
||||||
this.logger.info(`Resuming Shard #${this.id}, before connecting`);
|
|
||||||
// Before we can resume, we need to create a new connection with Discord's gateway.
|
|
||||||
await this.connect();
|
|
||||||
this.logger.info(
|
|
||||||
`Resuming Shard #${this.id}, after connecting. ${this.connection.token} | ${this.sessionId} | ${this.previousSequenceNumber}`
|
|
||||||
);
|
|
||||||
|
|
||||||
this.send(
|
|
||||||
{
|
|
||||||
op: GatewayOpcodes.Resume,
|
|
||||||
d: {
|
|
||||||
token: `Bot ${this.connection.token}`,
|
|
||||||
session_id: this.sessionId,
|
|
||||||
seq: this.previousSequenceNumber ?? 0
|
|
||||||
}
|
|
||||||
},
|
|
||||||
true
|
|
||||||
);
|
|
||||||
this.logger.info(`Resuming Shard #${this.id} after send resumg`);
|
|
||||||
|
|
||||||
return new Promise<void>((resolve) => {
|
|
||||||
this.resolves.set('RESUMED', () => resolve());
|
|
||||||
// If it is attempted to resume with an invalid session id,
|
|
||||||
// Discord sends an invalid session payload
|
|
||||||
// Not erroring here since it is easy that this happens, also it would be not catchable
|
|
||||||
this.resolves.set('INVALID_SESSION', () => {
|
|
||||||
this.resolves.delete('RESUMED');
|
|
||||||
resolve();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send a message to Discord Gateway.
|
|
||||||
* @param highPriority [highPriority=false] - Whether this message should be send asap.
|
|
||||||
*/
|
|
||||||
async send(message: GatewaySendPayload, highPriority = false) {
|
|
||||||
// Before acquiring a token from the bucket, check whether the shard is currently offline or not.
|
|
||||||
// Else bucket and token wait time just get wasted.
|
|
||||||
await this.checkOffline(highPriority);
|
|
||||||
|
|
||||||
await this.bucket.acquire(highPriority);
|
|
||||||
|
|
||||||
// It's possible, that the shard went offline after a token has been acquired from the bucket.
|
|
||||||
await this.checkOffline(highPriority);
|
|
||||||
|
|
||||||
this.socket?.send(JSON.stringify(message));
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Shutdown the this. Forcefully disconnect the shard from Discord. The shard may not attempt to reconnect with Discord. */
|
|
||||||
async shutdown(): Promise<void> {
|
|
||||||
this.close(ShardSocketCloseCodes.Shutdown, 'Shard shutting down.');
|
|
||||||
this.state = ShardState.Offline;
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Handle a gateway connection close. */
|
|
||||||
async handleClose(close: CloseEvent) {
|
|
||||||
this.stopHeartbeating();
|
|
||||||
|
|
||||||
switch (close.code) {
|
|
||||||
case ShardSocketCloseCodes.TestingFinished:
|
|
||||||
this.state = ShardState.Offline;
|
|
||||||
// disconnected event
|
|
||||||
return;
|
|
||||||
// On these codes a manual start will be done.
|
|
||||||
case ShardSocketCloseCodes.Shutdown:
|
|
||||||
case ShardSocketCloseCodes.ReIdentifying:
|
|
||||||
case ShardSocketCloseCodes.Resharded:
|
|
||||||
case ShardSocketCloseCodes.ResumeClosingOldConnection:
|
|
||||||
case ShardSocketCloseCodes.ZombiedConnection:
|
|
||||||
this.state = ShardState.Offline;
|
|
||||||
// disconnected event
|
|
||||||
return;
|
|
||||||
// Gateway connection closes which require a new identify.
|
|
||||||
case GatewayCloseCodes.UnknownOpcode:
|
|
||||||
case GatewayCloseCodes.NotAuthenticated:
|
|
||||||
case GatewayCloseCodes.InvalidSeq:
|
|
||||||
case GatewayCloseCodes.RateLimited:
|
|
||||||
case GatewayCloseCodes.SessionTimedOut:
|
|
||||||
this.logger.info(`Gateway connection closing requiring re-identify. Code: ${close.code}`);
|
|
||||||
this.state = ShardState.Identifying;
|
|
||||||
// disconnected event
|
|
||||||
// @ts-expect-error identify
|
|
||||||
return this.idenfity();
|
|
||||||
// When these codes are received something went really wrong.
|
|
||||||
// On those we cannot start a reconnect attempt.
|
|
||||||
case GatewayCloseCodes.AuthenticationFailed:
|
|
||||||
case GatewayCloseCodes.InvalidShard:
|
|
||||||
case GatewayCloseCodes.ShardingRequired:
|
|
||||||
case GatewayCloseCodes.InvalidAPIVersion:
|
|
||||||
case GatewayCloseCodes.InvalidIntents:
|
|
||||||
case GatewayCloseCodes.DisallowedIntents:
|
|
||||||
this.state = ShardState.Offline;
|
|
||||||
// disconnected event
|
|
||||||
throw new Error(close.reason || 'Discord gave no reason! GG! You broke Discord!');
|
|
||||||
default:
|
|
||||||
this.logger.info(`Closed shard #${this.id}. Resuming...`);
|
|
||||||
// disconnected event
|
|
||||||
return this.resume();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Handles a incoming gateway packet. */
|
|
||||||
async handleDiscordPacket(packet: GatewayReceivePayload) {
|
|
||||||
// Edge case start: https://github.com/discordeno/discordeno/issues/2311
|
|
||||||
this.heart.lastAck = Date.now();
|
|
||||||
|
|
||||||
// Manually calculating the round trip time for users who need it.
|
|
||||||
if (this.heart.lastBeat && !this.heart.acknowledged) {
|
|
||||||
this.heart.rtt = this.heart.lastAck - this.heart.lastBeat;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.heart.acknowledged = true;
|
|
||||||
|
|
||||||
switch (packet.op) {
|
|
||||||
case GatewayOpcodes.Heartbeat:
|
|
||||||
if (!this.isOpen) return await this.resume();
|
|
||||||
this.heart.lastBeat = Date.now();
|
|
||||||
// Discord randomly sends this requiring an immediate heartbeat back.
|
|
||||||
// Using a direct socket.send call here because heartbeat requests are reserved by us.
|
|
||||||
this.socket?.send(
|
|
||||||
JSON.stringify({
|
|
||||||
op: GatewayOpcodes.Heartbeat,
|
|
||||||
d: this.previousSequenceNumber
|
|
||||||
})
|
|
||||||
);
|
|
||||||
// hearbeat event
|
|
||||||
break;
|
|
||||||
case GatewayOpcodes.Hello: {
|
|
||||||
const interval = packet.d.heartbeat_interval;
|
|
||||||
this.logger.info(`Hello on Shard #${this.id}`);
|
|
||||||
this.startHeartbeating(interval);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.handlePayload(this.id, packet);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Handle an incoming gateway message. */
|
|
||||||
async handleMessage(message: MessageEvent) {
|
|
||||||
const preProcessMessage = message.data;
|
|
||||||
|
|
||||||
if (typeof preProcessMessage !== 'string') return;
|
|
||||||
|
|
||||||
return await this.handleDiscordPacket(JSON.parse(preProcessMessage) as GatewayDispatchPayload);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Start sending heartbeat payloads to Discord in the provided interval. */
|
|
||||||
startHeartbeating(interval: number) {
|
|
||||||
this.logger.info(`Start heartbeating shard #${this.id}`);
|
|
||||||
// If old heartbeast exist like after resume, clear the old ones.
|
|
||||||
if (this.heart.intervalId) clearInterval(this.heart.intervalId);
|
|
||||||
if (this.heart.timeoutId) clearTimeout(this.heart.timeoutId);
|
|
||||||
|
|
||||||
this.heart.interval = interval;
|
|
||||||
|
|
||||||
// Only set the shard's state to `Unidentified`
|
|
||||||
// if heartbeating has not been started due to an identify or resume action.
|
|
||||||
if ([ShardState.Disconnected, ShardState.Offline].includes(this.state)) {
|
|
||||||
this.logger.info(`[tart Heartbeating Shard #${this.id} a`);
|
|
||||||
this.state = ShardState.Unidentified;
|
|
||||||
}
|
|
||||||
|
|
||||||
// The first heartbeat needs to be send with a random delay between `0` and `interval`
|
|
||||||
// Using a `setTimeout(_, jitter)` here to accomplish that.
|
|
||||||
// `Math.random()` can be `0` so we use `0.5` if this happens
|
|
||||||
// Reference: https://discord.com/developers/docs/topics/gateway#heartbeating
|
|
||||||
const jitter = Math.ceil(this.heart.interval * (Math.random() || 0.5));
|
|
||||||
this.heart.timeoutId = setTimeout(() => {
|
|
||||||
this.logger.info(`start hearting shard #${this.id} b`);
|
|
||||||
if (!this.isOpen) return;
|
|
||||||
this.logger.info(`start heartbeting shard #${this.id} c ${this.previousSequenceNumber}`);
|
|
||||||
|
|
||||||
// Using a direct socket.send call here because heartbeat requests are reserved by us.
|
|
||||||
this.socket?.send(
|
|
||||||
JSON.stringify({
|
|
||||||
op: GatewayOpcodes.Heartbeat,
|
|
||||||
d: this.previousSequenceNumber
|
|
||||||
})
|
|
||||||
);
|
|
||||||
|
|
||||||
this.logger.info(`start hearting shard #${this.id} d`);
|
|
||||||
this.heart.lastBeat = Date.now();
|
|
||||||
this.heart.acknowledged = false;
|
|
||||||
|
|
||||||
// After the random heartbeat jitter we can start a normal interval.
|
|
||||||
this.heart.intervalId = setInterval(async () => {
|
|
||||||
this.logger.info(`start heartbeating shard #${this.id} e`);
|
|
||||||
if (!this.isOpen) return;
|
|
||||||
this.logger.info(`start heartbeating shard #${this.id} f`);
|
|
||||||
|
|
||||||
// The Shard did not receive a heartbeat ACK from Discord in time,
|
|
||||||
// therefore we have to assume that the connection has failed or got "zombied".
|
|
||||||
// The Shard needs to start a re-identify action accordingly.
|
|
||||||
// Reference: https://discord.com/developers/docs/topics/gateway#heartbeating-example-gateway-heartbeat-ack
|
|
||||||
if (!this.heart.acknowledged) {
|
|
||||||
this.logger.info(`heartbeat not acknowledged for shard #${this.id}.`);
|
|
||||||
this.close(ShardSocketCloseCodes.ZombiedConnection, 'Zombied connection, did not receive an heartbeat ACK in time.');
|
|
||||||
return await this.identify();
|
|
||||||
}
|
|
||||||
|
|
||||||
this.heart.acknowledged = false;
|
|
||||||
|
|
||||||
this.logger.info(`start Heartbeating Shard #${this.id} g`);
|
|
||||||
// Using a direct socket.send call here because heartbeat requests are reserved by us.
|
|
||||||
this.socket?.send(
|
|
||||||
JSON.stringify({
|
|
||||||
op: GatewayOpcodes.Heartbeat,
|
|
||||||
d: this.previousSequenceNumber
|
|
||||||
})
|
|
||||||
);
|
|
||||||
this.heart.lastBeat = Date.now();
|
|
||||||
|
|
||||||
// heartbeat event
|
|
||||||
}, this.heart.interval);
|
|
||||||
}, jitter);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** Stop the heartbeating process with discord. */
|
|
||||||
stopHeartbeating(): void {
|
|
||||||
// Clear the regular heartbeat interval.
|
|
||||||
clearInterval(this.heart.intervalId);
|
|
||||||
// It's possible that the Shard got closed before the first jittered heartbeat.
|
|
||||||
// To go safe we should clear the related timeout too.
|
|
||||||
clearTimeout(this.heart.timeoutId);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Fetches the list of members for a guild over the gateway.
|
|
||||||
*
|
|
||||||
* @param options - The parameters for the fetching of the members.
|
|
||||||
*
|
|
||||||
* @remarks
|
|
||||||
* If requesting the entire member list:
|
|
||||||
* - Requires the `GUILD_MEMBERS` intent.
|
|
||||||
*
|
|
||||||
* Fires a _Guild Members Chunk_ gateway event for every 1000 members fetched.
|
|
||||||
*
|
|
||||||
* @see {@link https://discord.com/developers/docs/topics/gateway#request-guild-members}
|
|
||||||
*/
|
|
||||||
async requestMembers(options: { guild_id: string } & Partial<RequestGuildMembersOptions>): Promise<APIGuildMember[]> {
|
|
||||||
// You can request 1 member without the intent
|
|
||||||
// Check if intents is not 0 as proxy ws won't set intents in other instances
|
|
||||||
if (this.connection.intents && (!options.limit || options.limit > 1) && !(this.connection.intents & GatewayIntentBits.GuildMembers))
|
|
||||||
throw new Error('MISSING_INTENT_GUILD_MEMBERS');
|
|
||||||
if (options.user_ids?.length) {
|
|
||||||
this.logger.info(
|
|
||||||
`requestMembers guildId: ${options.guild_id} -> setting user limit based on userIds length: ${options.user_ids.length}`
|
|
||||||
);
|
|
||||||
options.limit = options.user_ids.length;
|
|
||||||
}
|
|
||||||
|
|
||||||
const nonce = `${options.guild_id}-${Date.now()}`;
|
|
||||||
|
|
||||||
// Gateway does not require caching these requests so directly send and return
|
|
||||||
if (!this.cache.requestMembers.enabled) {
|
|
||||||
this.logger.info(`requestMembers guildId: ${options.guild_id} -> skipping cache -> options ${JSON.stringify(options)}`);
|
|
||||||
|
|
||||||
await this.send({
|
|
||||||
op: GatewayOpcodes.RequestGuildMembers,
|
|
||||||
d: {
|
|
||||||
guild_id: options.guild_id,
|
|
||||||
// @ts-expect-error
|
|
||||||
// If a query is provided use it, OR if a limit is NOT provided use ""
|
|
||||||
query: options.query ?? (options.limit ? undefined : ''),
|
|
||||||
limit: options.limit ?? 0,
|
|
||||||
presences: options.presences ?? false,
|
|
||||||
user_ids: options.user_ids,
|
|
||||||
nonce
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return [];
|
|
||||||
}
|
|
||||||
return new Promise((resolve) => {
|
|
||||||
this.cache.requestMembers.pending.set(nonce, {
|
|
||||||
nonce,
|
|
||||||
resolve,
|
|
||||||
members: []
|
|
||||||
});
|
|
||||||
this.logger.info(`requestMembers options.guild_id: ${options.guild_id} -> requesting members -> data: ${JSON.stringify(options)}`);
|
|
||||||
this.send({
|
|
||||||
op: GatewayOpcodes.RequestGuildMembers,
|
|
||||||
d: {
|
|
||||||
guild_id: options.guild_id,
|
|
||||||
// @ts-expect-error
|
|
||||||
// If a query is provided use it, OR if a limit is NOT provided use ""
|
|
||||||
query: options?.query ?? (options?.limit ? undefined : ''),
|
|
||||||
limit: options?.limit ?? 0,
|
|
||||||
presences: options?.presences ?? false,
|
|
||||||
user_ids: options?.user_ids,
|
|
||||||
nonce
|
|
||||||
}
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Connects the bot user to a voice or stage channel.
|
|
||||||
*
|
|
||||||
* This function sends the _Update Voice State_ gateway command over the gateway behind the scenes.
|
|
||||||
*
|
|
||||||
* @remarks
|
|
||||||
* Requires the `CONNECT` permission.
|
|
||||||
*
|
|
||||||
* Fires a _Voice State Update_ gateway event.
|
|
||||||
*
|
|
||||||
* @see {@link https://discord.com/developers/docs/topics/gateway#update-voice-state}
|
|
||||||
*/
|
|
||||||
async joinVoiceChannel({ guild_id, channel_id, self_deaf, self_mute }: JoinVoiceOptions) {
|
|
||||||
this.logger.info(`joinVoiceChannel guildId: ${guild_id} channelId: ${channel_id}`);
|
|
||||||
await this.send({
|
|
||||||
op: GatewayOpcodes.VoiceStateUpdate,
|
|
||||||
d: {
|
|
||||||
guild_id,
|
|
||||||
channel_id,
|
|
||||||
self_mute: Boolean(self_mute),
|
|
||||||
self_deaf: self_deaf ?? true
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Leaves the voice channel the bot user is currently in.
|
|
||||||
*
|
|
||||||
* This function sends the _Update Voice State_ gateway command over the gateway behind the scenes.
|
|
||||||
*
|
|
||||||
* @param guildId - The ID of the guild the voice channel to leave is in.
|
|
||||||
*
|
|
||||||
* @remarks
|
|
||||||
* Fires a _Voice State Update_ gateway event.
|
|
||||||
*
|
|
||||||
* @see {@link https://discord.com/developers/docs/topics/gateway#update-voice-state}
|
|
||||||
*/
|
|
||||||
async leaveVoiceChannel(guild_id: string) {
|
|
||||||
this.logger.info(`leaveVoiceChannel guildId: ${guild_id} Shard ${this.id}`);
|
|
||||||
await this.send({
|
|
||||||
op: GatewayOpcodes.VoiceStateUpdate,
|
|
||||||
d: {
|
|
||||||
guild_id,
|
|
||||||
channel_id: null,
|
|
||||||
self_deaf: false,
|
|
||||||
self_mute: false
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Edits the bot's status on one shard.
|
|
||||||
*
|
|
||||||
* @param shardId The shard id to edit the status for.
|
|
||||||
* @param data The status data to set the bots status to.
|
|
||||||
*/
|
|
||||||
async editShardStatus(data: Required<StatusUpdate>) {
|
|
||||||
this.logger.info(`editShardStatus shardId: ${this.id} -> data: ${JSON.stringify(data)}`);
|
|
||||||
await this.send({
|
|
||||||
op: GatewayOpcodes.PresenceUpdate,
|
|
||||||
d: {
|
|
||||||
since: null,
|
|
||||||
afk: false,
|
|
||||||
activities: data.activities,
|
|
||||||
status: data.status
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/** This function communicates with the management process, in order to know whether its free to identify. When this function resolves, this means that the shard is allowed to send an identify payload to discord. */
|
|
||||||
async requestIdentify(): Promise<void> {}
|
|
||||||
|
|
||||||
/** This function communicates with the management process, in order to tell it can identify the next shard. */
|
|
||||||
async shardIsReady(): Promise<void> {}
|
|
||||||
}
|
|
@ -1,80 +0,0 @@
|
|||||||
import type { GatewayPresenceUpdateData, GatewayReceivePayload, Logger } from '@biscuitland/common';
|
|
||||||
|
|
||||||
export interface ShardGatewayConfig {
|
|
||||||
/** The calculated intent value of the events which the shard should receive.
|
|
||||||
*
|
|
||||||
* @default 0
|
|
||||||
*/
|
|
||||||
intents: number;
|
|
||||||
/** Identify properties to use */
|
|
||||||
properties: {
|
|
||||||
/** Operating system the shard runs on.
|
|
||||||
*
|
|
||||||
* @default "darwin" | "linux" | "windows"
|
|
||||||
*/
|
|
||||||
os: string;
|
|
||||||
/** The "browser" where this shard is running on.
|
|
||||||
*
|
|
||||||
* @default "Biscuit"
|
|
||||||
*/
|
|
||||||
browser: string;
|
|
||||||
/** The device on which the shard is running.
|
|
||||||
*
|
|
||||||
* @default "Biscuit"
|
|
||||||
*/
|
|
||||||
device: string;
|
|
||||||
};
|
|
||||||
/** Bot token which is used to connect to Discord */
|
|
||||||
token: string;
|
|
||||||
/** The URL of the gateway which should be connected to.
|
|
||||||
*
|
|
||||||
* @default "wss://gateway.discord.gg"
|
|
||||||
*/
|
|
||||||
url: string;
|
|
||||||
/** The gateway version which should be used.
|
|
||||||
*
|
|
||||||
* @default 10
|
|
||||||
*/
|
|
||||||
version: number;
|
|
||||||
/**
|
|
||||||
* The total number of shards to connect to across the entire bot.
|
|
||||||
* @default 1
|
|
||||||
*/
|
|
||||||
totalShards: number;
|
|
||||||
|
|
||||||
presence?: GatewayPresenceUpdateData;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface ShardCreateOptions {
|
|
||||||
/** The shard id */
|
|
||||||
id: number;
|
|
||||||
/** The connection details */
|
|
||||||
connection: ShardGatewayConfig;
|
|
||||||
/** The payload handlers for messages on the shard. */
|
|
||||||
handlePayload: (shardId: number, data: GatewayReceivePayload) => Promise<unknown>;
|
|
||||||
/** The handler to request a space to make an identify request. */
|
|
||||||
requestIdentify?: () => Promise<void>;
|
|
||||||
/** The handler to alert the gateway manager that this shard has received a READY event. */
|
|
||||||
shardIsReady?: () => Promise<void>;
|
|
||||||
logger: Logger;
|
|
||||||
}
|
|
||||||
|
|
||||||
export interface ShardHeart {
|
|
||||||
/** Whether or not the heartbeat was acknowledged by Discord in time. */
|
|
||||||
acknowledged: boolean;
|
|
||||||
/** Interval between heartbeats requested by Discord. */
|
|
||||||
interval: number;
|
|
||||||
/** Id of the interval, which is used for sending the heartbeats. */
|
|
||||||
intervalId?: NodeJS.Timer;
|
|
||||||
/** Unix (in milliseconds) timestamp when the last heartbeat ACK was received from Discord. */
|
|
||||||
lastAck?: number;
|
|
||||||
/** Unix timestamp (in milliseconds) when the last heartbeat was sent. */
|
|
||||||
lastBeat?: number;
|
|
||||||
/** Round trip time (in milliseconds) from Shard to Discord and back.
|
|
||||||
* Calculated using the heartbeat system.
|
|
||||||
* Note: this value is undefined until the first heartbeat to Discord has happened.
|
|
||||||
*/
|
|
||||||
rtt?: number;
|
|
||||||
/** Id of the timeout which is used for sending the first heartbeat to Discord since it's "special". */
|
|
||||||
timeoutId?: NodeJS.Timeout;
|
|
||||||
}
|
|
@ -1,46 +1,81 @@
|
|||||||
import { delay, Logger } from '@biscuitland/common';
|
import { Logger, delay } from "@biscuitland/common";
|
||||||
|
import { PriorityQueue } from "./lists";
|
||||||
|
|
||||||
export class LeakyBucket implements LeakyBucketOptions {
|
/**
|
||||||
max: number;
|
* just any kind of request to queue and resolve later
|
||||||
|
*/
|
||||||
|
export type QueuedRequest = (value: void | Promise<void>) => Promise<unknown> | any;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* options of the dynamic bucket
|
||||||
|
*/
|
||||||
|
export interface DynamicBucketOptions {
|
||||||
|
limit: number;
|
||||||
|
refillInterval: number;
|
||||||
|
refillAmount: number;
|
||||||
|
debug?: boolean;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* generally useless for interaction based bots
|
||||||
|
* ideally this would only be triggered on certain paths
|
||||||
|
* example: a huge amount of messages being spammed
|
||||||
|
*
|
||||||
|
* a dynamic bucket is just a priority queue implemented using linked lists
|
||||||
|
* we create an empty bucket for every path
|
||||||
|
* dynamically allocating memory improves the final memory footprint
|
||||||
|
*/
|
||||||
|
export class DynamicBucket {
|
||||||
|
limit: number;
|
||||||
refillInterval: number;
|
refillInterval: number;
|
||||||
refillAmount: number;
|
refillAmount: number;
|
||||||
|
|
||||||
|
/** The queue of requests to acquire an available request. Mapped by <shardId, resolve()> */
|
||||||
|
queue: PriorityQueue<QueuedRequest> = new PriorityQueue();
|
||||||
|
|
||||||
/** The amount of requests that have been used up already. */
|
/** The amount of requests that have been used up already. */
|
||||||
used = 0;
|
used = 0;
|
||||||
/** The queue of requests to acquire an available request. Mapped by <shardId, resolve()> */
|
|
||||||
queue: Array<(value: void | PromiseLike<void>) => void> = [];
|
|
||||||
/** Whether or not the queue is already processing. */
|
/** Whether or not the queue is already processing. */
|
||||||
processing = false;
|
processing = false;
|
||||||
|
|
||||||
/** The timeout id for the timer to reduce the used amount by the refill amount. */
|
/** The timeout id for the timer to reduce the used amount by the refill amount. */
|
||||||
timeoutId?: NodeJS.Timeout;
|
timeoutId?: NodeJS.Timeout;
|
||||||
|
|
||||||
/** The timestamp in milliseconds when the next refill is scheduled. */
|
/** The timestamp in milliseconds when the next refill is scheduled. */
|
||||||
refillsAt?: number;
|
refillsAt?: number;
|
||||||
|
|
||||||
logger = new Logger({ name: 'BiscuitWS' });
|
logger: Logger;
|
||||||
|
|
||||||
constructor(options?: LeakyBucketOptions) {
|
constructor(options: DynamicBucketOptions) {
|
||||||
this.max = options?.max ?? 1;
|
this.limit = options.limit;
|
||||||
this.refillAmount = options?.refillAmount ? (options.refillAmount > this.max ? this.max : options.refillAmount) : 1;
|
this.refillInterval = options.refillInterval;
|
||||||
this.refillInterval = options?.refillInterval ?? 5000;
|
this.refillAmount = options.refillAmount;
|
||||||
|
this.logger = new Logger({
|
||||||
|
name: "[Gateway]",
|
||||||
|
active: options.debug,
|
||||||
|
logLevel: 0,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
/** The amount of requests that still remain. */
|
|
||||||
get remaining(): number {
|
get remaining(): number {
|
||||||
return this.max < this.used ? 0 : this.max - this.used;
|
if (this.limit < this.used) return 0;
|
||||||
|
else return this.limit - this.used;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Refills the bucket as needed. */
|
refill(): void {
|
||||||
refillBucket(): void {
|
|
||||||
this.logger.debug('[LeakyBucket] Timeout for leaky bucket requests executed. Refilling bucket.');
|
|
||||||
// Lower the used amount by the refill amount
|
// Lower the used amount by the refill amount
|
||||||
this.used = this.refillAmount > this.used ? 0 : this.used - this.refillAmount;
|
this.used = this.refillAmount > this.used ? 0 : this.used - this.refillAmount;
|
||||||
|
|
||||||
// Reset the refillsAt timestamp since it just got refilled
|
// Reset the refillsAt timestamp since it just got refilled
|
||||||
this.refillsAt = undefined;
|
this.refillsAt = undefined;
|
||||||
|
|
||||||
if (this.used > 0) {
|
if (this.used > 0) {
|
||||||
if (this.timeoutId) clearTimeout(this.timeoutId);
|
if (this.timeoutId) {
|
||||||
|
clearTimeout(this.timeoutId);
|
||||||
|
}
|
||||||
this.timeoutId = setTimeout(() => {
|
this.timeoutId = setTimeout(() => {
|
||||||
this.refillBucket();
|
this.refill();
|
||||||
}, this.refillInterval);
|
}, this.refillInterval);
|
||||||
this.refillsAt = Date.now() + this.refillInterval;
|
this.refillsAt = Date.now() + this.refillInterval;
|
||||||
}
|
}
|
||||||
@ -48,26 +83,28 @@ export class LeakyBucket implements LeakyBucketOptions {
|
|||||||
|
|
||||||
/** Begin processing the queue. */
|
/** Begin processing the queue. */
|
||||||
async processQueue(): Promise<void> {
|
async processQueue(): Promise<void> {
|
||||||
this.logger.debug('[Gateway] Processing queue');
|
|
||||||
// There is already a queue that is processing
|
// There is already a queue that is processing
|
||||||
if (this.processing) return this.logger.debug('[Gateway] Queue is already processing.');
|
if (this.processing) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Begin going through the queue.
|
// Begin going through the queue.
|
||||||
while (this.queue.length) {
|
while (!this.queue.isEmpty()) {
|
||||||
if (this.remaining) {
|
if (this.remaining) {
|
||||||
this.logger.debug(`[LeakyBucket] Processing queue. Remaining: ${this.remaining} Length: ${this.queue.length}`);
|
this.logger.debug(`Processing queue. Remaining: ${this.remaining} Length: ${this.queue.size()}`);
|
||||||
// Resolves the promise allowing the paused execution of this request to resolve and continue.
|
// Resolves the promise allowing the paused execution of this request to resolve and continue.
|
||||||
this.queue.shift()?.();
|
this.queue.peek()();
|
||||||
|
this.queue.pop();
|
||||||
|
|
||||||
// A request can be made
|
// A request can be made
|
||||||
this.used++;
|
this.used++;
|
||||||
|
|
||||||
// Create a new timeout for this request if none exists.
|
// Create a new timeout for this request if none exists.
|
||||||
if (!this.timeoutId) {
|
if (!this.timeoutId) {
|
||||||
this.logger.debug('[LeakyBucket] Creating new timeout for leaky bucket requests.');
|
|
||||||
|
|
||||||
this.timeoutId = setTimeout(() => {
|
this.timeoutId = setTimeout(() => {
|
||||||
this.refillBucket();
|
this.refill();
|
||||||
}, this.refillInterval);
|
}, this.refillInterval);
|
||||||
|
|
||||||
// Set the time for when this refill will occur.
|
// Set the time for when this refill will occur.
|
||||||
this.refillsAt = Date.now() + this.refillInterval;
|
this.refillsAt = Date.now() + this.refillInterval;
|
||||||
}
|
}
|
||||||
@ -78,9 +115,7 @@ export class LeakyBucket implements LeakyBucketOptions {
|
|||||||
const now = Date.now();
|
const now = Date.now();
|
||||||
// If there is time left until next refill, just delay execution.
|
// If there is time left until next refill, just delay execution.
|
||||||
if (this.refillsAt > now) {
|
if (this.refillsAt > now) {
|
||||||
this.logger.debug(`[LeakyBucket] Delaying execution of leaky bucket requests for ${this.refillsAt - now}ms`);
|
|
||||||
await delay(this.refillsAt - now);
|
await delay(this.refillsAt - now);
|
||||||
this.logger.debug('[LeakyBucket] Resuming execution');
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -90,33 +125,14 @@ export class LeakyBucket implements LeakyBucketOptions {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/** Pauses the execution until the request is available to be made. */
|
/** Pauses the execution until the request is available to be made. */
|
||||||
async acquire(highPriority?: boolean): Promise<void> {
|
async acquire(priority: number): Promise<void> {
|
||||||
return await new Promise((resolve) => {
|
return await new Promise((resolve) => {
|
||||||
// High priority requests get added to the start of the queue
|
this.queue.push(resolve, priority);
|
||||||
if (highPriority) this.queue.unshift(resolve);
|
|
||||||
// All other requests get pushed to the end.
|
|
||||||
else this.queue.push(resolve);
|
|
||||||
|
|
||||||
// Each request should trigger the queue to be processesd.
|
|
||||||
void this.processQueue();
|
void this.processQueue();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
export interface LeakyBucketOptions {
|
toString() {
|
||||||
/**
|
return [...this.queue].toString();
|
||||||
* Max requests allowed at once.
|
}
|
||||||
* @default 1
|
|
||||||
*/
|
|
||||||
max?: number;
|
|
||||||
/**
|
|
||||||
* Interval in milliseconds between refills.
|
|
||||||
* @default 5000
|
|
||||||
*/
|
|
||||||
refillInterval?: number;
|
|
||||||
/**
|
|
||||||
* Amount of requests to refill at each interval.
|
|
||||||
* @default 1
|
|
||||||
*/
|
|
||||||
refillAmount?: number;
|
|
||||||
}
|
}
|
3
packages/ws/src/structures/index.ts
Normal file
3
packages/ws/src/structures/index.ts
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
export * from "./dynamic_bucket";
|
||||||
|
export * from "./lists";
|
||||||
|
export * from "./sequential_bucket";
|
153
packages/ws/src/structures/lists.ts
Normal file
153
packages/ws/src/structures/lists.ts
Normal file
@ -0,0 +1,153 @@
|
|||||||
|
/**
|
||||||
|
* abstract node lol
|
||||||
|
*/
|
||||||
|
export interface AbstractNode<T> {
|
||||||
|
data: T;
|
||||||
|
next: this | null;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface QueuePusher<T> {
|
||||||
|
push(data: T): NonNullable<TNode<T>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface QueuePusherWithPriority<T> {
|
||||||
|
push(data: T, priority: number): NonNullable<PNode<T>>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export class TNode<T> implements AbstractNode<T> {
|
||||||
|
data: T;
|
||||||
|
next: this | null;
|
||||||
|
|
||||||
|
constructor(data: T) {
|
||||||
|
this.data = data;
|
||||||
|
this.next = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
static null<T>(list: AbstractNode<T> | null): list is null {
|
||||||
|
return !list;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class PNode<T> extends TNode<T> {
|
||||||
|
priority: number;
|
||||||
|
|
||||||
|
constructor(data: T, priority: number) {
|
||||||
|
super(data);
|
||||||
|
this.priority = priority;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export abstract class Queue<T> {
|
||||||
|
protected abstract head: AbstractNode<T> | null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* O(1)
|
||||||
|
*/
|
||||||
|
public pop() {
|
||||||
|
if (TNode.null(this.head)) {
|
||||||
|
throw new Error("cannot pop a list without elements");
|
||||||
|
}
|
||||||
|
|
||||||
|
return (this.head = this.head.next);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* O(1)
|
||||||
|
*/
|
||||||
|
public peek(): T {
|
||||||
|
if (TNode.null(this.head)) {
|
||||||
|
throw new Error("cannot peek an empty list");
|
||||||
|
}
|
||||||
|
|
||||||
|
return this.head.data;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* O(n)
|
||||||
|
*/
|
||||||
|
public size(): number {
|
||||||
|
let aux = this.head;
|
||||||
|
|
||||||
|
if (TNode.null(aux)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
let count = 1;
|
||||||
|
|
||||||
|
while (aux.next !== null) {
|
||||||
|
count++;
|
||||||
|
aux = aux.next;
|
||||||
|
}
|
||||||
|
|
||||||
|
return count;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* O(1)
|
||||||
|
*/
|
||||||
|
public isEmpty() {
|
||||||
|
return TNode.null(this.head);
|
||||||
|
}
|
||||||
|
|
||||||
|
*[Symbol.iterator](): IterableIterator<T> {
|
||||||
|
let temp = this.head;
|
||||||
|
while (temp !== null) {
|
||||||
|
yield temp.data;
|
||||||
|
temp = temp.next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public toArray(): T[] {
|
||||||
|
return Array.from(this);
|
||||||
|
}
|
||||||
|
public toString() {
|
||||||
|
return this.head?.toString() || "";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class LinkedList<T> extends Queue<T> implements QueuePusher<T> {
|
||||||
|
protected head: TNode<T> | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* O(1)
|
||||||
|
*/
|
||||||
|
public push(data: T): NonNullable<TNode<T>> {
|
||||||
|
const temp = new TNode<T>(data);
|
||||||
|
temp.next = this.head;
|
||||||
|
this.head = temp;
|
||||||
|
|
||||||
|
return this.head;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export class PriorityQueue<T> extends Queue<T> implements QueuePusherWithPriority<T> {
|
||||||
|
protected head: PNode<T> | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* O(#priorities)
|
||||||
|
*/
|
||||||
|
public push(data: T, priority: number): NonNullable<PNode<T>> {
|
||||||
|
let start = this.head;
|
||||||
|
const temp = new PNode(data, priority);
|
||||||
|
|
||||||
|
if (TNode.null(this.head) || TNode.null(start)) {
|
||||||
|
this.head = temp;
|
||||||
|
return this.head;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (this.head.priority > priority) {
|
||||||
|
temp.next = this.head;
|
||||||
|
this.head = temp;
|
||||||
|
return this.head;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (start.next !== null && start.next.priority < priority) {
|
||||||
|
start = start.next;
|
||||||
|
}
|
||||||
|
|
||||||
|
temp.next = start.next as PNode<T>;
|
||||||
|
start.next = temp;
|
||||||
|
|
||||||
|
return this.head;
|
||||||
|
}
|
||||||
|
}
|
54
packages/ws/src/structures/sequential_bucket.ts
Normal file
54
packages/ws/src/structures/sequential_bucket.ts
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
import { delay } from "@biscuitland/common";
|
||||||
|
import type { QueuedRequest } from "./dynamic_bucket";
|
||||||
|
import { LinkedList } from "./lists";
|
||||||
|
|
||||||
|
export class SequentialBucket {
|
||||||
|
private connections: LinkedList<QueuedRequest>;
|
||||||
|
private capacity: number; // max_concurrency
|
||||||
|
private spawnTimeout: number;
|
||||||
|
|
||||||
|
constructor(maxCapacity: number) {
|
||||||
|
this.connections = new LinkedList();
|
||||||
|
this.capacity = maxCapacity;
|
||||||
|
this.spawnTimeout = 5000;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async destroy() {
|
||||||
|
this.connections = new LinkedList();
|
||||||
|
}
|
||||||
|
|
||||||
|
public async push(promise: QueuedRequest) {
|
||||||
|
this.connections.push(promise);
|
||||||
|
|
||||||
|
if (this.capacity <= this.connections.size()) {
|
||||||
|
await this.acquire();
|
||||||
|
await delay(this.spawnTimeout);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
public async acquire(promises = this.connections) {
|
||||||
|
while (!promises.isEmpty()) {
|
||||||
|
const item = promises.peek();
|
||||||
|
item().catch((...args: any[]) => {
|
||||||
|
Promise.reject(...args);
|
||||||
|
});
|
||||||
|
promises.pop();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Promise.resolve(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static chunk<T>(array: T[], chunks: number): T[][] {
|
||||||
|
let index = 0;
|
||||||
|
let resIndex = 0;
|
||||||
|
const result = Array(Math.ceil(array.length / chunks));
|
||||||
|
|
||||||
|
while (index < array.length) {
|
||||||
|
result[resIndex] = array.slice(index, (index += chunks));
|
||||||
|
resIndex++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user