feat: shard#ping method

This commit is contained in:
MARCROCK22 2024-05-30 18:54:51 +00:00
parent c409628a43
commit 5338248ff8
2 changed files with 386 additions and 365 deletions

View File

@ -1,38 +1,54 @@
import NodeWebSocket from 'ws'; import { randomUUID } from 'node:crypto';
import NodeWebSocket from 'ws';
export class BaseSocket {
private internal: NodeWebSocket | WebSocket; export class BaseSocket {
private internal: NodeWebSocket | WebSocket;
constructor(kind: 'ws' | 'bun', url: string) {
this.internal = kind === 'ws' ? new NodeWebSocket(url) : new WebSocket(url); constructor(kind: 'ws' | 'bun', url: string) {
} this.internal = kind === 'ws' ? new NodeWebSocket(url) : new WebSocket(url);
}
set onopen(callback: NodeWebSocket['onopen']) {
this.internal.onopen = callback; set onopen(callback: NodeWebSocket['onopen']) {
} this.internal.onopen = callback;
}
set onmessage(callback: NodeWebSocket['onmessage']) {
this.internal.onmessage = callback; set onmessage(callback: NodeWebSocket['onmessage']) {
} this.internal.onmessage = callback;
}
set onclose(callback: NodeWebSocket['onclose']) {
this.internal.onclose = callback; set onclose(callback: NodeWebSocket['onclose']) {
} this.internal.onclose = callback;
}
set onerror(callback: NodeWebSocket['onerror']) {
this.internal.onerror = callback; set onerror(callback: NodeWebSocket['onerror']) {
} this.internal.onerror = callback;
}
send(data: string) {
return this.internal.send(data); send(data: string) {
} return this.internal.send(data);
}
close(...args: Parameters<NodeWebSocket['close']>) {
// @ts-expect-error close(...args: Parameters<NodeWebSocket['close']>) {
return this.internal.close(...args); // @ts-expect-error
} return this.internal.close(...args);
}
get readyState() {
return this.internal.readyState; async ping() {
} if (!('ping' in this.internal)) throw new Error('Unexpected: Method ping not implemented');
} return new Promise<number>(res => {
const nonce = randomUUID();
const start = performance.now();
const listener = (data: Buffer) => {
if (data.toString() !== nonce) return;
(this.internal as NodeWebSocket).removeListener('pong', listener);
res(performance.now() - start);
};
(this.internal as NodeWebSocket).on('pong', listener);
(this.internal as NodeWebSocket).ping(nonce);
});
}
get readyState() {
return this.internal.readyState;
}
}

View File

