fix workerManager

This commit is contained in:
MARCROCK22 2024-03-11 17:58:13 -04:00
parent 5c68c4a636
commit 4a95cc962f
4 changed files with 85 additions and 41 deletions

View File

@ -1,12 +1,13 @@
import { parentPort, workerData } from 'node:worker_threads'; import { parentPort, workerData } from 'node:worker_threads';
import type { Command, CommandContext, Message, SubCommand } from '..'; import type { Command, CommandContext, Message, SubCommand } from '..';
import type { import {
DeepPartial, GatewayIntentBits,
GatewayDispatchPayload, type DeepPartial,
GatewayPresenceUpdateData, type GatewayDispatchPayload,
If, type GatewayPresenceUpdateData,
WatcherPayload, type If,
WatcherSendToShard, type WatcherPayload,
type WatcherSendToShard,
} from '../common'; } from '../common';
import { EventHandler } from '../events'; import { EventHandler } from '../events';
import { ClientUser } from '../structures'; import { ClientUser } from '../structures';
@ -145,15 +146,18 @@ export class Client<Ready extends boolean = boolean> extends BaseClient {
this.botId = packet.d.user.id; this.botId = packet.d.user.id;
this.applicationId = packet.d.application.id; this.applicationId = packet.d.application.id;
this.me = new ClientUser(this, packet.d.user, packet.d.application) as never; this.me = new ClientUser(this, packet.d.user, packet.d.application) as never;
if (!this.__handleGuilds?.size) { if (
!this.__handleGuilds?.size ||
!((this.gateway.options.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds)
) {
if ( if (
[...this.gateway.values()].every(shard => shard.data.session_id) && [...this.gateway.values()].every(shard => shard.data.session_id) &&
this.events.values.BOT_READY && this.events.values.BOT_READY &&
(this.events.values.BOT_READY.fired ? !this.events.values.BOT_READY.data.once : true) (this.events.values.BOT_READY.fired ? !this.events.values.BOT_READY.data.once : true)
) { ) {
await this.events.runEvent('BOT_READY', this, this.me, -1); await this.events.runEvent('BOT_READY', this, this.me, -1);
delete this.__handleGuilds;
} }
delete this.__handleGuilds;
} }
this.debugger?.debug(`#${shardId}[${packet.d.user.username}](${this.botId}) is online...`); this.debugger?.debug(`#${shardId}[${packet.d.user.username}](${this.botId}) is online...`);
break; break;
@ -168,6 +172,7 @@ export class Client<Ready extends boolean = boolean> extends BaseClient {
) { ) {
await this.events.runEvent('BOT_READY', this, this.me, -1); await this.events.runEvent('BOT_READY', this, this.me, -1);
} }
if (!this.__handleGuilds.size) delete this.__handleGuilds;
return; return;
} }
break; break;

View File

