From f4f1348ae4aef978a76872bff07900107e79b91e Mon Sep 17 00:00:00 2001 From: Dragurimu Date: Sat, 20 Aug 2022 04:38:16 -0500 Subject: [PATCH] ws needs to be refactored --- .../ws/src/adapters/default-ws-adapter.ts | 22 ++++--- packages/ws/src/adapters/ws-adapter.ts | 2 +- packages/ws/src/services/agent.ts | 9 ++- packages/ws/src/services/shard.ts | 64 +++++++------------ 4 files changed, 42 insertions(+), 55 deletions(-) diff --git a/packages/ws/src/adapters/default-ws-adapter.ts b/packages/ws/src/adapters/default-ws-adapter.ts index 90ecad1..2fe7660 100644 --- a/packages/ws/src/adapters/default-ws-adapter.ts +++ b/packages/ws/src/adapters/default-ws-adapter.ts @@ -15,10 +15,10 @@ import { Agent } from '../services/agent'; export class DefaultWsAdapter implements WsAdapter { static readonly DEFAULTS = { - spawnShardDelay: 5000, + spawnShardDelay: 5300, - shardsPerWorker: 5, - totalWorkers: 1, + shardsPerWorker: 25, + totalWorkers: 4, gatewayBot: { url: 'wss://gateway.discord.gg', @@ -32,9 +32,9 @@ export class DefaultWsAdapter implements WsAdapter { }, }, - firstShardId: 0, // remove + firstShardId: 0, - lastShardId: 1, // remove + lastShardId: 1, }; buckets = new Map< @@ -52,8 +52,11 @@ export class DefaultWsAdapter implements WsAdapter { constructor(options: DefaultWsOptions) { this.options = Object.assign(Object.create(DefaultWsAdapter.DEFAULTS), options); + this.options.firstShardId = this.options.firstShardId ?? 0; + this.options.lastShardId = this.options.lastShardId ?? this.options.totalShards - 1 ?? 1; + this.agent = new Agent({ - totalShards: this.options.totalShards ?? 1, + totalShards: this.options.totalShards ?? this.options.gatewayBot.shards ?? 1, gatewayConfig: this.options.gatewayConfig, createShardOptions: this.options.createShardOptions, @@ -62,7 +65,8 @@ export class DefaultWsAdapter implements WsAdapter { }, handleIdentify: (id: number) => { - return this.buckets.get(id)!.leak.acquire(1); + // console.log(id % this.options.gatewayBot.sessionStartLimit.maxConcurrency, id, this.options.gatewayBot.sessionStartLimit.maxConcurrency); + return this.buckets.get(id % this.options.gatewayBot.sessionStartLimit.maxConcurrency)!.leak.acquire(1); }, }); } @@ -98,9 +102,7 @@ export class DefaultWsAdapter implements WsAdapter { ); } - const bucketId = - shardId % - this.options.gatewayBot.sessionStartLimit.maxConcurrency; + const bucketId = shardId % this.options.gatewayBot.sessionStartLimit.maxConcurrency; const bucket = this.buckets.get(bucketId); if (!bucket) { diff --git a/packages/ws/src/adapters/ws-adapter.ts b/packages/ws/src/adapters/ws-adapter.ts index 1b36424..89f63f4 100644 --- a/packages/ws/src/adapters/ws-adapter.ts +++ b/packages/ws/src/adapters/ws-adapter.ts @@ -1,5 +1,5 @@ import type { Agent } from '../services/agent'; -import { GatewayBot } from '@biscuitland/api-types'; +import type { GatewayBot } from '@biscuitland/api-types'; export interface WsAdapter { options: Partial; diff --git a/packages/ws/src/services/agent.ts b/packages/ws/src/services/agent.ts index 7cba9a8..f47d9f3 100644 --- a/packages/ws/src/services/agent.ts +++ b/packages/ws/src/services/agent.ts @@ -29,7 +29,7 @@ export class Agent { return this.options.handleMessage(shard, message); }, - handleIdentify: async function () { + async handleIdentify() { return await handleIdentify(id); }, @@ -46,6 +46,7 @@ export class Agent { */ async identify(id: number) { + // @ts-ignore let shard = this.shards.get(id); if (!shard) { @@ -60,7 +61,7 @@ export class Agent { return this.options.handleMessage(shard, message); }, - handleIdentify: async function () { + async handleIdentify() { return await handleIdentify(id); }, @@ -77,7 +78,9 @@ export class Agent { * @inheritDoc */ - async scale() {} + async scale() { + // + } } export type AgentOptions = Pick< diff --git a/packages/ws/src/services/shard.ts b/packages/ws/src/services/shard.ts index 7e1b5e8..f323dbd 100644 --- a/packages/ws/src/services/shard.ts +++ b/packages/ws/src/services/shard.ts @@ -202,9 +202,6 @@ export enum ShardState { export class Shard { static readonly DEFAULTS = { - /** The total amount of shards which are used to communicate with Discord. */ - totalShards: 1, - /** The maximum of requests which can be send to discord per rate limit tick. */ maxRequestsPerRateLimitTick: MAX_GATEWAY_REQUESTS_PER_INTERVAL, @@ -219,6 +216,8 @@ export class Shard { offlineSendQueue: any[]; + totalShards: number; + sessionId!: string; resolves: Map< @@ -297,6 +296,8 @@ export class Shard { interval: DEFAULT_HEARTBEAT_INTERVAL, }; + this.totalShards = this.options.totalShards, + this.state = ShardState.Offline; this.id = options.id; @@ -369,10 +370,10 @@ export class Shard { } /** - * @inheritDoc + * @inheritDoc */ - async handleMessage (message: MessageEvent): Promise { + async handleMessage(message: MessageEvent): Promise { let data = message.data; if (this.options.gatewayConfig.compress && data instanceof Blob) { @@ -380,14 +381,17 @@ export class Shard { data = decoder.decode(inflateSync(new Uint8Array(await message.arrayBuffer()))); } - if (typeof data !== 'string') return; + if (typeof data !== 'string') { + return; + } const messageData = JSON.parse(data) as DiscordGatewayPayload; - switch (messageData.op) { case GatewayOpcodes.Heartbeat: { - if (!this.isOpen()) return; + if (!this.isOpen()) { + return; + } this.heart.lastBeat = Date.now(); @@ -466,18 +470,17 @@ export class Shard { this.state = ShardState.Connected; this.events.resumed?.(this); - this.offlineSendQueue.map((resolve) => resolve()); + this.offlineSendQueue.map(resolve => resolve()); this.resolves.get('RESUMED')?.(messageData); this.resolves.delete('RESUMED'); - } - else if (messageData.t === 'READY') { + } else if (messageData.t === 'READY') { const payload = messageData.d as DiscordReady; this.sessionId = payload.session_id; this.state = ShardState.Connected; - this.offlineSendQueue.map((resolve) => resolve()); + this.offlineSendQueue.map(resolve => resolve()); this.resolves.get('READY')?.(messageData); this.resolves.delete('READY'); @@ -527,7 +530,7 @@ export class Shard { await this.connect(); } - await this.handleIdentify(); + await this.handleIdentify(); this.send( { @@ -537,7 +540,7 @@ export class Shard { compress: this.options.gatewayConfig.compress, properties: this.options.gatewayConfig.properties, intents: this.options.gatewayConfig.intents, - shard: [this.id, this.options.totalShards] + shard: [this.id, this.totalShards] }, }, true @@ -561,8 +564,6 @@ export class Shard { */ async connect(): Promise { - let hi = false; - if ( ![ShardState.Identifying, ShardState.Resuming].includes(this.state!) ) { @@ -580,37 +581,18 @@ export class Shard { socket.onclose = (event: any) => this.handleClose(event); socket.onmessage = (message: any) => { - hi = true; - this.handleMessage(message); }; return new Promise(resolve => { socket.onopen = () => { - setTimeout(() => { - if (!hi) { - this.handleMessage({ - data: JSON.stringify({ - t: null, - s: null, - op: 10, - d: { heartbeat_interval: 41250 }, - }), - } as any); - } - }, 250); + if (![ShardState.Identifying, ShardState.Resuming].includes(this.state)) { + this.state = ShardState.Unidentified; + } - if ( - ![ShardState.Identifying, ShardState.Resuming].includes( - this.state! - ) - ) { - this.state = ShardState.Unidentified; - } + this.events.connected?.(this); - this.events.connected?.(this); - - resolve(); + resolve(); }; }); } @@ -758,7 +740,7 @@ export class Shard { close(code: number, reason: string): void { if (this.socket?.readyState !== WebSocket.OPEN) { return; - }; + } return this.socket?.close(code, reason); }