fix: types & workerClient#resumeShard

This commit is contained in:
MARCROCK22 2024-11-10 21:51:28 +00:00
parent 103dcc53f3
commit e4233e6a40
11 changed files with 124 additions and 92 deletions

View File

@ -1,5 +1,5 @@
import { type UUID, randomUUID } from 'node:crypto';
import { type Awaitable, Logger, delay, lazyLoadPackage, snowflakeToTimestamp } from '../common';
import { type Awaitable, BASE_HOST, Logger, delay, lazyLoadPackage, snowflakeToTimestamp } from '../common';
import type { WorkerData } from '../websocket';
import type { WorkerSendApiRequest } from '../websocket/discord/worker';
import { CDNRouter, Router } from './Router';
@ -39,16 +39,12 @@ export class ApiHandler {
constructor(options: ApiHandlerOptions) {
this.options = {
baseUrl: 'api/v10',
domain: 'https://discord.com',
domain: BASE_HOST,
type: 'Bot',
...options,
userAgent: DefaultUserAgent,
};
if (options.debug) {
this.debugger = new Logger({
name: '[API]',
});
}
if (options.debug) this.debug = true;
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
@ -61,6 +57,14 @@ export class ApiHandler {
}
}
set debug(active: boolean) {
this.debugger = active
? new Logger({
name: '[API]',
})
: undefined;
}
get proxy() {
return (this._proxy_ ??= new Router(this).createProxy());
}

View File

@ -59,8 +59,8 @@ import type { LocaleString, RESTPostAPIChannelMessageJSONBody } from '../types';
import type { MessageStructure } from './transformers';
export class BaseClient {
rest!: ApiHandler;
cache!: Cache;
rest = new ApiHandler({ token: 'INVALID' });
cache = new Cache(0, new MemoryAdapter());
applications = new ApplicationShorter(this);
users = new UsersShorter(this);
@ -194,6 +194,7 @@ export class BaseClient {
setServices({ rest, cache, langs, middlewares, handleCommand }: ServicesOptions) {
if (rest) {
rest.onRatelimit ??= this.rest.onRatelimit?.bind(rest);
this.rest = rest;
}
if (cache) {
@ -213,7 +214,7 @@ export class BaseClient {
'users',
'voiceStates',
];
let disabledCache: Partial<Record<keyof Cache['disabledCache'], boolean>> = this.cache?.disabledCache ?? {};
let disabledCache: Partial<Record<keyof Cache['disabledCache'], boolean>> = this.cache.disabledCache;
if (typeof cache.disabledCache === 'boolean') {
for (const i of caches) {
@ -227,12 +228,7 @@ export class BaseClient {
disabledCache = cache.disabledCache;
}
this.cache = new Cache(
this.cache?.intents ?? 0,
cache?.adapter ?? this.cache?.adapter ?? new MemoryAdapter(),
disabledCache,
this,
);
this.cache = new Cache(this.cache.intents, cache.adapter ?? this.cache.adapter, disabledCache, this);
}
if (middlewares) {
this.middlewares = middlewares;
@ -270,22 +266,12 @@ export class BaseClient {
const { token: tokenRC, debug } = await this.getRC();
const token = options?.token ?? tokenRC;
if (!this.rest) {
BaseClient.assertString(token, 'token is not a string');
this.rest = new ApiHandler({
token,
baseUrl: 'api/v10',
domain: 'https://discord.com',
debug,
});
}
if (this.cache) {
if (this.rest.options.token === 'INVALID') this.rest.options.token = token;
this.rest.debug = debug;
this.cache.__setClient(this);
} else {
this.cache = new Cache(0, new MemoryAdapter(), {}, this);
}
if (!this.handleCommand) this.handleCommand = new HandleCommand(this);
@ -310,7 +296,6 @@ export class BaseClient {
}
private syncCachePath(cachePath: string) {
this.logger.debug('Syncing commands cache');
return promises.writeFile(
cachePath,
JSON.stringify(

View File

@ -94,6 +94,7 @@ export class Client<Ready extends boolean = boolean> extends BaseClient {
const { token: tokenRC, intents: intentsRC, debug: debugRC } = await this.getRC<InternalRuntimeConfig>();
const token = options?.token ?? tokenRC;
const intents = options?.connection?.intents ?? intentsRC;
this.cache.intents = intents;
if (!this.gateway) {
BaseClient.assertString(token, 'token is not a string');
@ -123,8 +124,6 @@ export class Client<Ready extends boolean = boolean> extends BaseClient {
});
}
this.cache.intents = this.gateway.options.intents;
if (execute) {
await this.execute(options.connection);
} else {
@ -138,7 +137,6 @@ export class Client<Ready extends boolean = boolean> extends BaseClient {
this.collectors.run('RAW', packet, this),
]); //ignore promise
switch (packet.t) {
// Cases where we must obtain the old data before updating
case 'GUILD_MEMBER_UPDATE':
{
if (!this.memberUpdateHandler.check(packet.d)) {

View File

@ -1,7 +1,7 @@
import { type UUID, randomUUID } from 'node:crypto';
import { ApiHandler, Logger } from '..';
import { WorkerAdapter } from '../cache';
import { type DeepPartial, LogLevels, type When, hasIntent, lazyLoadPackage } from '../common';
import { type DeepPartial, LogLevels, type MakeRequired, type When, hasIntent, lazyLoadPackage } from '../common';
import { EventHandler } from '../events';
import type { GatewayDispatchPayload, GatewaySendPayload } from '../types';
import { Shard, type ShardManagerOptions, type WorkerData, properties } from '../websocket';
@ -23,13 +23,14 @@ import type {
WorkerStart,
WorkerStartResharding,
} from '../websocket/discord/worker';
import type { ManagerMessages } from '../websocket/discord/workermanager';
import type { ManagerMessages, ManagerSpawnShards } from '../websocket/discord/workermanager';
import type { BaseClientOptions, ServicesOptions, StartOptions } from './base';
import { BaseClient } from './base';
import type { Client, ClientOptions } from './client';
import { MemberUpdateHandler } from '../websocket/discord/events/memberUpdate';
import { PresenceUpdateHandler } from '../websocket/discord/events/presenceUpdate';
import type { ShardData } from '../websocket/discord/shared';
import { Collectors } from './collectors';
import { type ClientUserStructure, Transformers } from './transformers';
@ -40,7 +41,7 @@ try {
debug: process.env.SEYFERT_WORKER_DEBUG === 'true',
intents: Number(process.env.SEYFERT_WORKER_INTENTS),
path: process.env.SEYFERT_WORKER_PATH!,
shards: process.env.SEYFERT_WORKER_SHARDS!.split(',').map(id => Number(id)),
shards: JSON.parse(process.env.SEYFERT_WORKER_SHARDS!),
token: process.env.SEYFERT_WORKER_TOKEN!,
workerId: Number(process.env.SEYFERT_WORKER_WORKERID),
workerProxy: process.env.SEYFERT_WORKER_WORKERPROXY === 'true',
@ -48,6 +49,8 @@ try {
mode: process.env.SEYFERT_WORKER_MODE as 'custom' | 'threads' | 'clusters',
resharding: process.env.SEYFERT_WORKER_RESHARDING === 'true',
totalWorkers: Number(process.env.SEYFERT_WORKER_TOTALWORKERS),
info: JSON.parse(process.env.SEYFERT_WORKER_INFO!),
compress: process.env.SEYFERT_WORKER_COMPRESS === 'true',
} satisfies WorkerData;
} catch {
//
@ -155,6 +158,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
}),
});
}
this.cache.intents = workerData.intents;
this.postMessage({
type: workerData.resharding ? 'WORKER_START_RESHARDING' : 'WORKER_START',
workerId: workerData.workerId,
@ -164,7 +168,6 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
}
await super.start(options);
await this.loadEvents(options.eventsDir);
this.cache.intents = workerData.intents;
}
async loadEvents(dir?: string) {
@ -295,9 +298,6 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
break;
case 'SPAWN_SHARDS':
{
const onPacket = this.onPacket.bind(this);
const handlePayload = this.options?.handlePayload?.bind(this);
const self = this;
for (const id of workerData.shards) {
const existsShard = this.shards.has(id);
if (existsShard) {
@ -305,28 +305,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
continue;
}
const shard = new Shard(id, {
token: workerData.token,
intents: workerData.intents,
info: data.info,
compress: data.compress,
debugger: this.debugger,
properties: {
...properties,
...this.options.gateway?.properties,
},
async handlePayload(shardId, payload) {
await handlePayload?.(shardId, payload);
await onPacket(payload, shardId);
if (self.options.sendPayloadToParent)
self.postMessage({
workerId: workerData.workerId,
shardId,
type: 'RECEIVE_PAYLOAD',
payload,
} satisfies WorkerReceivePayload);
},
});
const shard = this.createShard(id, data);
this.shards.set(id, shard);
this.postMessage({
type: 'CONNECT_QUEUE',
@ -380,7 +359,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
let result: unknown;
try {
result = await eval(`
(${data.func})(this)
(${data.func})(this, ${data.vars})
`);
} catch (e) {
result = e;
@ -455,7 +434,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
});
}
tellWorker<R>(workerId: number, func: (_: this) => R) {
tellWorker<R, V extends Record<string, unknown>>(workerId: number, func: (_: this, vars: V) => R, vars: V) {
const nonce = this.generateNonce();
this.postMessage({
type: 'EVAL_TO_WORKER',
@ -463,25 +442,71 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
toWorkerId: workerId,
workerId: workerData.workerId,
nonce,
vars: JSON.stringify(vars),
} satisfies WorkerSendToWorkerEval);
return this.generateSendPromise<R>(nonce);
}
tellWorkers<R>(func: (_: this) => R) {
tellWorkers<R, V extends Record<string, unknown>>(func: (_: this, vars: V) => R, vars: V) {
const promises: Promise<R>[] = [];
for (let i = 0; i < workerData.totalWorkers; i++) {
promises.push(this.tellWorker(i, func));
promises.push(this.tellWorker(i, func, vars));
}
return Promise.all(promises);
}
createShard(id: number, data: Pick<ManagerSpawnShards, 'info' | 'compress'>) {
const onPacket = this.onPacket.bind(this);
const handlePayload = this.options?.handlePayload?.bind(this);
const self = this;
const shard = new Shard(id, {
token: workerData.token,
intents: workerData.intents,
info: data.info,
compress: data.compress,
debugger: this.debugger,
properties: {
...properties,
...this.options.gateway?.properties,
},
async handlePayload(shardId, payload) {
await handlePayload?.(shardId, payload);
await onPacket(payload, shardId);
if (self.options.sendPayloadToParent)
self.postMessage({
workerId: workerData.workerId,
shardId,
type: 'RECEIVE_PAYLOAD',
payload,
} satisfies WorkerReceivePayload);
},
});
return shard;
}
async resumeShard(shardId: number, shardData: MakeRequired<ShardData>) {
const exists = (await this.tellWorkers((r, vars) => r.shards.has(vars.shardId), { shardId })).some(x => x);
if (exists) throw new Error('Cannot override existing shard');
const shard = this.createShard(shardId, {
info: this.workerData.info,
compress: this.workerData.compress,
});
shard.data = shardData;
this.shards.set(shardId, shard);
return this.postMessage({
workerId: this.workerId,
shardId,
type: 'CONNECT_QUEUE',
});
}
protected async onPacket(packet: GatewayDispatchPayload, shardId: number) {
Promise.allSettled([
this.events?.runEvent('RAW', this, packet, shardId, false),
this.collectors.run('RAW', packet, this),
]); //ignore promise
switch (packet.t) {
//// Cases where we must obtain the old data before updating
case 'GUILD_MEMBER_UPDATE':
{
if (!this.memberUpdateHandler.check(packet.d)) {

View File

@ -466,16 +466,18 @@ export class CommandHandler extends BaseHandler {
stablishContextCommandDefaults(commandInstance: InstanceType<HandleableCommand>): ContextMenuCommand | false {
if (!(commandInstance instanceof ContextMenuCommand)) return false;
commandInstance.onAfterRun ??= this.client.options.commands?.defaults?.onAfterRun;
//@ts-expect-error magic.
if (this.client.options.commands?.defaults?.onBotPermissionsFail)
commandInstance.onBotPermissionsFail ??= this.client.options.commands?.defaults?.onBotPermissionsFail;
//@ts-expect-error magic.
commandInstance.onInternalError ??= this.client.options.commands?.defaults?.onInternalError;
//@ts-expect-error magic.
commandInstance.onMiddlewaresError ??= this.client.options.commands?.defaults?.onMiddlewaresError;
//@ts-expect-error magic.
commandInstance.onPermissionsFail ??= this.client.options.commands?.defaults?.onPermissionsFail;
//@ts-expect-error magic.
commandInstance.onRunError ??= this.client.options.commands?.defaults?.onRunError;
if (this.client.options.commands?.defaults?.onInternalError)
commandInstance.onInternalError ??= this.client.options.commands.defaults.onInternalError;
if (this.client.options.commands?.defaults?.onMiddlewaresError)
commandInstance.onMiddlewaresError ??= this.client.options.commands.defaults.onMiddlewaresError;
if (this.client.options.commands?.defaults?.onRunError)
commandInstance.onRunError ??= this.client.options.commands.defaults.onRunError;
return commandInstance;
}

View File

@ -9,6 +9,7 @@ import type {
ModalCreateBodyRequest,
UnionToTuple,
} from '../common';
import type { Interaction } from '../structures/Interaction';
import { MessageFlags } from '../types';
export interface ModalContext extends BaseContext, ExtendContext {}
@ -101,7 +102,7 @@ export class ModalContext<M extends keyof RegisteredMiddlewares = never> extends
return this.interaction.deleteResponse();
}
modal(body: ModalCreateBodyRequest) {
modal(body: ModalCreateBodyRequest): ReturnType<Interaction['modal']> {
//@ts-expect-error
return this.interaction.modal(body);
}

View File

@ -224,21 +224,21 @@ export class EventHandler extends BaseHandler {
async runCustom<T extends CustomEventsKeys>(name: T, ...args: ResolveEventRunParams<T>) {
const Event = this.values[name];
if (!Event) {
// @ts-expect-error working with non-existent types is hard
// @ts-expect-error
return this.client.collectors.run(name, args, this.client);
}
try {
if (Event.data.once && Event.fired) {
// @ts-expect-error working with non-existent types is hard
// @ts-expect-error
return this.client.collectors.run(name, args, this.client);
}
Event.fired = true;
this.logger.debug(`executed a custom event [${name}]`, Event.data.once ? 'once' : '');
await Promise.all([
// @ts-expect-error working with non-existent types is hard
// @ts-expect-error
Event.run(...args, this.client),
// @ts-expect-error working with non-existent types is hard
// @ts-expect-error
this.client.collectors.run(name, args, this.client),
]);
} catch (e) {

View File

@ -105,8 +105,8 @@ export class Shard {
this.debugger?.debug(`[Shard #${this.id}] Connecting to ${this.currentGatewayURL}`);
// @ts-expect-error @types/bun cause erros in compile
// biome-ignore lint/correctness/noUndeclaredVariables: /\ bun lol
// @ts-expect-error Use native websocket when using Bun
// biome-ignore lint/correctness/noUndeclaredVariables: /\
this.websocket = new BaseSocket(typeof Bun === 'undefined' ? 'ws' : 'bun', this.currentGatewayURL);
this.websocket.onmessage = ({ data }: { data: string | Buffer }) => {

View File

@ -140,6 +140,8 @@ export interface WorkerData {
workerId: number;
debug: boolean;
workerProxy: boolean;
info: APIGatewayBotInfo;
compress: boolean;
__USING_WATCHER__?: boolean;
resharding: boolean;
}

View File

@ -81,6 +81,7 @@ export type WorkerSendToWorkerEval = CreateWorkerMessage<
{
func: string;
nonce: string;
vars: string;
toWorkerId: number;
}
>;

View File

@ -4,7 +4,7 @@ import type { Worker as WorkerThreadsWorker } from 'node:worker_threads';
import { ApiHandler, Logger, type UsingClient, type WorkerClient } from '../..';
import { type Adapter, MemoryAdapter } from '../../cache';
import { BaseClient, type InternalRuntimeConfig } from '../../client/base';
import { type MakePartial, MergeOptions, lazyLoadPackage } from '../../common';
import { BASE_HOST, type MakePartial, MergeOptions, lazyLoadPackage } from '../../common';
import type { GatewayPresenceUpdateData, GatewaySendPayload, RESTGetAPIGatewayBotResult } from '../../types';
import { WorkerManagerDefaults, properties } from '../constants';
import { DynamicBucket } from '../structures';
@ -169,6 +169,11 @@ export class WorkerManager extends Map<
mode: this.options.mode,
resharding,
totalWorkers: shards.length,
info: {
...this.options.info,
shards: this.totalShards,
},
compress: this.options.compress,
});
this.set(i, worker);
});
@ -193,7 +198,8 @@ export class WorkerManager extends Map<
};
if (workerData.resharding) env.SEYFERT_WORKER_RESHARDING = 'true';
for (const i in workerData) {
env[`SEYFERT_WORKER_${i.toUpperCase()}`] = workerData[i as keyof WorkerData];
const data = workerData[i as keyof WorkerData];
env[`SEYFERT_WORKER_${i.toUpperCase()}`] = typeof data === 'object' && data ? JSON.stringify(data) : data;
}
switch (this.options.mode) {
case 'threads': {
@ -425,6 +431,7 @@ export class WorkerManager extends Map<
func: message.func,
type: 'EXECUTE_EVAL_TO_WORKER',
toWorkerId: message.toWorkerId,
vars: message.vars,
} satisfies ManagerExecuteEvalToWorker);
this.generateSendPromise(nonce, 'Eval timeout').then(val =>
this.postMessage(message.workerId, {
@ -503,20 +510,25 @@ export class WorkerManager extends Map<
return this.generateSendPromise<WorkerInfo>(nonce, 'Get worker info timeout');
}
tellWorker<R>(workerId: number, func: (_: WorkerClient & UsingClient) => R) {
tellWorker<R, V extends Record<string, unknown>>(
workerId: number,
func: (_: WorkerClient & UsingClient, vars: V) => R,
vars: V,
) {
const nonce = this.generateNonce();
this.postMessage(workerId, {
type: 'EXECUTE_EVAL',
func: func.toString(),
nonce,
vars: JSON.stringify(vars),
} satisfies ManagerExecuteEval);
return this.generateSendPromise<R>(nonce);
}
tellWorkers<R>(func: (_: WorkerClient & UsingClient) => R) {
tellWorkers<R, V extends Record<string, unknown>>(func: (_: WorkerClient & UsingClient, vars: V) => R, vars: V) {
const promises: Promise<R>[] = [];
for (const i of this.keys()) {
promises.push(this.tellWorker(i, func));
promises.push(this.tellWorker(i, func, vars));
}
return Promise.all(promises);
}
@ -530,7 +542,7 @@ export class WorkerManager extends Map<
this.rest ??= new ApiHandler({
token: this.options.token,
baseUrl: 'api/v10',
domain: 'https://discord.com',
domain: BASE_HOST,
debug: this.options.debug,
});
this.options.info ??= await this.rest.proxy.gateway.bot.get();
@ -562,7 +574,7 @@ export class WorkerManager extends Map<
},
this.debugger,
);
await this.prepareWorkers(spaces);
this.prepareWorkers(spaces);
// Start workers queue
this.workerQueue.shift()!();
await this.startResharding();
@ -645,6 +657,7 @@ export type ManagerExecuteEvalToWorker = CreateManagerMessage<
{
func: string;
nonce: string;
vars: string;
toWorkerId: number;
}
>;
@ -653,6 +666,7 @@ export type ManagerExecuteEval = CreateManagerMessage<
'EXECUTE_EVAL',
{
func: string;
vars: string;
nonce: string;
}
>;