@ -1,327 +1,332 @@
import type { GatewayReceivePayload, GatewaySendPayload } from 'discord-api-types/v10'; import type { GatewayReceivePayload, GatewaySendPayload } from 'discord-api-types/v10';
import { GatewayCloseCodes, GatewayDispatchEvents, GatewayOpcodes } from 'discord-api-types/v10'; import { GatewayCloseCodes, GatewayDispatchEvents, GatewayOpcodes } from 'discord-api-types/v10';
import { inflateSync } from 'node:zlib'; import { inflateSync } from 'node:zlib';
import type WS from 'ws'; import type WS from 'ws';
import { WebSocket, type CloseEvent, type ErrorEvent } from 'ws'; import { WebSocket, type CloseEvent, type ErrorEvent } from 'ws';
import type { Logger } from '../../common'; import type { Logger } from '../../common';
import { properties } from '../constants'; import { properties } from '../constants';
import { DynamicBucket } from '../structures'; import { DynamicBucket } from '../structures';
import { ConnectTimeout } from '../structures/timeout'; import { ConnectTimeout } from '../structures/timeout';
import { BaseSocket } from './basesocket'; import { BaseSocket } from './basesocket';
import type { ShardData, ShardOptions } from './shared'; import type { ShardData, ShardOptions } from './shared';
import { ShardSocketCloseCodes } from './shared'; import { ShardSocketCloseCodes } from './shared';
export interface ShardHeart { export interface ShardHeart {
interval: number; interval: number;
nodeInterval?: NodeJS.Timeout; nodeInterval?: NodeJS.Timeout;
lastAck?: number; lastAck?: number;
lastBeat?: number; lastBeat?: number;
ack: boolean; ack: boolean;
} }
export class Shard { export class Shard {
debugger?: Logger; debugger?: Logger;
data: Partial<ShardData> | ShardData = { data: Partial<ShardData> | ShardData = {
resumeSeq: null, resumeSeq: null,
}; };
websocket: BaseSocket | null = null; websocket: BaseSocket | null = null;
connectTimeout = new ConnectTimeout(); connectTimeout = new ConnectTimeout();
heart: ShardHeart = { heart: ShardHeart = {
interval: 30e3, interval: 30e3,
ack: true, ack: true,
}; };
bucket: DynamicBucket; bucket: DynamicBucket;
offlineSendQueue: ((_?: unknown) => void)[] = []; offlineSendQueue: ((_?: unknown) => void)[] = [];
constructor( constructor(
public id: number, public id: number,
public options: ShardOptions, public options: ShardOptions,
) { ) {
this.options.ratelimitOptions ??= { this.options.ratelimitOptions ??= {
rateLimitResetInterval: 60_000, rateLimitResetInterval: 60_000,
maxRequestsPerRateLimitTick: 120, maxRequestsPerRateLimitTick: 120,
}; };
if (options.debugger) this.debugger = options.debugger; if (options.debugger) this.debugger = options.debugger;
const safe = this.calculateSafeRequests(); const safe = this.calculateSafeRequests();
this.bucket = new DynamicBucket({ refillInterval: 6e4, limit: safe, debugger: options.debugger }); this.bucket = new DynamicBucket({ refillInterval: 6e4, limit: safe, debugger: options.debugger });
} }
get latency() { get latency() {
return this.heart.lastAck && this.heart.lastBeat return this.heart.lastAck && this.heart.lastBeat
? this.heart.lastAck - this.heart.lastBeat ? this.heart.lastAck - this.heart.lastBeat
: Number.POSITIVE_INFINITY; : Number.POSITIVE_INFINITY;
} }
get isOpen() { get isOpen() {
return this.websocket?.readyState === WebSocket.OPEN; return this.websocket?.readyState === WebSocket.OPEN;
} }
get gatewayURL() { get gatewayURL() {
return this.options.info.url; return this.options.info.url;
} }
get resumeGatewayURL() { get resumeGatewayURL() {
return this.data.resume_gateway_url; return this.data.resume_gateway_url;
} }
get currentGatewayURL() { get currentGatewayURL() {
const url = new URL(this.resumeGatewayURL ?? this.options.info.url); const url = new URL(this.resumeGatewayURL ?? this.options.info.url);
url.searchParams.set('v', '10'); url.searchParams.set('v', '10');
return url.href; return url.href;
} }
async connect() { ping() {
await this.connectTimeout.wait(); if (!this.websocket) return Promise.resolve(Number.POSITIVE_INFINITY);
if (this.isOpen) { return this.websocket.ping();
this.debugger?.debug(`[Shard #${this.id}] attempted to connect while open`); }
return;
} async connect() {
await this.connectTimeout.wait();
this.debugger?.debug(`[Shard #${this.id}] Connecting to ${this.currentGatewayURL}`); if (this.isOpen) {
this.debugger?.debug(`[Shard #${this.id}] attempted to connect while open`);
// @ts-expect-error @types/bun cause erros in compile return;
// biome-ignore lint/correctness/noUndeclaredVariables: /\ bun lol }
this.websocket = new BaseSocket(typeof Bun === 'undefined' ? 'ws' : 'bun', this.currentGatewayURL);
this.debugger?.debug(`[Shard #${this.id}] Connecting to ${this.currentGatewayURL}`);
this.websocket!.onmessage = (event: WS.MessageEvent) => this.handleMessage(event);
// @ts-expect-error @types/bun cause erros in compile
this.websocket!.onclose = (event: WS.CloseEvent) => this.handleClosed(event); // biome-ignore lint/correctness/noUndeclaredVariables: /\ bun lol
this.websocket = new BaseSocket(typeof Bun === 'undefined' ? 'ws' : 'bun', this.currentGatewayURL);
this.websocket!.onerror = (event: ErrorEvent) => this.debugger?.error(event);
this.websocket!.onmessage = (event: WS.MessageEvent) => this.handleMessage(event);
this.websocket!.onopen = () => {
this.heart.ack = true; this.websocket!.onclose = (event: WS.CloseEvent) => this.handleClosed(event);
};
} this.websocket!.onerror = (event: ErrorEvent) => this.debugger?.error(event);
async send<T extends GatewaySendPayload = GatewaySendPayload>(force: boolean, message: T) { this.websocket!.onopen = () => {
this.debugger?.info( this.heart.ack = true;
`[Shard #${this.id}] Sending: ${GatewayOpcodes[message.op]} ${JSON.stringify( };
message.d, }
(_, value) => {
if (typeof value === 'string') async send<T extends GatewaySendPayload = GatewaySendPayload>(force: boolean, message: T) {
return value.replaceAll(this.options.token, v => { this.debugger?.info(
const split = v.split('.'); `[Shard #${this.id}] Sending: ${GatewayOpcodes[message.op]} ${JSON.stringify(
return `${split[0]}.${'*'.repeat(split[1].length)}.${'*'.repeat(split[2].length)}`; message.d,
}); (_, value) => {
return value; if (typeof value === 'string')
}, return value.replaceAll(this.options.token, v => {
1, const split = v.split('.');
)}`, return `${split[0]}.${'*'.repeat(split[1].length)}.${'*'.repeat(split[2].length)}`;
); });
await this.checkOffline(force); return value;
await this.bucket.acquire(force); },
await this.checkOffline(force); 1,
this.websocket?.send(JSON.stringify(message)); )}`,
} );
await this.checkOffline(force);
async identify() { await this.bucket.acquire(force);
await this.send(true, { await this.checkOffline(force);
op: GatewayOpcodes.Identify, this.websocket?.send(JSON.stringify(message));
d: { }
token: `Bot ${this.options.token}`,
compress: this.options.compress, async identify() {
properties: this.options.properties ?? properties, await this.send(true, {
shard: [this.id, this.options.info.shards], op: GatewayOpcodes.Identify,
intents: this.options.intents, d: {
presence: this.options.presence, token: `Bot ${this.options.token}`,
}, compress: this.options.compress,
}); properties: this.options.properties ?? properties,
} shard: [this.id, this.options.info.shards],
intents: this.options.intents,
get resumable() { presence: this.options.presence,
return !!(this.data.resume_gateway_url && this.data.session_id && this.data.resumeSeq !== null); },
} });
}
async resume() {
await this.send(true, { get resumable() {
op: GatewayOpcodes.Resume, return !!(this.data.resume_gateway_url && this.data.session_id && this.data.resumeSeq !== null);
d: { }
seq: this.data.resumeSeq!,
session_id: this.data.session_id!, async resume() {
token: `Bot ${this.options.token}`, await this.send(true, {
}, op: GatewayOpcodes.Resume,
}); d: {
} seq: this.data.resumeSeq!,
session_id: this.data.session_id!,
async heartbeat(requested: boolean) { token: `Bot ${this.options.token}`,
this.debugger?.debug( },
`[Shard #${this.id}] Sending ${requested ? '' : 'un'}requested heartbeat (Ack=${this.heart.ack})`, });
); }
if (!requested) {
if (!this.heart.ack) { async heartbeat(requested: boolean) {
await this.close(ShardSocketCloseCodes.ZombiedConnection, 'Zombied connection'); this.debugger?.debug(
return; `[Shard #${this.id}] Sending ${requested ? '' : 'un'}requested heartbeat (Ack=${this.heart.ack})`,
} );
this.heart.ack = false; if (!requested) {
} if (!this.heart.ack) {
await this.close(ShardSocketCloseCodes.ZombiedConnection, 'Zombied connection');
this.heart.lastBeat = Date.now(); return;
}
this.websocket!.send( this.heart.ack = false;
JSON.stringify({ }
op: GatewayOpcodes.Heartbeat,
d: this.data.resumeSeq ?? null, this.heart.lastBeat = Date.now();
}),
); this.websocket!.send(
} JSON.stringify({
op: GatewayOpcodes.Heartbeat,
async disconnect() { d: this.data.resumeSeq ?? null,
this.debugger?.info(`[Shard #${this.id}] Disconnecting`); }),
await this.close(ShardSocketCloseCodes.Shutdown, 'Shard down request'); );
} }
async reconnect() { async disconnect() {
this.debugger?.info(`[Shard #${this.id}] Reconnecting`); this.debugger?.info(`[Shard #${this.id}] Disconnecting`);
await this.disconnect(); await this.close(ShardSocketCloseCodes.Shutdown, 'Shard down request');
await this.connect(); }
}
async reconnect() {
async onpacket(packet: GatewayReceivePayload) { this.debugger?.info(`[Shard #${this.id}] Reconnecting`);
if (packet.s !== null) { await this.disconnect();
this.data.resumeSeq = packet.s; await this.connect();
} }
this.debugger?.debug(`[Shard #${this.id}]`, packet.t ? packet.t : GatewayOpcodes[packet.op], this.data.resumeSeq); async onpacket(packet: GatewayReceivePayload) {
if (packet.s !== null) {
switch (packet.op) { this.data.resumeSeq = packet.s;
case GatewayOpcodes.Hello: }
{
clearInterval(this.heart.nodeInterval); this.debugger?.debug(`[Shard #${this.id}]`, packet.t ? packet.t : GatewayOpcodes[packet.op], this.data.resumeSeq);
this.heart.interval = packet.d.heartbeat_interval; switch (packet.op) {
case GatewayOpcodes.Hello:
await this.heartbeat(false); {
this.heart.nodeInterval = setInterval(() => this.heartbeat(false), this.heart.interval); clearInterval(this.heart.nodeInterval);
if (this.resumable) { this.heart.interval = packet.d.heartbeat_interval;
return this.resume();
} await this.heartbeat(false);
await this.identify(); this.heart.nodeInterval = setInterval(() => this.heartbeat(false), this.heart.interval);
}
break; if (this.resumable) {
case GatewayOpcodes.HeartbeatAck: return this.resume();
this.heart.ack = true; }
this.heart.lastAck = Date.now(); await this.identify();
break; }
case GatewayOpcodes.Heartbeat: break;
this.heartbeat(true); case GatewayOpcodes.HeartbeatAck:
break; this.heart.ack = true;
case GatewayOpcodes.Reconnect: this.heart.lastAck = Date.now();
await this.reconnect(); break;
break; case GatewayOpcodes.Heartbeat:
case GatewayOpcodes.InvalidSession: this.heartbeat(true);
if (packet.d) { break;
if (!this.resumable) { case GatewayOpcodes.Reconnect:
return this.debugger?.fatal(`[Shard #${this.id}] This is a completely unexpected error message.`); await this.reconnect();
} break;
await this.resume(); case GatewayOpcodes.InvalidSession:
} else { if (packet.d) {
this.data.resumeSeq = 0; if (!this.resumable) {
this.data.session_id = undefined; return this.debugger?.fatal(`[Shard #${this.id}] This is a completely unexpected error message.`);
await this.identify(); }
} await this.resume();
break; } else {
case GatewayOpcodes.Dispatch: this.data.resumeSeq = 0;
{ this.data.session_id = undefined;
switch (packet.t) { await this.identify();
case GatewayDispatchEvents.Resumed: }
this.offlineSendQueue.map((resolve: () => any) => resolve()); break;
this.options.handlePayload(this.id, packet); case GatewayOpcodes.Dispatch:
break; {
case GatewayDispatchEvents.Ready: { switch (packet.t) {
this.data.resume_gateway_url = packet.d.resume_gateway_url; case GatewayDispatchEvents.Resumed:
this.data.session_id = packet.d.session_id; this.offlineSendQueue.map((resolve: () => any) => resolve());
this.offlineSendQueue.map((resolve: () => any) => resolve()); this.options.handlePayload(this.id, packet);
this.options.handlePayload(this.id, packet); break;
break; case GatewayDispatchEvents.Ready: {
} this.data.resume_gateway_url = packet.d.resume_gateway_url;
default: this.data.session_id = packet.d.session_id;
this.options.handlePayload(this.id, packet); this.offlineSendQueue.map((resolve: () => any) => resolve());
break; this.options.handlePayload(this.id, packet);
} break;
} }
break; default:
} this.options.handlePayload(this.id, packet);
} break;
}
protected async handleClosed(close: CloseEvent) { }
clearInterval(this.heart.nodeInterval); break;
this.debugger?.warn( }
`[Shard #${this.id}] ${ShardSocketCloseCodes[close.code] ?? GatewayCloseCodes[close.code] ?? close.code} (${ }
close.code
})`, protected async handleClosed(close: CloseEvent) {
); clearInterval(this.heart.nodeInterval);
this.debugger?.warn(
switch (close.code) { `[Shard #${this.id}] ${ShardSocketCloseCodes[close.code] ?? GatewayCloseCodes[close.code] ?? close.code} (${
case ShardSocketCloseCodes.Shutdown: close.code
break; })`,
case 1000: );
case 1001:
case 1006: switch (close.code) {
case ShardSocketCloseCodes.ZombiedConnection: case ShardSocketCloseCodes.Shutdown:
case GatewayCloseCodes.UnknownError: break;
case GatewayCloseCodes.UnknownOpcode: case 1000:
case GatewayCloseCodes.DecodeError: case 1001:
case GatewayCloseCodes.NotAuthenticated: case 1006:
case GatewayCloseCodes.AlreadyAuthenticated: case ShardSocketCloseCodes.ZombiedConnection:
case GatewayCloseCodes.InvalidSeq: case GatewayCloseCodes.UnknownError:
case GatewayCloseCodes.RateLimited: case GatewayCloseCodes.UnknownOpcode:
case GatewayCloseCodes.SessionTimedOut: case GatewayCloseCodes.DecodeError:
this.debugger?.info(`[Shard #${this.id}] Trying to reconnect`); case GatewayCloseCodes.NotAuthenticated:
await this.reconnect(); case GatewayCloseCodes.AlreadyAuthenticated:
break; case GatewayCloseCodes.InvalidSeq:
case GatewayCloseCodes.RateLimited:
case GatewayCloseCodes.AuthenticationFailed: case GatewayCloseCodes.SessionTimedOut:
case GatewayCloseCodes.DisallowedIntents: this.debugger?.info(`[Shard #${this.id}] Trying to reconnect`);
case GatewayCloseCodes.InvalidAPIVersion: await this.reconnect();
case GatewayCloseCodes.InvalidIntents: break;
case GatewayCloseCodes.InvalidShard:
case GatewayCloseCodes.ShardingRequired: case GatewayCloseCodes.AuthenticationFailed:
this.debugger?.fatal(`[Shard #${this.id}] cannot reconnect`); case GatewayCloseCodes.DisallowedIntents:
break; case GatewayCloseCodes.InvalidAPIVersion:
case GatewayCloseCodes.InvalidIntents:
default: case GatewayCloseCodes.InvalidShard:
this.debugger?.warn(`[Shard #${this.id}] Unknown close code, trying to reconnect anyways`); case GatewayCloseCodes.ShardingRequired:
await this.reconnect(); this.debugger?.fatal(`[Shard #${this.id}] cannot reconnect`);
break; break;
}
} default:
this.debugger?.warn(`[Shard #${this.id}] Unknown close code, trying to reconnect anyways`);
async close(code: number, reason: string) { await this.reconnect();
if (this.websocket?.readyState !== WebSocket.OPEN) { break;
return this.debugger?.warn(`[Shard #${this.id}] Is not open`); }
} }
this.debugger?.warn(`[Shard #${this.id}] Called close`);
this.websocket?.close(code, reason); async close(code: number, reason: string) {
} if (this.websocket?.readyState !== WebSocket.OPEN) {
return this.debugger?.warn(`[Shard #${this.id}] Is not open`);
protected handleMessage({ data }: WS.MessageEvent) { }
if (data instanceof Buffer) { this.debugger?.warn(`[Shard #${this.id}] Called close`);
data = inflateSync(data); this.websocket?.close(code, reason);
} }
return this.onpacket(JSON.parse(data as string));
} protected handleMessage({ data }: WS.MessageEvent) {
if (data instanceof Buffer) {
checkOffline(force: boolean) { data = inflateSync(data);
if (!this.isOpen) { }
return new Promise(resolve => this.offlineSendQueue[force ? 'unshift' : 'push'](resolve)); return this.onpacket(JSON.parse(data as string));
} }
return Promise.resolve();
} checkOffline(force: boolean) {
if (!this.isOpen) {
calculateSafeRequests(): number { return new Promise(resolve => this.offlineSendQueue[force ? 'unshift' : 'push'](resolve));
const safeRequests = }
this.options.ratelimitOptions!.maxRequestsPerRateLimitTick - return Promise.resolve();
Math.ceil(this.options.ratelimitOptions!.rateLimitResetInterval / this.heart.interval) * 2; }
if (safeRequests < 0) { calculateSafeRequests(): number {
return 0; const safeRequests =
} this.options.ratelimitOptions!.maxRequestsPerRateLimitTick -
return safeRequests; Math.ceil(this.options.ratelimitOptions!.rateLimitResetInterval / this.heart.interval) * 2;
}
} if (safeRequests < 0) {
return 0;
}
return safeRequests;
}
}