fix: dynamicbucket queue

This commit is contained in:
MARCROCK22 2024-04-01 18:04:50 -04:00
parent e3aa0a932a
commit 206b3dfe63
6 changed files with 56 additions and 316 deletions

View File

@ -1,10 +1,10 @@
import { type GatewayDispatchPayload, GatewayIntentBits, type GatewaySendPayload } from 'discord-api-types/v10';
import { GatewayIntentBits, type GatewayDispatchPayload, type GatewaySendPayload } from 'discord-api-types/v10';
import { randomUUID } from 'node:crypto';
import { parentPort as manager } from 'node:worker_threads';
import { ApiHandler, Logger } from '..';
import type { Cache } from '../cache';
import { WorkerAdapter } from '../cache';
import { type DeepPartial, LogLevels, type When } from '../common';
import { LogLevels, type DeepPartial, type When } from '../common';
import { EventHandler, type EventHandlerLike } from '../events';
import { ClientUser } from '../structures';
import { Shard, type ShardManagerOptions, type WorkerData } from '../websocket';
@ -148,7 +148,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
return;
}
await shard.send(0, {
await shard.send({
...data,
} satisfies GatewaySendPayload);

View File

@ -5,7 +5,7 @@ import type WS from 'ws';
import { WebSocket, type CloseEvent, type ErrorEvent } from 'ws';
import type { Logger } from '../../common';
import { properties } from '../constants';
import { DynamicBucket, PriorityQueue } from '../structures';
import { DynamicBucket } from '../structures';
import { ConnectTimeout } from '../structures/timeout';
import { BaseSocket } from './basesocket';
import type { ShardData, ShardOptions } from './shared';
@ -33,7 +33,7 @@ export class Shard {
};
bucket: DynamicBucket;
offlineSendQueue = new PriorityQueue<(_?: unknown) => void>();
offlineSendQueue: ((_?: unknown) => void)[] = [];
constructor(
public id: number,
@ -47,12 +47,7 @@ export class Shard {
if (options.debugger) this.debugger = options.debugger;
const safe = this.calculateSafeRequests();
this.bucket = new DynamicBucket({
limit: safe,
refillAmount: safe,
refillInterval: 6e4,
debugger: this.debugger,
});
this.bucket = new DynamicBucket({ refillInterval: 6e4, limit: safe, debugger: options.debugger });
}
get latency() {
@ -102,7 +97,7 @@ export class Shard {
};
}
async send<T extends GatewaySendPayload = GatewaySendPayload>(priority: number, message: T) {
async send<T extends GatewaySendPayload = GatewaySendPayload>(message: T) {
this.debugger?.info(
`[Shard #${this.id}] Sending: ${GatewayOpcodes[message.op]} ${JSON.stringify(
message.d,
@ -117,14 +112,14 @@ export class Shard {
1,
)}`,
);
await this.checkOffline(priority);
await this.bucket.acquire(priority);
await this.checkOffline(priority);
await this.checkOffline();
await this.bucket.acquire();
await this.checkOffline();
this.websocket?.send(JSON.stringify(message));
}
async identify() {
await this.send(0, {
await this.send({
op: GatewayOpcodes.Identify,
d: {
token: `Bot ${this.options.token}`,
@ -142,7 +137,7 @@ export class Shard {
}
async resume() {
await this.send(0, {
await this.send({
op: GatewayOpcodes.Resume,
d: {
seq: this.data.resumeSeq!,
@ -234,13 +229,13 @@ export class Shard {
{
switch (packet.t) {
case GatewayDispatchEvents.Resumed:
this.offlineSendQueue.toArray().map((resolve: () => any) => resolve());
this.offlineSendQueue.map((resolve: () => any) => resolve());
this.options.handlePayload(this.id, packet);
break;
case GatewayDispatchEvents.Ready: {
this.data.resume_gateway_url = packet.d.resume_gateway_url;
this.data.session_id = packet.d.session_id;
this.offlineSendQueue.toArray().map((resolve: () => any) => resolve());
this.offlineSendQueue.map((resolve: () => any) => resolve());
this.options.handlePayload(this.id, packet);
break;
}
@ -311,9 +306,9 @@ export class Shard {
return this.onpacket(JSON.parse(data as string));
}
checkOffline(priority: number) {
checkOffline() {
if (!this.isOpen) {
return new Promise(resolve => this.offlineSendQueue.push(resolve, priority));
return new Promise(resolve => this.offlineSendQueue.push(resolve));
}
return Promise.resolve();
}

View File

@ -14,7 +14,7 @@ import {
type WatcherSendToShard,
} from '../../common';
import { ShardManagerDefaults } from '../constants';
import { SequentialBucket } from '../structures';
import { DynamicBucket } from '../structures';
import { ConnectQueue } from '../structures/timeout';
import { Shard } from './shard.js';
import type { ShardManagerOptions } from './shared';
@ -111,7 +111,7 @@ export class ShardManager extends Map<number, Shard> {
*/
spawnBuckets(): Shard[][] {
this.debugger?.info('#0 Preparing buckets');
const chunks = SequentialBucket.chunk(new Array(this.shardEnd - this.shardStart), this.concurrency);
const chunks = DynamicBucket.chunk(new Array(this.shardEnd - this.shardStart), this.concurrency);
chunks.forEach((arr: any[], index: number) => {
for (let i = 0; i < arr.length; i++) {
const id = i + (index > 0 ? index * this.concurrency : 0) + this.shardStart;
@ -148,7 +148,7 @@ export class ShardManager extends Map<number, Shard> {
});
}
setPresence(payload: GatewayUpdatePresence['d']): Promise<void> | undefined {
setPresence(payload: GatewayUpdatePresence['d']): Promise<void> {
return new Promise(resolve => {
this.forEach(shard => {
this.setShardPresence(shard.id, payload);
@ -198,6 +198,6 @@ export class ShardManager extends Map<number, Shard> {
payload,
} satisfies WatcherSendToShard);
}
this.get(shardId)?.send(1, payload);
this.get(shardId)?.send(payload);
}
}

View File

@ -5,9 +5,9 @@ import { Worker as ThreadWorker } from 'node:worker_threads';
import { ApiHandler, Logger, Router } from '../..';
import { MemoryAdapter, type Adapter } from '../../cache';
import { BaseClient, type InternalRuntimeConfig } from '../../client/base';
import { type MakePartial, MergeOptions } from '../../common';
import { MergeOptions, type MakePartial } from '../../common';
import { WorkerManagerDefaults } from '../constants';
import { SequentialBucket } from '../structures';
import { DynamicBucket } from '../structures';
import { ConnectQueue } from '../structures/timeout';
import { MemberUpdateHandler } from './events/memberUpdate';
import { PresenceUpdateHandler } from './events/presenceUpdate';
@ -100,7 +100,7 @@ export class WorkerManager extends Map<number, (ClusterWorker | ThreadWorker) &
prepareSpaces() {
this.debugger?.info('Preparing buckets');
const chunks = SequentialBucket.chunk<number>(
const chunks = DynamicBucket.chunk<number>(
new Array(this.shardEnd - this.shardStart),
this.options.shardsPerWorker,
);

View File

@ -1,10 +1,4 @@
import type { Logger } from '../../common';
import { delay } from '../../common';
/**
* just any kind of request to queue and resolve later
*/
export type QueuedRequest = (value: void | Promise<void>) => Promise<unknown> | any;
import { delay, type Logger } from '../../common';
/**
* options of the dynamic bucket
@ -12,323 +6,74 @@ export type QueuedRequest = (value: void | Promise<void>) => Promise<unknown> |
export interface DynamicBucketOptions {
limit: number;
refillInterval: number;
refillAmount: number;
debugger?: Logger;
}
/**
* generally useless for interaction based bots
* ideally this would only be triggered on certain paths
* example: a huge amount of messages being spammed
*
* a dynamic bucket is just a priority queue implemented using linked lists
* we create an empty bucket for every path
* dynamically allocating memory improves the final memory footprint
*/
export class DynamicBucket {
limit: number;
refillInterval: number;
refillAmount: number;
/** The queue of requests to acquire an available request. Mapped by <shardId, resolve()> */
queue = new PriorityQueue<QueuedRequest>();
/** The amount of requests that have been used up already. */
queue: ((value?: unknown) => any)[] = [];
used = 0;
/** Whether or not the queue is already processing. */
processing = false;
/** The timeout id for the timer to reduce the used amount by the refill amount. */
processing?: boolean;
refillsAt? = 0;
timeoutId?: NodeJS.Timeout;
/** The timestamp in milliseconds when the next refill is scheduled. */
refillsAt?: number;
debugger?: Logger;
constructor(options: DynamicBucketOptions) {
this.limit = options.limit;
this.refillInterval = options.refillInterval;
this.refillAmount = options.refillAmount;
if (options.debugger) {
this.debugger = options.debugger;
}
}
constructor(public options: DynamicBucketOptions) {}
get remaining(): number {
if (this.limit < this.used) {
if (this.options.limit < this.used) {
return 0;
}
return this.limit - this.used;
return this.options.limit - this.used;
}
refill(): void {
// Lower the used amount by the refill amount
this.used = this.refillAmount > this.used ? 0 : this.used - this.refillAmount;
// Reset the refillsAt timestamp since it just got refilled
refill() {
this.refillsAt = undefined;
if (this.timeoutId) {
clearTimeout(this.timeoutId);
this.timeoutId = undefined;
}
if (this.used > 0) {
if (this.timeoutId) {
clearTimeout(this.timeoutId);
}
this.used = 0;
this.timeoutId = setTimeout(() => {
this.refill();
}, this.refillInterval);
this.refillsAt = Date.now() + this.refillInterval;
}, this.options.refillInterval);
this.refillsAt = Date.now() + this.options.refillInterval;
}
}
/** Begin processing the queue. */
async processQueue(): Promise<void> {
// There is already a queue that is processing
if (this.processing) {
return;
}
// Begin going through the queue.
while (!this.queue.isEmpty()) {
async processQueue() {
if (this.processing) return;
this.processing = true;
while (this.queue.length) {
if (this.remaining) {
this.debugger?.debug(`Processing queue. Remaining: ${this.remaining} Length: ${this.queue.size()}`);
// Resolves the promise allowing the paused execution of this request to resolve and continue.
this.queue.peek()();
this.queue.pop();
// A request can be made
this.options.debugger?.debug(`Processing queue. Remaining: ${this.remaining} Length: ${this.queue.length}`);
this.queue.shift()!();
this.used++;
// Create a new timeout for this request if none exists.
if (!this.timeoutId) {
this.timeoutId = setTimeout(() => {
this.refill();
}, this.refillInterval);
// Set the time for when this refill will occur.
this.refillsAt = Date.now() + this.refillInterval;
}, this.options.refillInterval);
this.refillsAt = Date.now() + this.options.refillInterval;
}
// Check if a refill is scheduled, since we have used up all available requests
} else if (this.refillsAt) {
const now = Date.now();
// If there is time left until next refill, just delay execution.
if (this.refillsAt > now) {
this.options.debugger?.info(`Waiting ${this.refillsAt - now}ms to process queue`);
await delay(this.refillsAt - now);
this.used = 0;
}
}
}
// Loop has ended mark false so it can restart later when needed
this.processing = false;
}
/** Pauses the execution until the request is available to be made. */
async acquire(priority: number): Promise<void> {
return new Promise(resolve => {
this.queue.push(resolve, priority);
acquire() {
return new Promise(res => {
this.queue.push(res);
void this.processQueue();
});
}
toString() {
return [...this.queue].toString();
}
}
/**
* abstract node lol
*/
export interface AbstractNode<T> {
data: T;
next: this | null;
}
export interface QueuePusher<T> {
push(data: T): NonNullable<TNode<T>>;
}
export interface QueuePusherWithPriority<T> {
push(data: T, priority: number): NonNullable<PNode<T>>;
}
export class TNode<T> implements AbstractNode<T> {
data: T;
next: this | null;
constructor(data: T) {
this.data = data;
this.next = null;
}
static null<T>(list: AbstractNode<T> | null): list is null {
return !list;
}
}
export class PNode<T> extends TNode<T> {
priority: number;
constructor(data: T, priority: number) {
super(data);
this.priority = priority;
}
}
export abstract class Queue<T> {
protected abstract head: AbstractNode<T> | null;
/**
* O(1)
*/
public pop() {
if (TNode.null(this.head)) {
throw new Error('cannot pop a list without elements');
}
return (this.head = this.head.next);
}
/**
* O(1)
*/
public peek(): T {
if (TNode.null(this.head)) {
throw new Error('cannot peek an empty list');
}
return this.head.data;
}
/**
* O(n)
*/
public size(): number {
let aux = this.head;
if (TNode.null(aux)) {
return 0;
}
let count = 1;
while (aux.next !== null) {
count++;
aux = aux.next;
}
return count;
}
/**
* O(1)
*/
public isEmpty() {
return TNode.null(this.head);
}
*[Symbol.iterator](): IterableIterator<T> {
let temp = this.head;
while (temp !== null) {
yield temp.data;
temp = temp.next;
}
}
public toArray(): T[] {
return Array.from(this);
}
public toString() {
return this.head?.toString() || '';
}
}
export class LinkedList<T> extends Queue<T> implements QueuePusher<T> {
protected head: TNode<T> | null = null;
/**
* O(1)
*/
public push(data: T): NonNullable<TNode<T>> {
const temp = new TNode<T>(data);
temp.next = this.head;
this.head = temp;
return this.head;
}
}
export class PriorityQueue<T> extends Queue<T> implements QueuePusherWithPriority<T> {
protected head: PNode<T> | null = null;
/**
* O(#priorities)
*/
public push(data: T, priority: number): NonNullable<PNode<T>> {
let start = this.head;
const temp = new PNode(data, priority);
if (TNode.null(this.head) || TNode.null(start)) {
this.head = temp;
return this.head;
}
if (this.head.priority > priority) {
temp.next = this.head;
this.head = temp;
return this.head;
}
while (start.next !== null && start.next.priority < priority) {
start = start.next;
}
temp.next = start.next as PNode<T>;
start.next = temp;
return this.head;
}
}
export class SequentialBucket {
private connections: LinkedList<QueuedRequest>;
private capacity: number; // max_concurrency
private spawnTimeout: number;
constructor(maxCapacity: number) {
this.connections = new LinkedList();
this.capacity = maxCapacity;
this.spawnTimeout = 5000;
}
public async destroy() {
this.connections = new LinkedList();
}
public async push(promise: QueuedRequest) {
this.connections.push(promise);
if (this.capacity <= this.connections.size()) {
await this.acquire();
await delay(this.spawnTimeout);
}
return;
}
public async acquire(promises = this.connections) {
while (!promises.isEmpty()) {
const item = promises.peek();
item().catch((...args: any[]) => {
Promise.reject(...args);
});
promises.pop();
}
return Promise.resolve(true);
}
public static chunk<T>(array: T[], chunks: number): T[][] {
static chunk<T>(array: T[], chunks: number): T[][] {
let index = 0;
let resIndex = 0;
const result = Array(Math.ceil(array.length / chunks));

View File

@ -36,11 +36,11 @@ export class ConnectQueue {
public concurrency = 1,
) {}
push(callback: () => any) {
async push(callback: () => any) {
this.queue.push({ cb: callback });
if (this.queue.length === this.concurrency) {
for (let i = 0; i < this.concurrency; i++) {
this.queue[i].cb?.();
await this.queue[i].cb?.();
this.queue[i].cb = undefined;
}
this.interval = setInterval(() => {
@ -51,7 +51,7 @@ export class ConnectQueue {
}
}
shift(): any {
async shift(): Promise<any> {
const shift = this.queue.shift();
if (!shift) {
if (!this.queue.length) {
@ -61,7 +61,7 @@ export class ConnectQueue {
return;
}
if (!shift.cb) return this.shift();
shift.cb?.();
await shift.cb?.();
if (!this.queue.length) {
clearInterval(this.interval);
this.interval = undefined;