diff --git a/src/client/workerclient.ts b/src/client/workerclient.ts index 48e4d0f..73b0416 100644 --- a/src/client/workerclient.ts +++ b/src/client/workerclient.ts @@ -1,10 +1,10 @@ -import { type GatewayDispatchPayload, GatewayIntentBits, type GatewaySendPayload } from 'discord-api-types/v10'; +import { GatewayIntentBits, type GatewayDispatchPayload, type GatewaySendPayload } from 'discord-api-types/v10'; import { randomUUID } from 'node:crypto'; import { parentPort as manager } from 'node:worker_threads'; import { ApiHandler, Logger } from '..'; import type { Cache } from '../cache'; import { WorkerAdapter } from '../cache'; -import { type DeepPartial, LogLevels, type When } from '../common'; +import { LogLevels, type DeepPartial, type When } from '../common'; import { EventHandler, type EventHandlerLike } from '../events'; import { ClientUser } from '../structures'; import { Shard, type ShardManagerOptions, type WorkerData } from '../websocket'; @@ -148,7 +148,7 @@ export class WorkerClient extends BaseClient { return; } - await shard.send(0, { + await shard.send({ ...data, } satisfies GatewaySendPayload); diff --git a/src/websocket/discord/shard.ts b/src/websocket/discord/shard.ts index c0644d2..99b7559 100644 --- a/src/websocket/discord/shard.ts +++ b/src/websocket/discord/shard.ts @@ -5,7 +5,7 @@ import type WS from 'ws'; import { WebSocket, type CloseEvent, type ErrorEvent } from 'ws'; import type { Logger } from '../../common'; import { properties } from '../constants'; -import { DynamicBucket, PriorityQueue } from '../structures'; +import { DynamicBucket } from '../structures'; import { ConnectTimeout } from '../structures/timeout'; import { BaseSocket } from './basesocket'; import type { ShardData, ShardOptions } from './shared'; @@ -33,7 +33,7 @@ export class Shard { }; bucket: DynamicBucket; - offlineSendQueue = new PriorityQueue<(_?: unknown) => void>(); + offlineSendQueue: ((_?: unknown) => void)[] = []; constructor( public id: number, @@ -47,12 +47,7 @@ export class Shard { if (options.debugger) this.debugger = options.debugger; const safe = this.calculateSafeRequests(); - this.bucket = new DynamicBucket({ - limit: safe, - refillAmount: safe, - refillInterval: 6e4, - debugger: this.debugger, - }); + this.bucket = new DynamicBucket({ refillInterval: 6e4, limit: safe, debugger: options.debugger }); } get latency() { @@ -102,7 +97,7 @@ export class Shard { }; } - async send(priority: number, message: T) { + async send(message: T) { this.debugger?.info( `[Shard #${this.id}] Sending: ${GatewayOpcodes[message.op]} ${JSON.stringify( message.d, @@ -117,14 +112,14 @@ export class Shard { 1, )}`, ); - await this.checkOffline(priority); - await this.bucket.acquire(priority); - await this.checkOffline(priority); + await this.checkOffline(); + await this.bucket.acquire(); + await this.checkOffline(); this.websocket?.send(JSON.stringify(message)); } async identify() { - await this.send(0, { + await this.send({ op: GatewayOpcodes.Identify, d: { token: `Bot ${this.options.token}`, @@ -142,7 +137,7 @@ export class Shard { } async resume() { - await this.send(0, { + await this.send({ op: GatewayOpcodes.Resume, d: { seq: this.data.resumeSeq!, @@ -234,13 +229,13 @@ export class Shard { { switch (packet.t) { case GatewayDispatchEvents.Resumed: - this.offlineSendQueue.toArray().map((resolve: () => any) => resolve()); + this.offlineSendQueue.map((resolve: () => any) => resolve()); this.options.handlePayload(this.id, packet); break; case GatewayDispatchEvents.Ready: { this.data.resume_gateway_url = packet.d.resume_gateway_url; this.data.session_id = packet.d.session_id; - this.offlineSendQueue.toArray().map((resolve: () => any) => resolve()); + this.offlineSendQueue.map((resolve: () => any) => resolve()); this.options.handlePayload(this.id, packet); break; } @@ -311,9 +306,9 @@ export class Shard { return this.onpacket(JSON.parse(data as string)); } - checkOffline(priority: number) { + checkOffline() { if (!this.isOpen) { - return new Promise(resolve => this.offlineSendQueue.push(resolve, priority)); + return new Promise(resolve => this.offlineSendQueue.push(resolve)); } return Promise.resolve(); } diff --git a/src/websocket/discord/sharder.ts b/src/websocket/discord/sharder.ts index 08f1193..3933d5c 100644 --- a/src/websocket/discord/sharder.ts +++ b/src/websocket/discord/sharder.ts @@ -14,7 +14,7 @@ import { type WatcherSendToShard, } from '../../common'; import { ShardManagerDefaults } from '../constants'; -import { SequentialBucket } from '../structures'; +import { DynamicBucket } from '../structures'; import { ConnectQueue } from '../structures/timeout'; import { Shard } from './shard.js'; import type { ShardManagerOptions } from './shared'; @@ -111,7 +111,7 @@ export class ShardManager extends Map { */ spawnBuckets(): Shard[][] { this.debugger?.info('#0 Preparing buckets'); - const chunks = SequentialBucket.chunk(new Array(this.shardEnd - this.shardStart), this.concurrency); + const chunks = DynamicBucket.chunk(new Array(this.shardEnd - this.shardStart), this.concurrency); chunks.forEach((arr: any[], index: number) => { for (let i = 0; i < arr.length; i++) { const id = i + (index > 0 ? index * this.concurrency : 0) + this.shardStart; @@ -148,7 +148,7 @@ export class ShardManager extends Map { }); } - setPresence(payload: GatewayUpdatePresence['d']): Promise | undefined { + setPresence(payload: GatewayUpdatePresence['d']): Promise { return new Promise(resolve => { this.forEach(shard => { this.setShardPresence(shard.id, payload); @@ -198,6 +198,6 @@ export class ShardManager extends Map { payload, } satisfies WatcherSendToShard); } - this.get(shardId)?.send(1, payload); + this.get(shardId)?.send(payload); } } diff --git a/src/websocket/discord/workermanager.ts b/src/websocket/discord/workermanager.ts index cbbd32d..3850b18 100644 --- a/src/websocket/discord/workermanager.ts +++ b/src/websocket/discord/workermanager.ts @@ -5,9 +5,9 @@ import { Worker as ThreadWorker } from 'node:worker_threads'; import { ApiHandler, Logger, Router } from '../..'; import { MemoryAdapter, type Adapter } from '../../cache'; import { BaseClient, type InternalRuntimeConfig } from '../../client/base'; -import { type MakePartial, MergeOptions } from '../../common'; +import { MergeOptions, type MakePartial } from '../../common'; import { WorkerManagerDefaults } from '../constants'; -import { SequentialBucket } from '../structures'; +import { DynamicBucket } from '../structures'; import { ConnectQueue } from '../structures/timeout'; import { MemberUpdateHandler } from './events/memberUpdate'; import { PresenceUpdateHandler } from './events/presenceUpdate'; @@ -100,7 +100,7 @@ export class WorkerManager extends Map( + const chunks = DynamicBucket.chunk( new Array(this.shardEnd - this.shardStart), this.options.shardsPerWorker, ); diff --git a/src/websocket/structures/index.ts b/src/websocket/structures/index.ts index e718723..8469b6e 100644 --- a/src/websocket/structures/index.ts +++ b/src/websocket/structures/index.ts @@ -1,10 +1,4 @@ -import type { Logger } from '../../common'; -import { delay } from '../../common'; - -/** - * just any kind of request to queue and resolve later - */ -export type QueuedRequest = (value: void | Promise) => Promise | any; +import { delay, type Logger } from '../../common'; /** * options of the dynamic bucket @@ -12,323 +6,74 @@ export type QueuedRequest = (value: void | Promise) => Promise | export interface DynamicBucketOptions { limit: number; refillInterval: number; - refillAmount: number; debugger?: Logger; } -/** - * generally useless for interaction based bots - * ideally this would only be triggered on certain paths - * example: a huge amount of messages being spammed - * - * a dynamic bucket is just a priority queue implemented using linked lists - * we create an empty bucket for every path - * dynamically allocating memory improves the final memory footprint - */ export class DynamicBucket { - limit: number; - refillInterval: number; - refillAmount: number; - - /** The queue of requests to acquire an available request. Mapped by */ - queue = new PriorityQueue(); - - /** The amount of requests that have been used up already. */ + queue: ((value?: unknown) => any)[] = []; used = 0; - - /** Whether or not the queue is already processing. */ - processing = false; - - /** The timeout id for the timer to reduce the used amount by the refill amount. */ + processing?: boolean; + refillsAt? = 0; timeoutId?: NodeJS.Timeout; - /** The timestamp in milliseconds when the next refill is scheduled. */ - refillsAt?: number; - - debugger?: Logger; - - constructor(options: DynamicBucketOptions) { - this.limit = options.limit; - this.refillInterval = options.refillInterval; - this.refillAmount = options.refillAmount; - - if (options.debugger) { - this.debugger = options.debugger; - } - } + constructor(public options: DynamicBucketOptions) {} get remaining(): number { - if (this.limit < this.used) { + if (this.options.limit < this.used) { return 0; } - return this.limit - this.used; + return this.options.limit - this.used; } - refill(): void { - // Lower the used amount by the refill amount - this.used = this.refillAmount > this.used ? 0 : this.used - this.refillAmount; - - // Reset the refillsAt timestamp since it just got refilled + refill() { this.refillsAt = undefined; - + if (this.timeoutId) { + clearTimeout(this.timeoutId); + this.timeoutId = undefined; + } if (this.used > 0) { - if (this.timeoutId) { - clearTimeout(this.timeoutId); - } + this.used = 0; this.timeoutId = setTimeout(() => { this.refill(); - }, this.refillInterval); - this.refillsAt = Date.now() + this.refillInterval; + }, this.options.refillInterval); + this.refillsAt = Date.now() + this.options.refillInterval; } } - /** Begin processing the queue. */ - async processQueue(): Promise { - // There is already a queue that is processing - if (this.processing) { - return; - } - - // Begin going through the queue. - while (!this.queue.isEmpty()) { + async processQueue() { + if (this.processing) return; + this.processing = true; + while (this.queue.length) { if (this.remaining) { - this.debugger?.debug(`Processing queue. Remaining: ${this.remaining} Length: ${this.queue.size()}`); - // Resolves the promise allowing the paused execution of this request to resolve and continue. - this.queue.peek()(); - this.queue.pop(); - - // A request can be made + this.options.debugger?.debug(`Processing queue. Remaining: ${this.remaining} Length: ${this.queue.length}`); + this.queue.shift()!(); this.used++; - - // Create a new timeout for this request if none exists. if (!this.timeoutId) { this.timeoutId = setTimeout(() => { this.refill(); - }, this.refillInterval); - - // Set the time for when this refill will occur. - this.refillsAt = Date.now() + this.refillInterval; + }, this.options.refillInterval); + this.refillsAt = Date.now() + this.options.refillInterval; } - // Check if a refill is scheduled, since we have used up all available requests } else if (this.refillsAt) { const now = Date.now(); - // If there is time left until next refill, just delay execution. if (this.refillsAt > now) { + this.options.debugger?.info(`Waiting ${this.refillsAt - now}ms to process queue`); await delay(this.refillsAt - now); + this.used = 0; } } } - - // Loop has ended mark false so it can restart later when needed this.processing = false; } - /** Pauses the execution until the request is available to be made. */ - async acquire(priority: number): Promise { - return new Promise(resolve => { - this.queue.push(resolve, priority); + acquire() { + return new Promise(res => { + this.queue.push(res); void this.processQueue(); }); } - toString() { - return [...this.queue].toString(); - } -} - -/** - * abstract node lol - */ -export interface AbstractNode { - data: T; - next: this | null; -} - -export interface QueuePusher { - push(data: T): NonNullable>; -} - -export interface QueuePusherWithPriority { - push(data: T, priority: number): NonNullable>; -} - -export class TNode implements AbstractNode { - data: T; - next: this | null; - - constructor(data: T) { - this.data = data; - this.next = null; - } - - static null(list: AbstractNode | null): list is null { - return !list; - } -} - -export class PNode extends TNode { - priority: number; - - constructor(data: T, priority: number) { - super(data); - this.priority = priority; - } -} - -export abstract class Queue { - protected abstract head: AbstractNode | null; - - /** - * O(1) - */ - public pop() { - if (TNode.null(this.head)) { - throw new Error('cannot pop a list without elements'); - } - - return (this.head = this.head.next); - } - - /** - * O(1) - */ - public peek(): T { - if (TNode.null(this.head)) { - throw new Error('cannot peek an empty list'); - } - - return this.head.data; - } - - /** - * O(n) - */ - public size(): number { - let aux = this.head; - - if (TNode.null(aux)) { - return 0; - } - - let count = 1; - - while (aux.next !== null) { - count++; - aux = aux.next; - } - - return count; - } - - /** - * O(1) - */ - public isEmpty() { - return TNode.null(this.head); - } - - *[Symbol.iterator](): IterableIterator { - let temp = this.head; - while (temp !== null) { - yield temp.data; - temp = temp.next; - } - } - - public toArray(): T[] { - return Array.from(this); - } - - public toString() { - return this.head?.toString() || ''; - } -} - -export class LinkedList extends Queue implements QueuePusher { - protected head: TNode | null = null; - - /** - * O(1) - */ - public push(data: T): NonNullable> { - const temp = new TNode(data); - temp.next = this.head; - this.head = temp; - - return this.head; - } -} - -export class PriorityQueue extends Queue implements QueuePusherWithPriority { - protected head: PNode | null = null; - - /** - * O(#priorities) - */ - public push(data: T, priority: number): NonNullable> { - let start = this.head; - const temp = new PNode(data, priority); - - if (TNode.null(this.head) || TNode.null(start)) { - this.head = temp; - return this.head; - } - - if (this.head.priority > priority) { - temp.next = this.head; - this.head = temp; - return this.head; - } - - while (start.next !== null && start.next.priority < priority) { - start = start.next; - } - - temp.next = start.next as PNode; - start.next = temp; - - return this.head; - } -} - -export class SequentialBucket { - private connections: LinkedList; - private capacity: number; // max_concurrency - private spawnTimeout: number; - - constructor(maxCapacity: number) { - this.connections = new LinkedList(); - this.capacity = maxCapacity; - this.spawnTimeout = 5000; - } - - public async destroy() { - this.connections = new LinkedList(); - } - - public async push(promise: QueuedRequest) { - this.connections.push(promise); - - if (this.capacity <= this.connections.size()) { - await this.acquire(); - await delay(this.spawnTimeout); - } - return; - } - - public async acquire(promises = this.connections) { - while (!promises.isEmpty()) { - const item = promises.peek(); - item().catch((...args: any[]) => { - Promise.reject(...args); - }); - promises.pop(); - } - - return Promise.resolve(true); - } - - public static chunk(array: T[], chunks: number): T[][] { + static chunk(array: T[], chunks: number): T[][] { let index = 0; let resIndex = 0; const result = Array(Math.ceil(array.length / chunks)); diff --git a/src/websocket/structures/timeout.ts b/src/websocket/structures/timeout.ts index 6f6e40d..aa73882 100644 --- a/src/websocket/structures/timeout.ts +++ b/src/websocket/structures/timeout.ts @@ -36,11 +36,11 @@ export class ConnectQueue { public concurrency = 1, ) {} - push(callback: () => any) { + async push(callback: () => any) { this.queue.push({ cb: callback }); if (this.queue.length === this.concurrency) { for (let i = 0; i < this.concurrency; i++) { - this.queue[i].cb?.(); + await this.queue[i].cb?.(); this.queue[i].cb = undefined; } this.interval = setInterval(() => { @@ -51,7 +51,7 @@ export class ConnectQueue { } } - shift(): any { + async shift(): Promise { const shift = this.queue.shift(); if (!shift) { if (!this.queue.length) { @@ -61,7 +61,7 @@ export class ConnectQueue { return; } if (!shift.cb) return this.shift(); - shift.cb?.(); + await shift.cb?.(); if (!this.queue.length) { clearInterval(this.interval); this.interval = undefined;