fix: cache flow

This commit is contained in:
MARCROCK22 2024-06-01 20:14:40 +00:00
parent bf3db57c71
commit 0da67319f3
4 changed files with 1315 additions and 1359 deletions

1150
src/cache/index.ts vendored

File diff suppressed because it is too large Load Diff

View File

@ -1,237 +1,217 @@
import { GatewayIntentBits, type GatewayDispatchPayload, type GatewayPresenceUpdateData } from 'discord-api-types/v10'; import { GatewayIntentBits, type GatewayDispatchPayload, type GatewayPresenceUpdateData } from 'discord-api-types/v10';
import type { Command, CommandContext, Message, SubCommand } from '..'; import type { Command, CommandContext, Message, SubCommand } from '..';
import { lazyLoadPackage, type DeepPartial, type If, type WatcherPayload, type WatcherSendToShard } from '../common'; import { lazyLoadPackage, type DeepPartial, type If, type WatcherPayload, type WatcherSendToShard } from '../common';
import { EventHandler } from '../events'; import { EventHandler } from '../events';
import { ClientUser } from '../structures'; import { ClientUser } from '../structures';
import { ShardManager, properties, type ShardManagerOptions } from '../websocket'; import { ShardManager, properties, type ShardManagerOptions } from '../websocket';
import { MemberUpdateHandler } from '../websocket/discord/events/memberUpdate'; import { MemberUpdateHandler } from '../websocket/discord/events/memberUpdate';
import { PresenceUpdateHandler } from '../websocket/discord/events/presenceUpdate'; import { PresenceUpdateHandler } from '../websocket/discord/events/presenceUpdate';
import type { BaseClientOptions, InternalRuntimeConfig, ServicesOptions, StartOptions } from './base'; import type { BaseClientOptions, InternalRuntimeConfig, ServicesOptions, StartOptions } from './base';
import { BaseClient } from './base'; import { BaseClient } from './base';
import { onInteractionCreate } from './oninteractioncreate'; import { onInteractionCreate } from './oninteractioncreate';
import { onMessageCreate } from './onmessagecreate'; import { onMessageCreate } from './onmessagecreate';
import { Collectors } from './collectors'; import { Collectors } from './collectors';
let parentPort: import('node:worker_threads').MessagePort; let parentPort: import('node:worker_threads').MessagePort;
export class Client<Ready extends boolean = boolean> extends BaseClient { export class Client<Ready extends boolean = boolean> extends BaseClient {
private __handleGuilds?: Set<string> = new Set(); private __handleGuilds?: Set<string> = new Set();
gateway!: ShardManager; gateway!: ShardManager;
me!: If<Ready, ClientUser>; me!: If<Ready, ClientUser>;
declare options: ClientOptions; declare options: ClientOptions;
memberUpdateHandler = new MemberUpdateHandler(); memberUpdateHandler = new MemberUpdateHandler();
presenceUpdateHandler = new PresenceUpdateHandler(); presenceUpdateHandler = new PresenceUpdateHandler();
collectors = new Collectors(); collectors = new Collectors();
events? = new EventHandler(this.logger, this.collectors); events? = new EventHandler(this);
constructor(options?: ClientOptions) { constructor(options?: ClientOptions) {
super(options); super(options);
} }
setServices({ setServices({
gateway, gateway,
...rest ...rest
}: ServicesOptions & { }: ServicesOptions & {
gateway?: ShardManager; gateway?: ShardManager;
handlers?: ServicesOptions['handlers'] & { handlers?: ServicesOptions['handlers'] & {
events?: EventHandler['callback']; events?: EventHandler['callback'];
}; };
}) { }) {
super.setServices(rest); super.setServices(rest);
if (gateway) { if (gateway) {
const onPacket = this.onPacket.bind(this); const onPacket = this.onPacket.bind(this);
const oldFn = gateway.options.handlePayload; const oldFn = gateway.options.handlePayload;
gateway.options.handlePayload = async (shardId, packet) => { gateway.options.handlePayload = async (shardId, packet) => {
await onPacket(shardId, packet); await onPacket(shardId, packet);
return oldFn(shardId, packet); return oldFn(shardId, packet);
}; };
this.gateway = gateway; this.gateway = gateway;
} }
if (rest.handlers && 'events' in rest.handlers) { if (rest.handlers && 'events' in rest.handlers) {
if (!rest.handlers.events) { if (!rest.handlers.events) {
this.events = undefined; this.events = undefined;
} else if (typeof rest.handlers.events === 'function') { } else if (typeof rest.handlers.events === 'function') {
this.events = new EventHandler(this.logger, this.collectors); this.events = new EventHandler(this);
this.events.setHandlers({ this.events.setHandlers({
callback: rest.handlers.events, callback: rest.handlers.events,
}); });
} else { } else {
this.events = rest.handlers.events; this.events = rest.handlers.events;
} }
} }
} }
async loadEvents(dir?: string) { async loadEvents(dir?: string) {
dir ??= await this.getRC().then(x => x.events); dir ??= await this.getRC().then(x => x.events);
if (dir && this.events) { if (dir && this.events) {
await this.events.load(dir); await this.events.load(dir);
this.logger.info('EventHandler loaded'); this.logger.info('EventHandler loaded');
} }
} }
protected async execute(options: { token?: string; intents?: number } = {}) { protected async execute(options: { token?: string; intents?: number } = {}) {
await super.execute(options); await super.execute(options);
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads'); const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
if (worker_threads?.parentPort) { if (worker_threads?.parentPort) {
parentPort = worker_threads.parentPort; parentPort = worker_threads.parentPort;
} }
if (worker_threads?.workerData?.__USING_WATCHER__) { if (worker_threads?.workerData?.__USING_WATCHER__) {
parentPort?.on('message', (data: WatcherPayload | WatcherSendToShard) => { parentPort?.on('message', (data: WatcherPayload | WatcherSendToShard) => {
switch (data.type) { switch (data.type) {
case 'PAYLOAD': case 'PAYLOAD':
this.gateway.options.handlePayload(data.shardId, data.payload); this.gateway.options.handlePayload(data.shardId, data.payload);
break; break;
case 'SEND_TO_SHARD': case 'SEND_TO_SHARD':
this.gateway.send(data.shardId, data.payload); this.gateway.send(data.shardId, data.payload);
break; break;
} }
}); });
} else { } else {
await this.gateway.spawnShards(); await this.gateway.spawnShards();
} }
} }
async start(options: Omit<DeepPartial<StartOptions>, 'httpConnection'> = {}, execute = true) { async start(options: Omit<DeepPartial<StartOptions>, 'httpConnection'> = {}, execute = true) {
await super.start(options); await super.start(options);
await this.loadEvents(options.eventsDir); await this.loadEvents(options.eventsDir);
const { token: tokenRC, intents: intentsRC, debug: debugRC } = await this.getRC<InternalRuntimeConfig>(); const { token: tokenRC, intents: intentsRC, debug: debugRC } = await this.getRC<InternalRuntimeConfig>();
const token = options?.token ?? tokenRC; const token = options?.token ?? tokenRC;
const intents = options?.connection?.intents ?? intentsRC; const intents = options?.connection?.intents ?? intentsRC;
if (!this.gateway) { if (!this.gateway) {
BaseClient.assertString(token, 'token is not a string'); BaseClient.assertString(token, 'token is not a string');
this.gateway = new ShardManager({ this.gateway = new ShardManager({
token, token,
info: await this.proxy.gateway.bot.get(), info: await this.proxy.gateway.bot.get(),
intents, intents,
handlePayload: async (shardId, packet) => { handlePayload: async (shardId, packet) => {
await this.options?.handlePayload?.(shardId, packet); await this.options?.handlePayload?.(shardId, packet);
return this.onPacket(shardId, packet); return this.onPacket(shardId, packet);
}, },
presence: this.options?.presence, presence: this.options?.presence,
debug: debugRC, debug: debugRC,
shardStart: this.options?.shards?.start, shardStart: this.options?.shards?.start,
shardEnd: this.options?.shards?.end ?? this.options?.shards?.total, shardEnd: this.options?.shards?.end ?? this.options?.shards?.total,
totalShards: this.options?.shards?.total ?? this.options?.shards?.end, totalShards: this.options?.shards?.total ?? this.options?.shards?.end,
properties: { ...this.options?.gateway?.properties, ...properties }, properties: { ...this.options?.gateway?.properties, ...properties },
compress: this.options?.gateway?.compress, compress: this.options?.gateway?.compress,
}); });
} }
this.cache.intents = this.gateway.options.intents; this.cache.intents = this.gateway.options.intents;
if (execute) { if (execute) {
await this.execute(options.connection); await this.execute(options.connection);
} else { } else {
await super.execute(options); await super.execute(options);
} }
} }
protected async onPacket(shardId: number, packet: GatewayDispatchPayload) { protected async onPacket(shardId: number, packet: GatewayDispatchPayload) {
await this.events?.runEvent('RAW', this, packet, shardId); // await this.events?.runEvent('RAW', this, packet, shardId);
switch (packet.t) { switch (packet.t) {
//// Cases where we must obtain the old data before updating //// Cases where we must obtain the old data before updating
case 'GUILD_MEMBER_UPDATE': case 'GUILD_MEMBER_UPDATE':
if (!this.memberUpdateHandler.check(packet.d)) { if (!this.memberUpdateHandler.check(packet.d)) {
return; return;
} }
await this.events?.execute(packet.t, packet, this as Client<true>, shardId); await this.events?.execute(packet.t, packet, this as Client<true>, shardId);
await this.cache.onPacket(packet); break;
break; case 'PRESENCE_UPDATE':
case 'PRESENCE_UPDATE': if (!this.presenceUpdateHandler.check(packet.d as any)) {
if (!this.presenceUpdateHandler.check(packet.d as any)) { return;
return; }
} await this.events?.execute(packet.t, packet, this as Client<true>, shardId);
await this.events?.execute(packet.t, packet, this as Client<true>, shardId); break;
await this.cache.onPacket(packet);
break; //rest of the events
default: {
case 'MESSAGE_UPDATE': await this.events?.execute(packet.t, packet, this as Client<true>, shardId);
case 'MESSAGE_DELETE_BULK': switch (packet.t) {
case 'MESSAGE_DELETE': case 'INTERACTION_CREATE':
case 'GUILD_DELETE': await onInteractionCreate(this, packet.d, shardId);
case 'CHANNEL_UPDATE': break;
case 'GUILD_EMOJIS_UPDATE': case 'MESSAGE_CREATE':
case 'GUILD_UPDATE': await onMessageCreate(this, packet.d, shardId);
case 'GUILD_ROLE_UPDATE': break;
case 'GUILD_ROLE_DELETE': case 'READY':
case 'THREAD_UPDATE': for (const g of packet.d.guilds) {
case 'USER_UPDATE': this.__handleGuilds?.add(g.id);
case 'VOICE_STATE_UPDATE': }
case 'STAGE_INSTANCE_UPDATE': this.botId = packet.d.user.id;
case 'GUILD_STICKERS_UPDATE': this.applicationId = packet.d.application.id;
await this.events?.execute(packet.t, packet, this as Client<true>, shardId); this.me = new ClientUser(this, packet.d.user, packet.d.application) as never;
await this.cache.onPacket(packet); if (
break; !(
//rest of the events this.__handleGuilds?.size &&
default: { (this.gateway.options.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds
await this.cache.onPacket(packet); )
await this.events?.execute(packet.t, packet, this as Client<true>, shardId); ) {
switch (packet.t) { if ([...this.gateway.values()].every(shard => shard.data.session_id)) {
case 'INTERACTION_CREATE': await this.events?.runEvent('BOT_READY', this, this.me, -1);
await onInteractionCreate(this, packet.d, shardId); }
break; delete this.__handleGuilds;
case 'MESSAGE_CREATE': }
await onMessageCreate(this, packet.d, shardId); this.debugger?.debug(`#${shardId}[${packet.d.user.username}](${this.botId}) is online...`);
break; break;
case 'READY': case 'GUILD_CREATE': {
for (const g of packet.d.guilds) { if (this.__handleGuilds?.has(packet.d.id)) {
this.__handleGuilds?.add(g.id); this.__handleGuilds.delete(packet.d.id);
} if (!this.__handleGuilds.size && [...this.gateway.values()].every(shard => shard.data.session_id)) {
this.botId = packet.d.user.id; await this.events?.runEvent('BOT_READY', this, this.me, -1);
this.applicationId = packet.d.application.id; }
this.me = new ClientUser(this, packet.d.user, packet.d.application) as never; if (!this.__handleGuilds.size) delete this.__handleGuilds;
if ( return;
!( }
this.__handleGuilds?.size && break;
(this.gateway.options.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds }
) }
) { break;
if ([...this.gateway.values()].every(shard => shard.data.session_id)) { }
await this.events?.runEvent('BOT_READY', this, this.me, -1); }
} }
delete this.__handleGuilds; }
}
this.debugger?.debug(`#${shardId}[${packet.d.user.username}](${this.botId}) is online...`); export interface ClientOptions extends BaseClientOptions {
break; presence?: (shardId: number) => GatewayPresenceUpdateData;
case 'GUILD_CREATE': { shards?: {
if (this.__handleGuilds?.has(packet.d.id)) { start: number;
this.__handleGuilds.delete(packet.d.id); end: number;
if (!this.__handleGuilds.size && [...this.gateway.values()].every(shard => shard.data.session_id)) { total?: number;
await this.events?.runEvent('BOT_READY', this, this.me, -1); };
} gateway?: {
if (!this.__handleGuilds.size) delete this.__handleGuilds; properties?: Partial<ShardManagerOptions['properties']>;
return; compress?: ShardManagerOptions['compress'];
} };
break; commands?: BaseClientOptions['commands'] & {
} prefix?: (message: Message) => Promise<string[]> | string[];
} deferReplyResponse?: (ctx: CommandContext) => Parameters<Message['write']>[0];
break; reply?: (ctx: CommandContext) => boolean;
} argsParser?: (content: string, command: SubCommand | Command, message: Message) => Record<string, string>;
} };
} handlePayload?: ShardManagerOptions['handlePayload'];
} }
export interface ClientOptions extends BaseClientOptions {
presence?: (shardId: number) => GatewayPresenceUpdateData;
shards?: {
start: number;
end: number;
total?: number;
};
gateway?: {
properties?: Partial<ShardManagerOptions['properties']>;
compress?: ShardManagerOptions['compress'];
};
commands?: BaseClientOptions['commands'] & {
prefix?: (message: Message) => Promise<string[]> | string[];
deferReplyResponse?: (ctx: CommandContext) => Parameters<Message['write']>[0];
reply?: (ctx: CommandContext) => boolean;
argsParser?: (content: string, command: SubCommand | Command, message: Message) => Record<string, string>;
};
handlePayload?: ShardManagerOptions['handlePayload'];
}

View File

@ -1,418 +1,387 @@
import { GatewayIntentBits, type GatewayDispatchPayload, type GatewaySendPayload } from 'discord-api-types/v10'; import { GatewayIntentBits, type GatewayDispatchPayload, type GatewaySendPayload } from 'discord-api-types/v10';
import { randomUUID } from 'node:crypto'; import { randomUUID } from 'node:crypto';
import { ApiHandler, Logger } from '..'; import { ApiHandler, Logger } from '..';
import type { Cache } from '../cache'; import type { Cache } from '../cache';
import { WorkerAdapter } from '../cache'; import { WorkerAdapter } from '../cache';
import { LogLevels, lazyLoadPackage, type DeepPartial, type When } from '../common'; import { LogLevels, lazyLoadPackage, type DeepPartial, type When } from '../common';
import { EventHandler } from '../events'; import { EventHandler } from '../events';
import { ClientUser } from '../structures'; import { ClientUser } from '../structures';
import { Shard, type ShardManagerOptions, type WorkerData } from '../websocket'; import { Shard, type ShardManagerOptions, type WorkerData } from '../websocket';
import type { import type {
WorkerReady, WorkerReady,
WorkerReceivePayload, WorkerReceivePayload,
WorkerRequestConnect, WorkerRequestConnect,
WorkerSendEval, WorkerSendEval,
WorkerSendEvalResponse, WorkerSendEvalResponse,
WorkerSendInfo, WorkerSendInfo,
WorkerSendResultPayload, WorkerSendResultPayload,
WorkerSendShardInfo, WorkerSendShardInfo,
WorkerShardInfo, WorkerShardInfo,
WorkerStart, WorkerStart,
} from '../websocket/discord/worker'; } from '../websocket/discord/worker';
import type { ManagerMessages } from '../websocket/discord/workermanager'; import type { ManagerMessages } from '../websocket/discord/workermanager';
import type { BaseClientOptions, ServicesOptions, StartOptions } from './base'; import type { BaseClientOptions, ServicesOptions, StartOptions } from './base';
import { BaseClient } from './base'; import { BaseClient } from './base';
import type { Client } from './client'; import type { Client } from './client';
import { onInteractionCreate } from './oninteractioncreate'; import { onInteractionCreate } from './oninteractioncreate';
import { onMessageCreate } from './onmessagecreate'; import { onMessageCreate } from './onmessagecreate';
import { Collectors } from './collectors'; import { Collectors } from './collectors';
let workerData: WorkerData; let workerData: WorkerData;
let manager: import('node:worker_threads').MessagePort; let manager: import('node:worker_threads').MessagePort;
try { try {
workerData = { workerData = {
debug: process.env.SEYFERT_WORKER_DEBUG === 'true', debug: process.env.SEYFERT_WORKER_DEBUG === 'true',
intents: Number.parseInt(process.env.SEYFERT_WORKER_INTENTS!), intents: Number.parseInt(process.env.SEYFERT_WORKER_INTENTS!),
path: process.env.SEYFERT_WORKER_PATH!, path: process.env.SEYFERT_WORKER_PATH!,
shards: process.env.SEYFERT_WORKER_SHARDS!.split(',').map(id => Number.parseInt(id)), shards: process.env.SEYFERT_WORKER_SHARDS!.split(',').map(id => Number.parseInt(id)),
token: process.env.SEYFERT_WORKER_TOKEN!, token: process.env.SEYFERT_WORKER_TOKEN!,
workerId: Number.parseInt(process.env.SEYFERT_WORKER_WORKERID!), workerId: Number.parseInt(process.env.SEYFERT_WORKER_WORKERID!),
workerProxy: process.env.SEYFERT_WORKER_WORKERPROXY === 'true', workerProxy: process.env.SEYFERT_WORKER_WORKERPROXY === 'true',
} as WorkerData; } as WorkerData;
} catch {} } catch {}
export class WorkerClient<Ready extends boolean = boolean> extends BaseClient { export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
private __handleGuilds?: Set<string> = new Set(); private __handleGuilds?: Set<string> = new Set();
logger = new Logger({ logger = new Logger({
name: `[Worker #${workerData.workerId}]`, name: `[Worker #${workerData.workerId}]`,
}); });
collectors = new Collectors(); collectors = new Collectors();
events? = new EventHandler(this.logger, this.collectors); events? = new EventHandler(this);
me!: When<Ready, ClientUser>; me!: When<Ready, ClientUser>;
promises = new Map<string, { resolve: (value: any) => void; timeout: NodeJS.Timeout }>(); promises = new Map<string, { resolve: (value: any) => void; timeout: NodeJS.Timeout }>();
shards = new Map<number, Shard>(); shards = new Map<number, Shard>();
declare options: WorkerClientOptions; declare options: WorkerClientOptions;
constructor(options?: WorkerClientOptions) { constructor(options?: WorkerClientOptions) {
super(options); super(options);
if (!process.env.SEYFERT_SPAWNING) { if (!process.env.SEYFERT_SPAWNING) {
throw new Error('WorkerClient cannot spawn without manager'); throw new Error('WorkerClient cannot spawn without manager');
} }
this.postMessage({ this.postMessage({
type: 'WORKER_START', type: 'WORKER_START',
workerId: workerData.workerId, workerId: workerData.workerId,
} satisfies WorkerStart); } satisfies WorkerStart);
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads'); const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
if (worker_threads?.parentPort) { if (worker_threads?.parentPort) {
manager = worker_threads?.parentPort; manager = worker_threads?.parentPort;
} }
(manager ?? process).on('message', (data: ManagerMessages) => this.handleManagerMessages(data)); (manager ?? process).on('message', (data: ManagerMessages) => this.handleManagerMessages(data));
this.setServices({ this.setServices({
cache: { cache: {
adapter: new WorkerAdapter(workerData), adapter: new WorkerAdapter(workerData),
disabledCache: options?.disabledCache, disabledCache: options?.disabledCache,
}, },
}); });
if (workerData.debug) { if (workerData.debug) {
this.debugger = new Logger({ this.debugger = new Logger({
name: `[Worker #${workerData.workerId}]`, name: `[Worker #${workerData.workerId}]`,
logLevel: LogLevels.Debug, logLevel: LogLevels.Debug,
}); });
} }
if (workerData.workerProxy) { if (workerData.workerProxy) {
this.setServices({ this.setServices({
rest: new ApiHandler({ rest: new ApiHandler({
token: workerData.token, token: workerData.token,
workerProxy: true, workerProxy: true,
debug: workerData.debug, debug: workerData.debug,
}), }),
}); });
} }
} }
get workerId() { get workerId() {
return workerData.workerId; return workerData.workerId;
} }
get latency() { get latency() {
let acc = 0; let acc = 0;
this.shards.forEach(s => (acc += s.latency)); this.shards.forEach(s => (acc += s.latency));
return acc / this.shards.size; return acc / this.shards.size;
} }
setServices({ setServices({
...rest ...rest
}: ServicesOptions & { }: ServicesOptions & {
handlers?: ServicesOptions['handlers'] & { handlers?: ServicesOptions['handlers'] & {
events?: EventHandler['callback']; events?: EventHandler['callback'];
}; };
}) { }) {
super.setServices(rest); super.setServices(rest);
if (rest.handlers && 'events' in rest.handlers) { if (rest.handlers && 'events' in rest.handlers) {
if (!rest.handlers.events) { if (!rest.handlers.events) {
this.events = undefined; this.events = undefined;
} else if (typeof rest.handlers.events === 'function') { } else if (typeof rest.handlers.events === 'function') {
this.events = new EventHandler(this.logger, this.collectors); this.events = new EventHandler(this);
this.events.setHandlers({ this.events.setHandlers({
callback: rest.handlers.events, callback: rest.handlers.events,
}); });
} else { } else {
this.events = rest.handlers.events; this.events = rest.handlers.events;
} }
} }
} }
async start(options: Omit<DeepPartial<StartOptions>, 'httpConnection' | 'token' | 'connection'> = {}) { async start(options: Omit<DeepPartial<StartOptions>, 'httpConnection' | 'token' | 'connection'> = {}) {
await super.start(options); await super.start(options);
await this.loadEvents(options.eventsDir); await this.loadEvents(options.eventsDir);
this.cache.intents = workerData.intents; this.cache.intents = workerData.intents;
} }
async loadEvents(dir?: string) { async loadEvents(dir?: string) {
dir ??= await this.getRC().then(x => x.events); dir ??= await this.getRC().then(x => x.events);
if (dir && this.events) { if (dir && this.events) {
await this.events.load(dir); await this.events.load(dir);
this.logger.info('EventHandler loaded'); this.logger.info('EventHandler loaded');
} }
} }
postMessage(body: any) { postMessage(body: any) {
if (manager) return manager.postMessage(body); if (manager) return manager.postMessage(body);
return process.send!(body); return process.send!(body);
} }
protected async handleManagerMessages(data: ManagerMessages) { protected async handleManagerMessages(data: ManagerMessages) {
switch (data.type) { switch (data.type) {
case 'CACHE_RESULT': case 'CACHE_RESULT':
if (this.cache.adapter instanceof WorkerAdapter && this.cache.adapter.promises.has(data.nonce)) { if (this.cache.adapter instanceof WorkerAdapter && this.cache.adapter.promises.has(data.nonce)) {
const cacheData = this.cache.adapter.promises.get(data.nonce)!; const cacheData = this.cache.adapter.promises.get(data.nonce)!;
clearTimeout(cacheData.timeout); clearTimeout(cacheData.timeout);
cacheData.resolve(data.result); cacheData.resolve(data.result);
this.cache.adapter.promises.delete(data.nonce); this.cache.adapter.promises.delete(data.nonce);
} }
break; break;
case 'SEND_PAYLOAD': case 'SEND_PAYLOAD':
{ {
const shard = this.shards.get(data.shardId); const shard = this.shards.get(data.shardId);
if (!shard) { if (!shard) {
this.logger.fatal('Worker trying send payload by non-existent shard'); this.logger.fatal('Worker trying send payload by non-existent shard');
return; return;
} }
await shard.send(true, { await shard.send(true, {
...data, ...data,
} satisfies GatewaySendPayload); } satisfies GatewaySendPayload);
this.postMessage({ this.postMessage({
type: 'RESULT_PAYLOAD', type: 'RESULT_PAYLOAD',
nonce: data.nonce, nonce: data.nonce,
workerId: this.workerId, workerId: this.workerId,
} satisfies WorkerSendResultPayload); } satisfies WorkerSendResultPayload);
} }
break; break;
case 'ALLOW_CONNECT': case 'ALLOW_CONNECT':
{ {
const shard = this.shards.get(data.shardId); const shard = this.shards.get(data.shardId);
if (!shard) { if (!shard) {
this.logger.fatal('Worker trying connect non-existent shard'); this.logger.fatal('Worker trying connect non-existent shard');
return; return;
} }
shard.options.presence = data.presence; shard.options.presence = data.presence;
await shard.connect(); await shard.connect();
} }
break; break;
case 'SPAWN_SHARDS': case 'SPAWN_SHARDS':
{ {
const onPacket = this.onPacket.bind(this); const onPacket = this.onPacket.bind(this);
const handlePayload = this.options?.handlePayload?.bind(this); const handlePayload = this.options?.handlePayload?.bind(this);
const self = this; const self = this;
for (const id of workerData.shards) { for (const id of workerData.shards) {
let shard = this.shards.get(id); let shard = this.shards.get(id);
if (!shard) { if (!shard) {
shard = new Shard(id, { shard = new Shard(id, {
token: workerData.token, token: workerData.token,
intents: workerData.intents, intents: workerData.intents,
info: data.info, info: data.info,
compress: data.compress, compress: data.compress,
debugger: this.debugger, debugger: this.debugger,
async handlePayload(shardId, payload) { async handlePayload(shardId, payload) {
await handlePayload?.(shardId, payload); await handlePayload?.(shardId, payload);
await self.cache.onPacket(payload); await onPacket?.(payload, shardId);
await onPacket?.(payload, shardId); self.postMessage({
self.postMessage({ workerId: workerData.workerId,
workerId: workerData.workerId, shardId,
shardId, type: 'RECEIVE_PAYLOAD',
type: 'RECEIVE_PAYLOAD', payload,
payload, } satisfies WorkerReceivePayload);
} satisfies WorkerReceivePayload); },
}, });
}); this.shards.set(id, shard);
this.shards.set(id, shard); }
}
this.postMessage({
this.postMessage({ type: 'CONNECT_QUEUE',
type: 'CONNECT_QUEUE', shardId: id,
shardId: id, workerId: workerData.workerId,
workerId: workerData.workerId, } satisfies WorkerRequestConnect);
} satisfies WorkerRequestConnect); }
} }
} break;
break; case 'SHARD_INFO':
case 'SHARD_INFO': {
{ const shard = this.shards.get(data.shardId);
const shard = this.shards.get(data.shardId); if (!shard) {
if (!shard) { this.logger.fatal('Worker trying get non-existent shard');
this.logger.fatal('Worker trying get non-existent shard'); return;
return; }
}
this.postMessage({
this.postMessage({ ...generateShardInfo(shard),
...generateShardInfo(shard), nonce: data.nonce,
nonce: data.nonce, type: 'SHARD_INFO',
type: 'SHARD_INFO', workerId: this.workerId,
workerId: this.workerId, } satisfies WorkerSendShardInfo);
} satisfies WorkerSendShardInfo); }
} break;
break; case 'WORKER_INFO':
case 'WORKER_INFO': {
{ this.postMessage({
this.postMessage({ shards: [...this.shards.values()].map(generateShardInfo),
shards: [...this.shards.values()].map(generateShardInfo), workerId: workerData.workerId,
workerId: workerData.workerId, type: 'WORKER_INFO',
type: 'WORKER_INFO', nonce: data.nonce,
nonce: data.nonce, } satisfies WorkerSendInfo);
} satisfies WorkerSendInfo); }
} break;
break; case 'BOT_READY':
case 'BOT_READY': await this.events?.runEvent('BOT_READY', this, this.me, -1);
await this.events?.runEvent('BOT_READY', this, this.me, -1); break;
break; case 'API_RESPONSE':
case 'API_RESPONSE': {
{ const promise = this.rest.workerPromises!.get(data.nonce);
const promise = this.rest.workerPromises!.get(data.nonce); if (!promise) return;
if (!promise) return; this.rest.workerPromises!.delete(data.nonce);
this.rest.workerPromises!.delete(data.nonce); if (data.error) return promise.reject(data.error);
if (data.error) return promise.reject(data.error); promise.resolve(data.response);
promise.resolve(data.response); }
} break;
break; case 'EXECUTE_EVAL':
case 'EXECUTE_EVAL': {
{ let result;
let result; try {
try { // biome-ignore lint/security/noGlobalEval: yes
// biome-ignore lint/security/noGlobalEval: yes result = await eval(`
result = await eval(` (${data.func})(this)
(${data.func})(this) `);
`); } catch (e) {
} catch (e) { result = e;
result = e; }
} this.postMessage({
this.postMessage({ type: 'EVAL_RESPONSE',
type: 'EVAL_RESPONSE', response: result,
response: result, workerId: workerData.workerId,
workerId: workerData.workerId, nonce: data.nonce,
nonce: data.nonce, } satisfies WorkerSendEvalResponse);
} satisfies WorkerSendEvalResponse); }
} break;
break; case 'EVAL_RESPONSE':
case 'EVAL_RESPONSE': {
{ const evalResponse = this.promises.get(data.nonce);
const evalResponse = this.promises.get(data.nonce); if (!evalResponse) return;
if (!evalResponse) return; this.promises.delete(data.nonce);
this.promises.delete(data.nonce); clearTimeout(evalResponse.timeout);
clearTimeout(evalResponse.timeout); evalResponse.resolve(data.response);
evalResponse.resolve(data.response); }
} break;
break; }
} }
}
private generateNonce(large = true): string {
private generateNonce(large = true): string { const uuid = randomUUID();
const uuid = randomUUID(); const nonce = large ? uuid : uuid.split('-')[0];
const nonce = large ? uuid : uuid.split('-')[0]; if (this.promises.has(nonce)) return this.generateNonce(large);
if (this.promises.has(nonce)) return this.generateNonce(large); return nonce;
return nonce; }
}
private generateSendPromise<T = unknown>(nonce: string, message = 'Timeout'): Promise<T> {
private generateSendPromise<T = unknown>(nonce: string, message = 'Timeout'): Promise<T> { return new Promise<T>((res, rej) => {
return new Promise<T>((res, rej) => { const timeout = setTimeout(() => {
const timeout = setTimeout(() => { this.promises.delete(nonce);
this.promises.delete(nonce); rej(new Error(message));
rej(new Error(message)); }, 60e3);
}, 60e3); this.promises.set(nonce, { resolve: res, timeout });
this.promises.set(nonce, { resolve: res, timeout }); });
}); }
}
tellWorker(workerId: number, func: (_: this) => {}) {
tellWorker(workerId: number, func: (_: this) => {}) { const nonce = this.generateNonce();
const nonce = this.generateNonce(); this.postMessage({
this.postMessage({ type: 'EVAL',
type: 'EVAL', func: func.toString(),
func: func.toString(), toWorkerId: workerId,
toWorkerId: workerId, workerId: workerData.workerId,
workerId: workerData.workerId, nonce,
nonce, } satisfies WorkerSendEval);
} satisfies WorkerSendEval); return this.generateSendPromise(nonce);
return this.generateSendPromise(nonce); }
}
protected async onPacket(packet: GatewayDispatchPayload, shardId: number) {
protected async onPacket(packet: GatewayDispatchPayload, shardId: number) { await this.events?.execute('RAW', packet, this as WorkerClient<true>, shardId);
await this.events?.execute('RAW', packet, this as WorkerClient<true>, shardId); await this.events?.execute(packet.t, packet, this, shardId);
switch (packet.t) { switch (packet.t) {
case 'GUILD_MEMBER_UPDATE': case 'READY':
case 'PRESENCE_UPDATE': for (const g of packet.d.guilds) {
this.__handleGuilds?.add(g.id);
case 'MESSAGE_UPDATE': }
case 'MESSAGE_DELETE_BULK': this.botId = packet.d.user.id;
case 'MESSAGE_DELETE': this.applicationId = packet.d.application.id;
case 'GUILD_DELETE': this.me = new ClientUser(this, packet.d.user, packet.d.application) as never;
case 'CHANNEL_UPDATE': if (
case 'GUILD_EMOJIS_UPDATE': !(this.__handleGuilds?.size && (workerData.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds)
case 'GUILD_UPDATE': ) {
case 'GUILD_ROLE_UPDATE': if ([...this.shards.values()].every(shard => shard.data.session_id)) {
case 'GUILD_ROLE_DELETE': this.postMessage({
case 'THREAD_UPDATE': type: 'WORKER_READY',
case 'USER_UPDATE': workerId: this.workerId,
case 'VOICE_STATE_UPDATE': } as WorkerReady);
case 'STAGE_INSTANCE_UPDATE': await this.events?.runEvent('WORKER_READY', this, this.me, -1);
case 'GUILD_STICKERS_UPDATE': }
await this.events?.execute(packet.t, packet, this as WorkerClient<true>, shardId); delete this.__handleGuilds;
await this.cache.onPacket(packet); }
break; this.debugger?.debug(`#${shardId} [${packet.d.user.username}](${this.botId}) is online...`);
//rest of the events break;
default: case 'INTERACTION_CREATE':
{ await onInteractionCreate(this, packet.d, shardId);
await this.events?.execute(packet.t, packet, this, shardId); break;
switch (packet.t) { case 'MESSAGE_CREATE':
case 'READY': await onMessageCreate(this, packet.d, shardId);
for (const g of packet.d.guilds) { break;
this.__handleGuilds?.add(g.id); case 'GUILD_CREATE': {
} if (this.__handleGuilds?.has(packet.d.id)) {
this.botId = packet.d.user.id; this.__handleGuilds.delete(packet.d.id);
this.applicationId = packet.d.application.id; if (!this.__handleGuilds.size && [...this.shards.values()].every(shard => shard.data.session_id)) {
this.me = new ClientUser(this, packet.d.user, packet.d.application) as never; this.postMessage({
if ( type: 'WORKER_READY',
!( workerId: this.workerId,
this.__handleGuilds?.size && } as WorkerReady);
(workerData.intents & GatewayIntentBits.Guilds) === GatewayIntentBits.Guilds await this.events?.runEvent('WORKER_READY', this, this.me, -1);
) }
) { if (!this.__handleGuilds.size) delete this.__handleGuilds;
if ([...this.shards.values()].every(shard => shard.data.session_id)) { return;
this.postMessage({ }
type: 'WORKER_READY', }
workerId: this.workerId, }
} as WorkerReady); }
await this.events?.runEvent('WORKER_READY', this, this.me, -1); }
}
delete this.__handleGuilds; export function generateShardInfo(shard: Shard): WorkerShardInfo {
} return {
this.debugger?.debug(`#${shardId} [${packet.d.user.username}](${this.botId}) is online...`); open: shard.isOpen,
break; shardId: shard.id,
case 'INTERACTION_CREATE': latency: shard.latency,
await onInteractionCreate(this, packet.d, shardId); resumable: shard.resumable,
break; };
case 'MESSAGE_CREATE': }
await onMessageCreate(this, packet.d, shardId);
break; interface WorkerClientOptions extends BaseClientOptions {
case 'GUILD_CREATE': { disabledCache: Cache['disabledCache'];
if (this.__handleGuilds?.has(packet.d.id)) { commands?: NonNullable<Client['options']>['commands'];
this.__handleGuilds.delete(packet.d.id); handlePayload?: ShardManagerOptions['handlePayload'];
if (!this.__handleGuilds.size && [...this.shards.values()].every(shard => shard.data.session_id)) { }
this.postMessage({
type: 'WORKER_READY',
workerId: this.workerId,
} as WorkerReady);
await this.events?.runEvent('WORKER_READY', this, this.me, -1);
}
if (!this.__handleGuilds.size) delete this.__handleGuilds;
return;
}
}
}
}
break;
}
}
}
export function generateShardInfo(shard: Shard): WorkerShardInfo {
return {
open: shard.isOpen,
shardId: shard.id,
latency: shard.latency,
resumable: shard.resumable,
};
}
interface WorkerClientOptions extends BaseClientOptions {
disabledCache: Cache['disabledCache'];
commands?: NonNullable<Client['options']>['commands'];
handlePayload?: ShardManagerOptions['handlePayload'];
}

View File

@ -1,129 +1,136 @@
import type { import type {
GatewayDispatchPayload, GatewayDispatchPayload,
GatewayMessageCreateDispatch, GatewayMessageCreateDispatch,
GatewayMessageDeleteBulkDispatch, GatewayMessageDeleteBulkDispatch,
GatewayMessageDeleteDispatch, GatewayMessageDeleteDispatch,
} from 'discord-api-types/v10'; } from 'discord-api-types/v10';
import type { Client, WorkerClient } from '../client'; import type { Client, WorkerClient } from '../client';
import { BaseHandler, type Logger, ReplaceRegex, magicImport, type MakeRequired, type SnakeCase } from '../common'; import { BaseHandler, ReplaceRegex, magicImport, type MakeRequired, type SnakeCase } from '../common';
import type { ClientEvents } from '../events/hooks'; import type { ClientEvents } from '../events/hooks';
import * as RawEvents from '../events/hooks'; import * as RawEvents from '../events/hooks';
import type { ClientEvent, ClientNameEvents } from './event'; import type { ClientEvent, ClientNameEvents } from './event';
import type { Collectors } from '../client/collectors';
export type EventValue = MakeRequired<ClientEvent, '__filePath'> & { fired?: boolean };
export type EventValue = MakeRequired<ClientEvent, '__filePath'> & { fired?: boolean };
export type GatewayEvents = Uppercase<SnakeCase<keyof ClientEvents>>;
export type GatewayEvents = Uppercase<SnakeCase<keyof ClientEvents>>;
export class EventHandler extends BaseHandler {
export class EventHandler extends BaseHandler { constructor(protected client: Client | WorkerClient) {
constructor( super(client.logger);
logger: Logger, }
protected collectors: Collectors,
) { onFail = (event: GatewayEvents, err: unknown) => this.logger.warn('<Client>.events.onFail', err, event);
super(logger); protected filter = (path: string) => path.endsWith('.js') || (!path.endsWith('.d.ts') && path.endsWith('.ts'));
}
values: Partial<Record<GatewayEvents, EventValue>> = {};
onFail = (event: GatewayEvents, err: unknown) => this.logger.warn('<Client>.events.onFail', err, event);
protected filter = (path: string) => path.endsWith('.js') || (!path.endsWith('.d.ts') && path.endsWith('.ts')); async load(eventsDir: string, instances?: { file: ClientEvent; path: string }[]) {
for (const i of instances ?? (await this.loadFilesK<ClientEvent>(await this.getFiles(eventsDir)))) {
values: Partial<Record<GatewayEvents, EventValue>> = {}; const instance = this.callback(i.file);
if (!instance) continue;
async load(eventsDir: string, instances?: { file: ClientEvent; path: string }[]) { if (typeof instance?.run !== 'function') {
for (const i of instances ?? (await this.loadFilesK<ClientEvent>(await this.getFiles(eventsDir)))) { this.logger.warn(
const instance = this.callback(i.file); i.path.split(process.cwd()).slice(1).join(process.cwd()),
if (!instance) continue; 'Missing run function, use `export default {...}` syntax',
if (typeof instance?.run !== 'function') { );
this.logger.warn( continue;
i.path.split(process.cwd()).slice(1).join(process.cwd()), }
'Missing run function, use `export default {...}` syntax', instance.__filePath = i.path;
); this.values[ReplaceRegex.snake(instance.data.name).toUpperCase() as GatewayEvents] = instance as EventValue;
continue; }
} }
instance.__filePath = i.path;
this.values[ReplaceRegex.snake(instance.data.name).toUpperCase() as GatewayEvents] = instance as EventValue; async execute(name: GatewayEvents, ...args: [GatewayDispatchPayload, Client<true> | WorkerClient<true>, number]) {
} switch (name) {
} case 'MESSAGE_CREATE':
{
async execute(name: GatewayEvents, ...args: [GatewayDispatchPayload, Client<true> | WorkerClient<true>, number]) { const { d: data } = args[0] as GatewayMessageCreateDispatch;
switch (name) { if (args[1].components?.values.has(data.interaction_metadata?.id ?? data.id)) {
case 'MESSAGE_CREATE': args[1].components.values.get(data.interaction_metadata?.id ?? data.id)!.messageId = data.id;
{ }
const { d: data } = args[0] as GatewayMessageCreateDispatch; }
if (args[1].components?.values.has(data.interaction_metadata?.id ?? data.id)) { break;
args[1].components.values.get(data.interaction_metadata?.id ?? data.id)!.messageId = data.id; case 'MESSAGE_DELETE':
} {
} const { d: data } = args[0] as GatewayMessageDeleteDispatch;
break; const value = [...(args[1].components?.values ?? [])].find(x => x[1].messageId === data.id);
case 'MESSAGE_DELETE': if (value) {
{ args[1].components!.onMessageDelete(value[0]);
const { d: data } = args[0] as GatewayMessageDeleteDispatch; }
const value = [...(args[1].components?.values ?? [])].find(x => x[1].messageId === data.id); }
if (value) { break;
args[1].components!.onMessageDelete(value[0]); case 'MESSAGE_DELETE_BULK':
} {
} const { d: data } = args[0] as GatewayMessageDeleteBulkDispatch;
break; const values = [...(args[1].components?.values ?? [])];
case 'MESSAGE_DELETE_BULK': data.ids.forEach(id => {
{ const value = values.find(x => x[1].messageId === id);
const { d: data } = args[0] as GatewayMessageDeleteBulkDispatch; if (value) {
const values = [...(args[1].components?.values ?? [])]; args[1].components!.onMessageDelete(value[0]);
data.ids.forEach(id => { }
const value = values.find(x => x[1].messageId === id); });
if (value) { }
args[1].components!.onMessageDelete(value[0]); break;
} }
});
} await this.runEvent(args[0].t, args[1], args[0].d, args[2]);
break; await this.client.collectors.run(args[0].t, args[0].d);
} }
await this.runEvent(args[0].t, args[1], args[0].d, args[2]); async runEvent(name: GatewayEvents, client: Client | WorkerClient, packet: any, shardId: number) {
await this.collectors.run(args[0].t, args[0].d); const Event = this.values[name];
} if (!Event) {
return this.client.cache.onPacket({
async runEvent(name: GatewayEvents, client: Client | WorkerClient, packet: any, shardId: number) { t: name,
const Event = this.values[name]; d: packet,
if (!Event) { } as GatewayDispatchPayload);
return; }
} try {
try { if (Event.data.once && Event.fired) {
if (Event.data.once && Event.fired) { return this.client.cache.onPacket({
return; t: name,
} d: packet,
Event.fired = true; } as GatewayDispatchPayload);
const hook = await RawEvents[name]?.(client, packet as never); }
await Event.run(...[hook, client, shardId]); Event.fired = true;
} catch (e) { const hook = await RawEvents[name]?.(client, packet as never);
await this.onFail(name, e); if (name !== 'RAW')
} await this.client.cache.onPacket({
} t: name,
d: packet,
async reload(name: ClientNameEvents) { } as GatewayDispatchPayload);
const eventName = ReplaceRegex.snake(name).toUpperCase() as GatewayEvents; await Event.run(...[hook, client, shardId]);
const event = this.values[eventName]; } catch (e) {
if (!event?.__filePath) return null; await this.onFail(name, e);
delete require.cache[event.__filePath]; }
const imported = await magicImport(event.__filePath).then(x => x.default ?? x); }
imported.__filePath = event.__filePath;
this.values[eventName] = imported; async reload(name: ClientNameEvents) {
return imported; const eventName = ReplaceRegex.snake(name).toUpperCase() as GatewayEvents;
} const event = this.values[eventName];
if (!event?.__filePath) return null;
async reloadAll(stopIfFail = true) { delete require.cache[event.__filePath];
for (const i in this.values) { const imported = await magicImport(event.__filePath).then(x => x.default ?? x);
try { imported.__filePath = event.__filePath;
await this.reload(ReplaceRegex.camel(i) as ClientNameEvents); this.values[eventName] = imported;
} catch (e) { return imported;
if (stopIfFail) { }
throw e;
} async reloadAll(stopIfFail = true) {
} for (const i in this.values) {
} try {
} await this.reload(ReplaceRegex.camel(i) as ClientNameEvents);
} catch (e) {
setHandlers({ callback }: { callback: EventHandler['callback'] }) { if (stopIfFail) {
this.callback = callback; throw e;
} }
}
callback = (file: ClientEvent): ClientEvent | false => file; }
} }
setHandlers({ callback }: { callback: EventHandler['callback'] }) {
this.callback = callback;
}
callback = (file: ClientEvent): ClientEvent | false => file;
}