@ -2,7 +2,7 @@ import { workerData as __workerData__, parentPort as manager } from 'node:worker
import type { Cache } from '../cache'; import type { Cache } from '../cache';
import { WorkerAdapter } from '../cache'; import { WorkerAdapter } from '../cache';
import type { GatewayDispatchPayload, GatewaySendPayload, When } from '../common'; import type { GatewayDispatchPayload, GatewaySendPayload, When } from '../common';
import { LogLevels, Logger, type DeepPartial } from '../common'; import { GatewayIntentBits, LogLevels, Logger, type DeepPartial } from '../common';
import { EventHandler } from '../events'; import { EventHandler } from '../events';
import { ClientUser } from '../structures'; import { ClientUser } from '../structures';
import { Shard, type ShardManagerOptions, type WorkerData } from '../websocket'; import { Shard, type ShardManagerOptions, type WorkerData } from '../websocket';
@ -60,6 +60,14 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
return workerData.workerId; return workerData.workerId;
} }
get latency() {
let acc = 0;
this.shards.forEach(s => (acc += s.latency));
return acc / this.shards.size;
}
async start(options: Omit<DeepPartial<StartOptions>, 'httpConnection' | 'token' | 'connection'> = {}) { async start(options: Omit<DeepPartial<StartOptions>, 'httpConnection' | 'token' | 'connection'> = {}) {
await super.start(options); await super.start(options);
await this.loadEvents(options.eventsDir); await this.loadEvents(options.eventsDir);
@ -208,7 +216,10 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
this.botId = packet.d.user.id; this.botId = packet.d.user.id;
this.applicationId = packet.d.application.id; this.applicationId = packet.d.application.id;
this.me = new ClientUser(this, packet.d.user, packet.d.application) as never; this.me = new ClientUser(this, packet.d.user, packet.d.application) as never;
if (!this.__handleGuilds?.size) { if (
!this.__handleGuilds?.size ||
!((workerData.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds)
) {
if ( if (
[...this.shards.values()].every(shard => shard.data.session_id) && [...this.shards.values()].every(shard => shard.data.session_id) &&
this.events.values.WORKER_READY && this.events.values.WORKER_READY &&
@ -219,8 +230,8 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
workerId: this.workerId, workerId: this.workerId,
} as WorkerReady); } as WorkerReady);
await this.events.runEvent('WORKER_READY', this, this.me, -1); await this.events.runEvent('WORKER_READY', this, this.me, -1);
delete this.__handleGuilds;
} }
delete this.__handleGuilds;
} }
this.debugger?.debug(`#${shardId} [${packet.d.user.username}](${this.botId}) is online...`); this.debugger?.debug(`#${shardId} [${packet.d.user.username}](${this.botId}) is online...`);
break; break;
@ -245,6 +256,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
} as WorkerReady); } as WorkerReady);
await this.events.runEvent('WORKER_READY', this, this.me, -1); await this.events.runEvent('WORKER_READY', this, this.me, -1);
} }
if (!this.__handleGuilds.size) delete this.__handleGuilds;
return; return;
} }
} }

View File

