fix: ws reconnect

This commit is contained in:
MARCROCK22 2024-08-11 23:43:33 +00:00
parent 084c85368e
commit 9066f510e3
9 changed files with 67 additions and 103 deletions

View File

@ -22,8 +22,7 @@
"author": "MARCROCK22", "author": "MARCROCK22",
"license": "Apache-2.0", "license": "Apache-2.0",
"dependencies": { "dependencies": {
"magic-bytes.js": "^1.10.0", "magic-bytes.js": "^1.10.0"
"ws": "^8.18.0"
}, },
"lint-staged": { "lint-staged": {
"*.ts": [ "*.ts": [
@ -35,7 +34,6 @@
"@commitlint/cli": "^19.3.0", "@commitlint/cli": "^19.3.0",
"@commitlint/config-conventional": "^19.2.2", "@commitlint/config-conventional": "^19.2.2",
"@types/node": "^20.14.11", "@types/node": "^20.14.11",
"@types/ws": "^8.5.12",
"husky": "^9.1.1", "husky": "^9.1.1",
"lint-staged": "^15.2.7", "lint-staged": "^15.2.7",
"rimraf": "5.0.9", "rimraf": "5.0.9",

27
pnpm-lock.yaml generated
View File

@ -11,9 +11,6 @@ importers:
magic-bytes.js: magic-bytes.js:
specifier: ^1.10.0 specifier: ^1.10.0
version: 1.10.0 version: 1.10.0
ws:
specifier: ^8.18.0
version: 8.18.0
optionalDependencies: optionalDependencies:
chokidar: chokidar:
specifier: ^3.6.0 specifier: ^3.6.0
@ -40,9 +37,6 @@ importers:
'@types/node': '@types/node':
specifier: ^20.14.11 specifier: ^20.14.11
version: 20.14.11 version: 20.14.11
'@types/ws':
specifier: ^8.5.12
version: 8.5.12
husky: husky:
specifier: ^9.1.1 specifier: ^9.1.1
version: 9.1.1 version: 9.1.1
@ -209,9 +203,6 @@ packages:
'@types/node@20.14.11': '@types/node@20.14.11':
resolution: {integrity: sha512-kprQpL8MMeszbz6ojB5/tU8PLN4kesnN8Gjzw349rDlNgsSzg90lAVj3llK99Dh7JON+t9AuscPPFW6mPbTnSA==} resolution: {integrity: sha512-kprQpL8MMeszbz6ojB5/tU8PLN4kesnN8Gjzw349rDlNgsSzg90lAVj3llK99Dh7JON+t9AuscPPFW6mPbTnSA==}
'@types/ws@8.5.12':
resolution: {integrity: sha512-3tPRkv1EtkDpzlgyKyI8pGsGZAGPEaXeu0DOj5DI25Ja91bdAYddYHbADRYVrZMRbfW+1l5YwXVDKohDJNQxkQ==}
JSONStream@1.3.5: JSONStream@1.3.5:
resolution: {integrity: sha512-E+iruNOY8VV9s4JEbe1aNEm6MiszPRr/UfcHMz0TQh1BXSxHK+ASV1R6W4HpjBhSeS+54PIsAMCBmwD06LLsqQ==} resolution: {integrity: sha512-E+iruNOY8VV9s4JEbe1aNEm6MiszPRr/UfcHMz0TQh1BXSxHK+ASV1R6W4HpjBhSeS+54PIsAMCBmwD06LLsqQ==}
hasBin: true hasBin: true
@ -874,18 +865,6 @@ packages:
resolution: {integrity: sha512-G8ura3S+3Z2G+mkgNRq8dqaFZAuxfsxpBB8OCTGRTCtp+l/v9nbFNmCUP1BZMts3G1142MsZfn6eeUKrr4PD1Q==} resolution: {integrity: sha512-G8ura3S+3Z2G+mkgNRq8dqaFZAuxfsxpBB8OCTGRTCtp+l/v9nbFNmCUP1BZMts3G1142MsZfn6eeUKrr4PD1Q==}
engines: {node: '>=18'} engines: {node: '>=18'}
ws@8.18.0:
resolution: {integrity: sha512-8VbfWfHLbbwu3+N6OKsOMpBdT4kXPDDB9cJk2bJ6mh9ucxdlnNvH1e+roYkKmN9Nxw2yjz7VzeO9oOz2zJ04Pw==}
engines: {node: '>=10.0.0'}
peerDependencies:
bufferutil: ^4.0.1
utf-8-validate: '>=5.0.2'
peerDependenciesMeta:
bufferutil:
optional: true
utf-8-validate:
optional: true
y18n@5.0.8: y18n@5.0.8:
resolution: {integrity: sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==} resolution: {integrity: sha512-0pfFzegeDWJHJIAmTLRP2DwHjdF5s7jo9tuztdQxAhINCdvS+3nGINqPd00AphqJR/0LhANUS6/+7SCb98YOfA==}
engines: {node: '>=10'} engines: {node: '>=10'}
@ -1092,10 +1071,6 @@ snapshots:
dependencies: dependencies:
undici-types: 5.26.5 undici-types: 5.26.5
'@types/ws@8.5.12':
dependencies:
'@types/node': 20.14.11
JSONStream@1.3.5: JSONStream@1.3.5:
dependencies: dependencies:
jsonparse: 1.3.1 jsonparse: 1.3.1
@ -1707,8 +1682,6 @@ snapshots:
string-width: 7.2.0 string-width: 7.2.0
strip-ansi: 7.1.0 strip-ansi: 7.1.0
ws@8.18.0: {}
y18n@5.0.8: {} y18n@5.0.8: {}
yaml@2.4.5: {} yaml@2.4.5: {}

View File

@ -65,7 +65,7 @@ export interface WebhookRoutes {
>, >,
): Promise<RESTPostAPIWebhookWithTokenGitHubResult | RESTPostAPIWebhookWithTokenGitHubWaitResult>; ): Promise<RESTPostAPIWebhookWithTokenGitHubResult | RESTPostAPIWebhookWithTokenGitHubWaitResult>;
}; };
messages: (id: string | '@original') => { messages: (id: (string & {}) | '@original') => {
get(args?: RestArguments<ProxyRequestMethod.Get>): Promise<RESTGetAPIWebhookWithTokenMessageResult>; get(args?: RestArguments<ProxyRequestMethod.Get>): Promise<RESTGetAPIWebhookWithTokenMessageResult>;
patch( patch(
args: RestArguments<ProxyRequestMethod.Patch, RESTPatchAPIWebhookWithTokenMessageJSONBody>, args: RestArguments<ProxyRequestMethod.Patch, RESTPatchAPIWebhookWithTokenMessageJSONBody>,

View File

@ -214,4 +214,4 @@ export class ChannelShorter extends BaseShorter {
} }
} }
export type ChannelShorterOptionalParams = Partial<{ guildId: string; reason: string }>; export type ChannelShorterOptionalParams = Partial<{ guildId: (string & {}) | '@me'; reason: string }>;

View File

@ -224,7 +224,7 @@ export class MemberShorter extends BaseShorter {
async voice(guildId: string, memberId: '@me', force?: boolean): Promise<VoiceStateStructure>; async voice(guildId: string, memberId: '@me', force?: boolean): Promise<VoiceStateStructure>;
async voice(guildId: string, memberId: string, force?: boolean): Promise<VoiceStateStructure>; async voice(guildId: string, memberId: string, force?: boolean): Promise<VoiceStateStructure>;
async voice(guildId: string, memberId: string | '@me', force = false) { async voice(guildId: string, memberId: (string & {}) | '@me', force = false) {
if (!force) { if (!force) {
const state = await this.client.cache.voiceStates?.get(memberId, guildId); const state = await this.client.cache.voiceStates?.get(memberId, guildId);
if (state) return state; if (state) return state;

View File

@ -1,60 +1,58 @@
import { randomUUID } from 'node:crypto'; import { randomUUID } from 'node:crypto';
// import { SeyfertWebSocket } from './socket/custom'; import { SeyfertWebSocket } from './socket/custom';
import { WebSocket as NodeJSWebSocket } from 'ws';
export class BaseSocket { export class BaseSocket {
private internal: NodeJSWebSocket | WebSocket; private internal: SeyfertWebSocket | WebSocket;
ping?: () => Promise<number>; ping?: () => Promise<number>;
constructor(kind: 'ws' | 'bun', url: string) { constructor(kind: 'ws' | 'bun', url: string) {
this.internal = kind === 'ws' ? new NodeJSWebSocket(url) : new WebSocket(url); this.internal = kind === 'ws' ? new SeyfertWebSocket(url) : new WebSocket(url);
// this.internal = kind === 'ws' ? new SeyfertWebSocket(url) : new WebSocket(url);
// if (kind === 'ws') { if (kind === 'ws') {
// const ws = this.internal as NodeJSWebSocket; const ws = this.internal as SeyfertWebSocket;
// this.ping = ws.waitPing.bind(ws); this.ping = ws.waitPing.bind(ws);
// ws.onpong = data => { ws.onpong = data => {
// const promise = ws.__promises.get(data); const promise = ws.__promises.get(data);
// if (promise) { if (promise) {
// ws.__promises.delete(data); ws.__promises.delete(data);
// promise?.resolve(); promise?.resolve();
// } }
// }; };
// } else { } else {
const ws = this.internal as WebSocket; const ws = this.internal as WebSocket;
this.ping = () => { this.ping = () => {
return new Promise<number>(res => { return new Promise<number>(res => {
const nonce = randomUUID(); const nonce = randomUUID();
const start = performance.now(); const start = performance.now();
const listener = (data: Buffer) => { const listener = (data: Buffer) => {
if (data.toString() !== nonce) return; if (data.toString() !== nonce) return;
//@ts-expect-error bun support //@ts-expect-error
ws.removeListener('pong', listener); ws.removeListener('pong', listener);
res(performance.now() - start); res(performance.now() - start);
}; };
//@ts-expect-error bun support //@ts-expect-error
ws.on('pong', listener); ws.on('pong', listener);
//@ts-expect-error bun support //@ts-expect-error
ws.ping(nonce); ws.ping(nonce);
}); });
}; };
// } }
} }
set onopen(callback: NodeJSWebSocket['onopen']) { set onopen(callback: SeyfertWebSocket['onopen']) {
this.internal.onopen = callback; this.internal.onopen = callback;
} }
set onmessage(callback: NodeJSWebSocket['onmessage']) { set onmessage(callback: SeyfertWebSocket['onmessage']) {
this.internal.onmessage = callback; this.internal.onmessage = callback;
} }
set onclose(callback: NodeJSWebSocket['onclose']) { set onclose(callback: SeyfertWebSocket['onclose']) {
this.internal.onclose = callback; this.internal.onclose = callback;
} }
set onerror(callback: NodeJSWebSocket['onerror']) { set onerror(callback: SeyfertWebSocket['onerror']) {
this.internal.onerror = callback; this.internal.onerror = callback;
} }
@ -62,8 +60,7 @@ export class BaseSocket {
return this.internal.send(data); return this.internal.send(data);
} }
close(...args: Parameters<NodeJSWebSocket['close']>) { close(...args: Parameters<SeyfertWebSocket['close']>) {
//@ts-expect-error
return this.internal.close(...args); return this.internal.close(...args);
} }

View File

@ -102,13 +102,13 @@ export class Shard {
// @ts-expect-error @types/bun cause erros in compile // @ts-expect-error @types/bun cause erros in compile
// biome-ignore lint/correctness/noUndeclaredVariables: /\ bun lol // biome-ignore lint/correctness/noUndeclaredVariables: /\ bun lol
this.websocket = new BaseSocket(typeof Bun === 'undefined' ? 'ws' : 'bun', this.currentGatewayURL); this.websocket = new BaseSocket(typeof Bun === 'undefined' ? 'ws' : 'bun', this.currentGatewayURL);
//@ts-expect-error
this.websocket!.onmessage = ({ data }: { data: string | Buffer }) => { this.websocket!.onmessage = ({ data }: { data: string | Buffer }) => {
this.handleMessage(data); this.handleMessage(data);
}; };
this.websocket!.onclose = (event: { code: number; reason: string }) => this.handleClosed(event); this.websocket!.onclose = (event: { code: number; reason: string }) => this.handleClosed(event);
//@ts-expect-error
this.websocket!.onerror = (event: ErrorEvent) => this.debugger?.error(event); this.websocket!.onerror = (event: ErrorEvent) => this.debugger?.error(event);
this.websocket!.onopen = () => { this.websocket!.onopen = () => {

View File

@ -19,11 +19,9 @@ export class SeyfertWebSocket {
code: number; code: number;
reason: string; reason: string;
} = null; } = null;
__closeCalled?: boolean;
constructor( constructor(url: string) {
url: string,
public compress = true,
) {
const urlParts = new URL(url); const urlParts = new URL(url);
this.hostname = urlParts.hostname || ''; this.hostname = urlParts.hostname || '';
this.path = `${urlParts.pathname}${urlParts.search || ''}`; this.path = `${urlParts.pathname}${urlParts.search || ''}`;
@ -55,31 +53,30 @@ export class SeyfertWebSocket {
} }
this.socket = socket; this.socket = socket;
socket.on('readable', () => { socket.on('readable', this.handleReadable.bind(this));
this.handleReadable();
});
socket.on('close', () => { socket.on('close', this.handleClose.bind(this));
this.handleClose();
}); socket.on('error', err => this.onerror(err));
socket.on('error', err => {
this.onerror(err);
});
this.onopen(); this.onopen();
}); });
req.on('close', () => {
req.removeAllListeners();
});
req.end(); req.end();
} }
handleReadable() { handleReadable() {
// Keep reading until no data, this is useful when two payloads merges. // Keep reading until no data, this is useful when two payloads merges.
while (this.socket?.readableLength) { while (this.socket!.readableLength > 0) {
// Read length without consuming the buffer // Read length without consuming the buffer
let length = this.readBytes(1, 1) & 127; let length = this.readBytes(1, 1) & 127;
const slice = length === 126 ? 4 : length === 127 ? 10 : 2; const slice = length === 126 ? 4 : length === 127 ? 10 : 2;
// Check if frame/data is complete // Check if frame/data is complete
if (this.socket.readableLength < slice) return; // Wait to next cycle if not if (this.socket!.readableLength < slice) return; // Wait to next cycle if not
if (length > 125) { if (length > 125) {
// https://datatracker.ietf.org/doc/html/rfc6455#section-5.2 // https://datatracker.ietf.org/doc/html/rfc6455#section-5.2
// If length is 126/127, read extended payload length instead // If length is 126/127, read extended payload length instead
@ -87,7 +84,7 @@ export class SeyfertWebSocket {
length = this.readBytes(2, slice - 2); length = this.readBytes(2, slice - 2);
} }
// Read the frame, ignore data next to it, leave it to next `while` cycle // Read the frame, ignore data next to it, leave it to next `while` cycle
const frame = this.socket.read(slice + length) as Buffer | null; const frame = this.socket!.read(slice + length) as Buffer | null;
if (!frame) return; if (!frame) return;
// Get fin (0 | 1) // Get fin (0 | 1)
const fin = frame[0] >> 7; const fin = frame[0] >> 7;
@ -147,12 +144,15 @@ export class SeyfertWebSocket {
code: body.readUInt16BE(0), code: body.readUInt16BE(0),
reason: body.subarray(2).toString(), reason: body.subarray(2).toString(),
}; };
this.socket?.destroy();
break; break;
} }
} }
handleClose() { handleClose() {
this.socket?.removeAllListeners();
this.socket?.destroy();
this.socket = undefined;
if (this.__closeCalled) return;
if (!this.__lastError) return this.connect(); if (!this.__lastError) return this.connect();
this.onclose(this.__lastError); this.onclose(this.__lastError);
this.__lastError = null; this.__lastError = null;
@ -198,6 +198,7 @@ export class SeyfertWebSocket {
onerror(_err: unknown) {} onerror(_err: unknown) {}
close(code: number, reason: string) { close(code: number, reason: string) {
this.__closeCalled = true;
// alloc payload length // alloc payload length
const buffer = Buffer.alloc(2 + Buffer.byteLength(reason)); const buffer = Buffer.alloc(2 + Buffer.byteLength(reason));
// gateway close code // gateway close code
@ -285,6 +286,7 @@ export class SeyfertWebSocket {
// Buffer to read // Buffer to read
let block; let block;
while ((block = readable.buffer[blockIndex++])) { while ((block = readable.buffer[blockIndex++])) {
// biome-ignore lint/style/useForOf: why we use biome
for (let i = 0; i < block.length; i++) { for (let i = 0; i < block.length; i++) {
if (++bitIndex > start) { if (++bitIndex > start) {
value *= 256; // shift 8 bits (1 byte) `*= 256 is faster than <<= 8` value *= 256; // shift 8 bits (1 byte) `*= 256 is faster than <<= 8`
@ -300,6 +302,7 @@ export class SeyfertWebSocket {
// readable.buffer is kinda a LinkedList // readable.buffer is kinda a LinkedList
let head: ReadableHeadData | undefined = readable.buffer.head; let head: ReadableHeadData | undefined = readable.buffer.head;
while (head) { while (head) {
// biome-ignore lint/style/useForOf: why we use biome
for (let i = 0; i < head.data.length; i++) { for (let i = 0; i < head.data.length; i++) {
if (++bitIndex > start) { if (++bitIndex > start) {
value *= 256; // shift 8 bits (1 byte) `*= 256 is faster than <<= 8` value *= 256; // shift 8 bits (1 byte) `*= 256 is faster than <<= 8`

View File

@ -45,7 +45,7 @@ export type WorkerSendCacheRequest = CreateWorkerMessage<
| 'addToRelationship' | 'addToRelationship'
| 'removeRelationship' | 'removeRelationship'
| 'removeToRelationship'; | 'removeToRelationship';
args: any[]; args: unknown[];
} }
>; >;
export type WorkerSendShardInfo = CreateWorkerMessage<'SHARD_INFO', WorkerShardInfo & { nonce: string }>; export type WorkerSendShardInfo = CreateWorkerMessage<'SHARD_INFO', WorkerShardInfo & { nonce: string }>;
@ -61,21 +61,15 @@ export type WorkerSendApiRequest = CreateWorkerMessage<
nonce: string; nonce: string;
} }
>; >;
export type WorkerExecuteEval = CreateWorkerMessage<
'EXECUTE_EVAL',
{
func: string;
nonce: string;
toWorkerId: number;
}
>;
export type WorkerSendEvalResponse = CreateWorkerMessage< export type WorkerSendEvalResponse = CreateWorkerMessage<
'EVAL_RESPONSE', 'EVAL_RESPONSE',
{ {
response: any; response: unknown;
nonce: string; nonce: string;
} }
>; >;
export type WorkerSendEval = CreateWorkerMessage< export type WorkerSendEval = CreateWorkerMessage<
'EVAL', 'EVAL',
{ {
@ -94,7 +88,6 @@ export type WorkerMessage =
| WorkerSendInfo | WorkerSendInfo
| WorkerReady | WorkerReady
| WorkerSendApiRequest | WorkerSendApiRequest
| WorkerExecuteEval
| WorkerSendEvalResponse | WorkerSendEvalResponse
| WorkerSendEval | WorkerSendEval
| WorkerStart; | WorkerStart;