ws needs to be refactored

This commit is contained in:
Dragurimu 2022-08-20 04:38:16 -05:00
parent ab4b1f0d74
commit f4f1348ae4
4 changed files with 42 additions and 55 deletions

View File

@ -15,10 +15,10 @@ import { Agent } from '../services/agent';
export class DefaultWsAdapter implements WsAdapter { export class DefaultWsAdapter implements WsAdapter {
static readonly DEFAULTS = { static readonly DEFAULTS = {
spawnShardDelay: 5000, spawnShardDelay: 5300,
shardsPerWorker: 5, shardsPerWorker: 25,
totalWorkers: 1, totalWorkers: 4,
gatewayBot: { gatewayBot: {
url: 'wss://gateway.discord.gg', url: 'wss://gateway.discord.gg',
@ -32,9 +32,9 @@ export class DefaultWsAdapter implements WsAdapter {
}, },
}, },
firstShardId: 0, // remove firstShardId: 0,
lastShardId: 1, // remove lastShardId: 1,
}; };
buckets = new Map< buckets = new Map<
@ -52,8 +52,11 @@ export class DefaultWsAdapter implements WsAdapter {
constructor(options: DefaultWsOptions) { constructor(options: DefaultWsOptions) {
this.options = Object.assign(Object.create(DefaultWsAdapter.DEFAULTS), options); this.options = Object.assign(Object.create(DefaultWsAdapter.DEFAULTS), options);
this.options.firstShardId = this.options.firstShardId ?? 0;
this.options.lastShardId = this.options.lastShardId ?? this.options.totalShards - 1 ?? 1;
this.agent = new Agent({ this.agent = new Agent({
totalShards: this.options.totalShards ?? 1, totalShards: this.options.totalShards ?? this.options.gatewayBot.shards ?? 1,
gatewayConfig: this.options.gatewayConfig, gatewayConfig: this.options.gatewayConfig,
createShardOptions: this.options.createShardOptions, createShardOptions: this.options.createShardOptions,
@ -62,7 +65,8 @@ export class DefaultWsAdapter implements WsAdapter {
}, },
handleIdentify: (id: number) => { handleIdentify: (id: number) => {
return this.buckets.get(id)!.leak.acquire(1); // console.log(id % this.options.gatewayBot.sessionStartLimit.maxConcurrency, id, this.options.gatewayBot.sessionStartLimit.maxConcurrency);
return this.buckets.get(id % this.options.gatewayBot.sessionStartLimit.maxConcurrency)!.leak.acquire(1);
}, },
}); });
} }
@ -98,9 +102,7 @@ export class DefaultWsAdapter implements WsAdapter {
); );
} }
const bucketId = const bucketId = shardId % this.options.gatewayBot.sessionStartLimit.maxConcurrency;
shardId %
this.options.gatewayBot.sessionStartLimit.maxConcurrency;
const bucket = this.buckets.get(bucketId); const bucket = this.buckets.get(bucketId);
if (!bucket) { if (!bucket) {

View File

@ -1,5 +1,5 @@
import type { Agent } from '../services/agent'; import type { Agent } from '../services/agent';
import { GatewayBot } from '@biscuitland/api-types'; import type { GatewayBot } from '@biscuitland/api-types';
export interface WsAdapter { export interface WsAdapter {
options: Partial<Options | any>; options: Partial<Options | any>;

View File

@ -29,7 +29,7 @@ export class Agent {
return this.options.handleMessage(shard, message); return this.options.handleMessage(shard, message);
}, },
handleIdentify: async function () { async handleIdentify() {
return await handleIdentify(id); return await handleIdentify(id);
}, },
@ -46,6 +46,7 @@ export class Agent {
*/ */
async identify(id: number) { async identify(id: number) {
// @ts-ignore
let shard = this.shards.get(id); let shard = this.shards.get(id);
if (!shard) { if (!shard) {
@ -60,7 +61,7 @@ export class Agent {
return this.options.handleMessage(shard, message); return this.options.handleMessage(shard, message);
}, },
handleIdentify: async function () { async handleIdentify() {
return await handleIdentify(id); return await handleIdentify(id);
}, },
@ -77,7 +78,9 @@ export class Agent {
* @inheritDoc * @inheritDoc
*/ */
async scale() {} async scale() {
//
}
} }
export type AgentOptions = Pick< export type AgentOptions = Pick<

View File

@ -202,9 +202,6 @@ export enum ShardState {
export class Shard { export class Shard {
static readonly DEFAULTS = { static readonly DEFAULTS = {
/** The total amount of shards which are used to communicate with Discord. */
totalShards: 1,
/** The maximum of requests which can be send to discord per rate limit tick. */ /** The maximum of requests which can be send to discord per rate limit tick. */
maxRequestsPerRateLimitTick: MAX_GATEWAY_REQUESTS_PER_INTERVAL, maxRequestsPerRateLimitTick: MAX_GATEWAY_REQUESTS_PER_INTERVAL,
@ -219,6 +216,8 @@ export class Shard {
offlineSendQueue: any[]; offlineSendQueue: any[];
totalShards: number;
sessionId!: string; sessionId!: string;
resolves: Map< resolves: Map<
@ -297,6 +296,8 @@ export class Shard {
interval: DEFAULT_HEARTBEAT_INTERVAL, interval: DEFAULT_HEARTBEAT_INTERVAL,
}; };
this.totalShards = this.options.totalShards,
this.state = ShardState.Offline; this.state = ShardState.Offline;
this.id = options.id; this.id = options.id;
@ -380,14 +381,17 @@ export class Shard {
data = decoder.decode(inflateSync(new Uint8Array(await message.arrayBuffer()))); data = decoder.decode(inflateSync(new Uint8Array(await message.arrayBuffer())));
} }
if (typeof data !== 'string') return; if (typeof data !== 'string') {
return;
}
const messageData = JSON.parse(data) as DiscordGatewayPayload; const messageData = JSON.parse(data) as DiscordGatewayPayload;
switch (messageData.op) { switch (messageData.op) {
case GatewayOpcodes.Heartbeat: { case GatewayOpcodes.Heartbeat: {
if (!this.isOpen()) return; if (!this.isOpen()) {
return;
}
this.heart.lastBeat = Date.now(); this.heart.lastBeat = Date.now();
@ -466,18 +470,17 @@ export class Shard {
this.state = ShardState.Connected; this.state = ShardState.Connected;
this.events.resumed?.(this); this.events.resumed?.(this);
this.offlineSendQueue.map((resolve) => resolve()); this.offlineSendQueue.map(resolve => resolve());
this.resolves.get('RESUMED')?.(messageData); this.resolves.get('RESUMED')?.(messageData);
this.resolves.delete('RESUMED'); this.resolves.delete('RESUMED');
} } else if (messageData.t === 'READY') {
else if (messageData.t === 'READY') {
const payload = messageData.d as DiscordReady; const payload = messageData.d as DiscordReady;
this.sessionId = payload.session_id; this.sessionId = payload.session_id;
this.state = ShardState.Connected; this.state = ShardState.Connected;
this.offlineSendQueue.map((resolve) => resolve()); this.offlineSendQueue.map(resolve => resolve());
this.resolves.get('READY')?.(messageData); this.resolves.get('READY')?.(messageData);
this.resolves.delete('READY'); this.resolves.delete('READY');
@ -537,7 +540,7 @@ export class Shard {
compress: this.options.gatewayConfig.compress, compress: this.options.gatewayConfig.compress,
properties: this.options.gatewayConfig.properties, properties: this.options.gatewayConfig.properties,
intents: this.options.gatewayConfig.intents, intents: this.options.gatewayConfig.intents,
shard: [this.id, this.options.totalShards] shard: [this.id, this.totalShards]
}, },
}, },
true true
@ -561,8 +564,6 @@ export class Shard {
*/ */
async connect(): Promise<void> { async connect(): Promise<void> {
let hi = false;
if ( if (
![ShardState.Identifying, ShardState.Resuming].includes(this.state!) ![ShardState.Identifying, ShardState.Resuming].includes(this.state!)
) { ) {
@ -580,31 +581,12 @@ export class Shard {
socket.onclose = (event: any) => this.handleClose(event); socket.onclose = (event: any) => this.handleClose(event);
socket.onmessage = (message: any) => { socket.onmessage = (message: any) => {
hi = true;
this.handleMessage(message); this.handleMessage(message);
}; };
return new Promise(resolve => { return new Promise(resolve => {
socket.onopen = () => { socket.onopen = () => {
setTimeout(() => { if (![ShardState.Identifying, ShardState.Resuming].includes(this.state)) {
if (!hi) {
this.handleMessage({
data: JSON.stringify({
t: null,
s: null,
op: 10,
d: { heartbeat_interval: 41250 },
}),
} as any);
}
}, 250);
if (
![ShardState.Identifying, ShardState.Resuming].includes(
this.state!
)
) {
this.state = ShardState.Unidentified; this.state = ShardState.Unidentified;
} }
@ -758,7 +740,7 @@ export class Shard {
close(code: number, reason: string): void { close(code: number, reason: string): void {
if (this.socket?.readyState !== WebSocket.OPEN) { if (this.socket?.readyState !== WebSocket.OPEN) {
return; return;
}; }
return this.socket?.close(code, reason); return this.socket?.close(code, reason);
} }