@ -14,7 +14,7 @@ export type ToClass<T, This> = new (
export type StringToNumber<T extends string> = T extends `${infer N extends number}` ? N : never; export type StringToNumber<T extends string> = T extends `${infer N extends number}` ? N : never;
export type MakePartial<T, K extends keyof T> = T & { [P in K]?: T[P] }; export type MakePartial<T, K extends keyof T> = Omit<T, K> & { [P in K]?: T[P] };
export type DeepPartial<T> = { export type DeepPartial<T> = {
[K in keyof T]?: T[K] extends Record<any, any> [K in keyof T]?: T[K] extends Record<any, any>

View File

@ -1,7 +1,15 @@
import { randomUUID } from 'node:crypto'; import { randomUUID } from 'node:crypto';
import { Worker } from 'node:worker_threads'; import { Worker } from 'node:worker_threads';
import { ApiHandler, Router } from '../..';
import { MemoryAdapter, type Adapter } from '../../cache'; import { MemoryAdapter, type Adapter } from '../../cache';
import { Logger, MergeOptions, type GatewayPresenceUpdateData, type GatewaySendPayload } from '../../common'; import { BaseClient, type InternalRuntimeConfig } from '../../client/base';
import {
Logger,
type MakePartial,
MergeOptions,
type GatewayPresenceUpdateData,
type GatewaySendPayload,
} from '../../common';
import { WorkerManagerDefaults } from '../constants'; import { WorkerManagerDefaults } from '../constants';
import { SequentialBucket } from '../structures'; import { SequentialBucket } from '../structures';
import { ConnectQueue } from '../structures/timeout'; import { ConnectQueue } from '../structures/timeout';
@ -9,39 +17,18 @@ import { MemberUpdateHandler } from './events/memberUpdate';
import { PresenceUpdateHandler } from './events/presenceUpdate'; import { PresenceUpdateHandler } from './events/presenceUpdate';
import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared'; import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared';
import type { WorkerInfo, WorkerMessage, WorkerShardInfo } from './worker'; import type { WorkerInfo, WorkerMessage, WorkerShardInfo } from './worker';
export class WorkerManager extends Map<number, Worker & { ready?: boolean }> {
export class WorkerManager extends Map<number, Worker> { options!: Required<WorkerManagerOptions>;
options: Required<WorkerManagerOptions>;
debugger?: Logger; debugger?: Logger;
connectQueue: ConnectQueue; connectQueue!: ConnectQueue;
cacheAdapter: Adapter; cacheAdapter: Adapter;
promises = new Map<string, { resolve: (value: any) => void; timeout: NodeJS.Timeout }>(); promises = new Map<string, { resolve: (value: any) => void; timeout: NodeJS.Timeout }>();
memberUpdateHandler = new MemberUpdateHandler(); memberUpdateHandler = new MemberUpdateHandler();
presenceUpdateHandler = new PresenceUpdateHandler(); presenceUpdateHandler = new PresenceUpdateHandler();
constructor(options: WorkerManagerOptions) { rest!: ApiHandler;
constructor(options: MakePartial<WorkerManagerOptions, 'token' | 'intents' | 'info'>) {
super(); super();
options.totalShards ??= options.info.shards;
this.options = MergeOptions<Required<WorkerManagerOptions>>(WorkerManagerDefaults, options); this.options = MergeOptions<Required<WorkerManagerOptions>>(WorkerManagerDefaults, options);
this.options.workers ??= Math.ceil(this.options.totalShards / this.options.shardsPerWorker);
this.options.info.shards = options.totalShards;
options.shardEnd ??= options.totalShards;
options.shardStart ??= 0;
this.connectQueue = new ConnectQueue(5.5e3, this.concurrency);
if (this.options.debug) {
this.debugger = new Logger({
name: '[WorkerManager]',
});
}
if (this.totalShards / this.shardsPerWorker > this.workers) {
throw new Error(
`Cannot create enough shards in the specified workers, minimum: ${Math.ceil(
this.totalShards / this.shardsPerWorker,
)}`,
);
}
this.cacheAdapter = new MemoryAdapter(); this.cacheAdapter = new MemoryAdapter();
} }
@ -49,6 +36,10 @@ export class WorkerManager extends Map<number, Worker> {
this.cacheAdapter = adapter; this.cacheAdapter = adapter;
} }
setRest(rest: ApiHandler) {
this.rest = rest;
}
get remaining() { get remaining() {
return this.options.info.session_start_limit.remaining; return this.options.info.session_start_limit.remaining;
} }
@ -244,10 +235,14 @@ export class WorkerManager extends Map<number, Worker> {
break; break;
case 'WORKER_READY': case 'WORKER_READY':
{ {
if (message.workerId === [...this.keys()].at(-1)) { this.get(message.workerId)!.ready = true;
if ([...this.values()].every(w => w.ready)) {
this.get(this.keys().next().value)?.postMessage({ this.get(this.keys().next().value)?.postMessage({
type: 'BOT_READY', type: 'BOT_READY',
} satisfies ManagerSendBotReady); } satisfies ManagerSendBotReady);
this.forEach(w => {
delete w.ready;
});
} }
} }
break; break;
@ -330,6 +325,38 @@ export class WorkerManager extends Map<number, Worker> {
} }
async start() { async start() {
const rc = await BaseClient.prototype.getRC<InternalRuntimeConfig>();
this.options.debug ||= rc.debug;
this.options.intents ||= rc.intents ?? 0;
this.options.token ??= rc.token;
this.rest ??= new ApiHandler({
token: this.options.token,
baseUrl: 'api/v10',
domain: 'https://discord.com',
}); //TODO: share ratelimits with all workers
this.options.info ??= await new Router(this.rest).createProxy().gateway.bot.get();
this.options.totalShards ??= this.options.info.shards;
this.options = MergeOptions<Required<WorkerManagerOptions>>(WorkerManagerDefaults, this.options);
this.options.workers ??= Math.ceil(this.options.totalShards / this.options.shardsPerWorker);
this.options.info.shards = this.options.totalShards;
this.options.shardEnd ??= this.options.totalShards;
this.options.shardStart ??= 0;
this.connectQueue = new ConnectQueue(5.5e3, this.concurrency);
if (this.options.debug) {
this.debugger = new Logger({
name: '[WorkerManager]',
});
}
if (this.totalShards / this.shardsPerWorker > this.workers) {
throw new Error(
`Cannot create enough shards in the specified workers, minimum: ${Math.ceil(
this.totalShards / this.shardsPerWorker,
)}`,
);
}
const spaces = this.prepareSpaces(); const spaces = this.prepareSpaces();
await this.prepareWorkers(spaces); await this.prepareWorkers(spaces);
} }