Feat: cloudfare workers support (#190)

* feat: cloudfare workers

* fix: chokidar

* fix: handlers

* fix: lazyLoadPackage
This commit is contained in:
MARCROCK22 2024-05-07 16:53:10 -04:00 committed by GitHub
parent 6110337704
commit 3b2cbd3089
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 279 additions and 78 deletions

View File

@ -1,8 +1,6 @@
import { filetypeinfo } from 'magic-bytes.js';
import { randomUUID } from 'node:crypto';
import { setTimeout as delay } from 'node:timers/promises';
import { parentPort, workerData } from 'node:worker_threads';
import { Logger } from '../common';
import { Logger, delay, lazyLoadPackage } from '../common';
import { snowflakeToTimestamp } from '../structures/extra/functions';
import type { WorkerData } from '../websocket';
import type { WorkerSendApiRequest } from '../websocket/discord/worker';
@ -20,6 +18,9 @@ import {
} from './shared';
import { isBufferLike } from './utils/utils';
let parentPort: import('node:worker_threads').MessagePort;
let workerData: WorkerData;
export class ApiHandler {
options: ApiHandlerInternalOptions;
globalBlock = false;
@ -43,8 +44,15 @@ export class ApiHandler {
});
}
if (options.workerProxy && !parentPort) throw new Error('Cannot use workerProxy without a parent.');
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
if (options.workerProxy && !worker_threads?.parentPort) throw new Error('Cannot use workerProxy without a parent.');
if (options.workerProxy) this.workerPromises = new Map();
if (worker_threads) {
workerData = worker_threads.workerData;
if (worker_threads.parentPort) parentPort = worker_threads.parentPort;
}
}
globalUnblock() {

View File

@ -1,9 +1,8 @@
import type { APIAttachment, RESTAPIAttachment } from 'discord-api-types/v10';
import { randomBytes } from 'node:crypto';
import { readFile, stat } from 'node:fs/promises';
import path from 'node:path';
import { type UsingClient, throwError, type RawFile } from '..';
import type { ImageResolvable, ObjectToLower } from '../common';
import { promisesReadFile, promisesStat, type ImageResolvable, type ObjectToLower } from '../common';
import { Base } from '../structures/extra/Base';
export interface AttachmentResolvableMap {
@ -196,12 +195,12 @@ export async function resolveAttachmentData(
}
case 'path': {
const file = path.resolve(data as string);
const stats = await stat(file);
const stats = await promisesStat(file);
if (!stats.isFile())
return throwError(
`The attachment type has been expressed as ${type.toUpperCase()} but cannot be resolved as one.`,
);
return { data: await readFile(file) };
return { data: await promisesReadFile(file) };
}
case 'buffer': {
if (Buffer.isBuffer(data)) return { data };

View File

@ -1,14 +1,19 @@
import { randomUUID } from 'node:crypto';
import { parentPort } from 'node:worker_threads';
import type { WorkerData } from '../../websocket';
import type { WorkerSendCacheRequest } from '../../websocket/discord/worker';
import type { Adapter } from './types';
import { lazyLoadPackage } from '../../common';
let parentPort: import('node:worker_threads').MessagePort;
export class WorkerAdapter implements Adapter {
isAsync = true;
promises = new Map<string, { resolve: (value: unknown) => void; timeout: NodeJS.Timeout }>();
constructor(public workerData: WorkerData) {}
constructor(public workerData: WorkerData) {
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
if (worker_threads?.parentPort) parentPort = worker_threads.parentPort;
}
postMessage(body: any) {
if (parentPort) return parentPort.postMessage(body);

View File

@ -81,6 +81,8 @@ export class BaseClient {
options: BaseClientOptions | undefined;
static seyfertConfig?: InternalRuntimeConfigHTTP | InternalRuntimeConfig;
constructor(options?: BaseClientOptions) {
this.options = MergeOptions(
{
@ -300,11 +302,12 @@ export class BaseClient {
async getRC<
T extends InternalRuntimeConfigHTTP | InternalRuntimeConfig = InternalRuntimeConfigHTTP | InternalRuntimeConfig,
>() {
const { locations, debug, ...env } = (await magicImport(join(process.cwd(), 'seyfert.config.js')).then(
x => x.default ?? x,
)) as T;
const seyfertConfig = (BaseClient.seyfertConfig ||
(await magicImport(join(process.cwd(), 'seyfert.config.js')).then(x => x.default ?? x))) as T;
return {
const { locations, debug, ...env } = seyfertConfig;
const obj = {
debug: !!debug,
...env,
templates: locations.templates ? join(process.cwd(), locations.base, locations.templates) : undefined,
@ -316,6 +319,10 @@ export class BaseClient {
base: join(process.cwd(), locations.base),
output: join(process.cwd(), locations.output),
};
BaseClient.seyfertConfig = seyfertConfig;
return obj;
}
}

View File

@ -1,7 +1,6 @@
import { GatewayIntentBits, type GatewayDispatchPayload, type GatewayPresenceUpdateData } from 'discord-api-types/v10';
import { parentPort, workerData } from 'node:worker_threads';
import type { Command, CommandContext, Message, SubCommand } from '..';
import type { DeepPartial, If, WatcherPayload, WatcherSendToShard } from '../common';
import { lazyLoadPackage, type DeepPartial, type If, type WatcherPayload, type WatcherSendToShard } from '../common';
import { EventHandler } from '../events';
import { ClientUser } from '../structures';
import { ShardManager, properties, type ShardManagerOptions } from '../websocket';
@ -12,6 +11,8 @@ import { BaseClient } from './base';
import { onInteractionCreate } from './oninteractioncreate';
import { onMessageCreate } from './onmessagecreate';
let parentPort: import('node:worker_threads').MessagePort;
export class Client<Ready extends boolean = boolean> extends BaseClient {
private __handleGuilds?: Set<string> = new Set();
gateway!: ShardManager;
@ -68,7 +69,14 @@ export class Client<Ready extends boolean = boolean> extends BaseClient {
protected async execute(options: { token?: string; intents?: number } = {}) {
await super.execute(options);
if (!workerData?.__USING_WATCHER__) {
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
if (worker_threads?.parentPort) {
parentPort = worker_threads.parentPort;
}
if (!worker_threads?.workerData.__USING_WATCHER__) {
await this.gateway.spawnShards();
} else {
parentPort?.on('message', (data: WatcherPayload | WatcherSendToShard) => {

View File

@ -35,9 +35,9 @@ export class HttpClient extends BaseClient {
constructor(options?: BaseClientOptions) {
super(options);
if (!UWS) {
throw new Error('No uws installed.');
}
// if (!UWS) {
// throw new Error('No uws installed.');
// }
if (!nacl) {
throw new Error('No tweetnacl installed.');
}
@ -89,20 +89,40 @@ export class HttpClient extends BaseClient {
this.publicKey = publicKey;
this.publicKeyHex = Buffer.from(this.publicKey, 'hex');
this.app = UWS!.App();
this.app.post('/interactions', (res, req) => {
return this.onPacket(res, req);
});
this.app.listen(port, () => {
this.logger.info(`Listening to port ${port}`);
});
if (UWS) {
this.app = UWS.App();
this.app.post('/interactions', (res, req) => {
return this.onPacket(res, req);
});
this.app.listen(port, () => {
this.logger.info(`Listening to port ${port}`);
});
} else {
this.logger.warn('No UWS installed.');
}
}
async start(options: DeepPartial<Omit<StartOptions, 'connection'>> = {}) {
async start(options: DeepPartial<Omit<StartOptions, 'connection' | 'eventsDir'>> = {}) {
await super.start(options);
return this.execute(options.httpConnection);
}
protected async verifySignatureGenericRequest(req: Request) {
const timestamp = req.headers.get('x-signature-timestamp');
const ed25519 = req.headers.get('x-signature-ed25519') ?? '';
const body = (await req.json()) as APIInteraction;
if (
nacl!.sign.detached.verify(
Buffer.from(timestamp + JSON.stringify(body)),
Buffer.from(ed25519, 'hex'),
this.publicKeyHex,
)
) {
return body;
}
return;
}
// https://discord.com/developers/docs/interactions/receiving-and-responding#security-and-authorization
protected async verifySignature(res: HttpResponse, req: HttpRequest) {
const timestamp = req.getHeader('x-signature-timestamp');
@ -120,7 +140,71 @@ export class HttpClient extends BaseClient {
return;
}
async onPacket(res: HttpResponse, req: HttpRequest) {
async fetch(req: Request): Promise<Response> {
const rawBody = await this.verifySignatureGenericRequest(req);
if (!rawBody) {
this.debugger?.debug('Invalid request/No info, returning 418 status.');
// I'm a teapot
return new Response('', { status: 418 });
}
switch (rawBody.type) {
case InteractionType.Ping:
this.debugger?.debug('Ping interaction received, responding.');
return Response.json(
{ type: InteractionResponseType.Pong },
{
headers: {
'Content-Type': 'application/json',
},
},
);
default:
return new Promise<Response>(r => {
onInteractionCreate(this, rawBody, -1, async ({ body, files }) => {
let response: FormData | APIInteractionResponse;
const headers: { 'Content-Type'?: string } = {};
if (files) {
response = new FormData();
for (const [index, file] of files.entries()) {
const fileKey = file.key ?? `files[${index}]`;
if (isBufferLike(file.data)) {
let contentType = file.contentType;
if (!contentType) {
const [parsedType] = filetypeinfo(file.data);
if (parsedType) {
contentType =
OverwrittenMimeTypes[parsedType.mime as keyof typeof OverwrittenMimeTypes] ??
parsedType.mime ??
'application/octet-stream';
}
}
response.append(fileKey, new Blob([file.data], { type: contentType }), file.name);
} else {
response.append(fileKey, new Blob([`${file.data}`], { type: file.contentType }), file.name);
}
}
if (body) {
response.append('payload_json', JSON.stringify(body));
}
} else {
response = body ?? {};
headers['Content-Type'] = 'application/json';
}
r(
Response.json(response, {
headers,
}),
);
});
});
}
}
protected async onPacket(res: HttpResponse, req: HttpRequest) {
const rawBody = await this.verifySignature(res, req);
if (!rawBody) {
this.debugger?.debug('Invalid request/No info, returning 418 status.');

View File

@ -1,10 +1,9 @@
import { GatewayIntentBits, type GatewayDispatchPayload, type GatewaySendPayload } from 'discord-api-types/v10';
import { randomUUID } from 'node:crypto';
import { parentPort as manager } from 'node:worker_threads';
import { ApiHandler, Logger } from '..';
import type { Cache } from '../cache';
import { WorkerAdapter } from '../cache';
import { LogLevels, type DeepPartial, type When } from '../common';
import { LogLevels, lazyLoadPackage, type DeepPartial, type When } from '../common';
import { EventHandler } from '../events';
import { ClientUser } from '../structures';
import { Shard, type ShardManagerOptions, type WorkerData } from '../websocket';
@ -28,6 +27,7 @@ import { onInteractionCreate } from './oninteractioncreate';
import { onMessageCreate } from './onmessagecreate';
let workerData: WorkerData;
let manager: import('node:worker_threads').MessagePort;
try {
workerData = {
debug: process.env.SEYFERT_WORKER_DEBUG === 'true',
@ -51,6 +51,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
promises = new Map<string, { resolve: (value: any) => void; timeout: NodeJS.Timeout }>();
shards = new Map<number, Shard>();
declare options: WorkerClientOptions | undefined;
constructor(options?: WorkerClientOptions) {
@ -62,7 +63,13 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
type: 'WORKER_START',
workerId: workerData.workerId,
} satisfies WorkerStart);
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
if (worker_threads?.parentPort) {
manager = worker_threads?.parentPort;
}
(manager ?? process).on('message', (data: ManagerMessages) => this.handleManagerMessages(data));
this.setServices({
cache: {
adapter: new WorkerAdapter(workerData),

View File

@ -9,7 +9,7 @@ import {
type GuildMember,
type InteractionGuildMember,
} from '../../structures';
import { BaseContext } from '../basecontex';
import { BaseContext } from '../basecontext';
import type { RegisteredMiddlewares } from '../decorators';
import type { OptionResolver } from '../optionresolver';
import type { Command, ContextOptions, OptionsRecord, SubCommand } from './chat';

View File

@ -17,7 +17,7 @@ import {
type MessageCommandInteraction,
type UserCommandInteraction,
} from '../../structures';
import { BaseContext } from '../basecontex';
import { BaseContext } from '../basecontext';
import type { RegisteredMiddlewares } from '../decorators';
import type { CommandMetadata, ExtendContext, GlobalMetadata, UsingClient } from './shared';

View File

@ -8,6 +8,7 @@ import type { UsingClient } from './applications/shared';
export class CommandHandler extends BaseHandler {
values: (Command | ContextMenuCommand)[] = [];
protected filter = (path: string) => path.endsWith('.js') || (!path.endsWith('.d.ts') && path.endsWith('.ts'));
constructor(
@ -36,10 +37,15 @@ export class CommandHandler extends BaseHandler {
}
}
async load(commandsDir: string, client: UsingClient) {
const result = (
await this.loadFilesK<{ new (): Command | SubCommand | ContextMenuCommand }>(await this.getFiles(commandsDir))
).filter(x => x.file);
async load(commandsDir: string, client: UsingClient, instances?: { new (): Command | ContextMenuCommand }[]) {
const result =
instances?.map(x => {
const i = new x();
return { name: i.name, file: x, path: i.__filePath ?? '*' };
}) ??
(
await this.loadFilesK<{ new (): Command | SubCommand | ContextMenuCommand }>(await this.getFiles(commandsDir))
).filter(x => x.file);
this.values = [];
for (const command of result) {

View File

@ -1,18 +1,17 @@
import { watch } from 'chokidar';
import type { GatewayDispatchPayload, GatewaySendPayload } from 'discord-api-types/v10';
import { execSync } from 'node:child_process';
import { Worker } from 'node:worker_threads';
import { ApiHandler, Router } from '../../api';
import { BaseClient, type InternalRuntimeConfig } from '../../client/base';
import { ShardManager, type ShardManagerOptions } from '../../websocket';
import { Logger } from '../it/logger';
import type { MakeRequired } from '../types/util';
import { lazyLoadPackage } from '../it/utils';
/**
* Represents a watcher class that extends the ShardManager.
*/
export class Watcher extends ShardManager {
worker?: Worker;
worker?: import('node:worker_threads').Worker;
logger = new Logger({
name: '[Watcher]',
});
@ -47,11 +46,13 @@ export class Watcher extends ShardManager {
* Resets the worker instance.
*/
resetWorker() {
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
if (!worker_threads) throw new Error('Cannot use worker_threads');
if (this.worker) {
this.worker.terminate();
}
this.build();
this.worker = new Worker(this.options.filePath, {
this.worker = new worker_threads.Worker(this.options.filePath, {
argv: this.options.argv,
workerData: {
__USING_WATCHER__: true,
@ -95,7 +96,12 @@ export class Watcher extends ShardManager {
this.connectQueue.concurrency = this.options.info.session_start_limit.max_concurrency;
await super.spawnShards();
const watcher = watch(this.options.srcPath).on('ready', () => {
const chokidar = lazyLoadPackage<typeof import('chokidar')>('chokidar');
if (!chokidar?.watch) return this.logger.warn('No chokidar installed.');
const watcher = chokidar.watch(this.options.srcPath).on('ready', () => {
this.logger.debug(`Watching ${this.options.srcPath}`);
watcher.on('all', event => {
this.logger.debug(`${event} event detected, building`);

View File

@ -1,8 +1,7 @@
import { createWriteStream, existsSync, mkdirSync, type WriteStream } from 'node:fs';
import { readdir, unlink } from 'node:fs/promises';
import { join } from 'node:path';
import { bgBrightWhite, black, bold, brightBlack, cyan, gray, italic, red, stripColor, yellow } from './colors';
import { MergeOptions } from './utils';
import { MergeOptions, promisesReaddir, promisesUnlink } from './utils';
export enum LogLevels {
Debug = 0,
Info = 1,
@ -52,10 +51,10 @@ export class Logger {
}
static async clearLogs() {
for (const i of await readdir(join(process.cwd(), Logger.dirname))) {
if (this.streams[i]) await new Promise(res => this.streams[i]!.close(res));
await unlink(join(process.cwd(), Logger.dirname, i)).catch(() => {});
delete this.streams[i];
for (const i of await promisesReaddir(join(process.cwd(), Logger.dirname))) {
if (this.streams[i.name]) await new Promise(res => this.streams[i.name]!.close(res));
await promisesUnlink(join(process.cwd(), Logger.dirname, i.name)).catch(() => {});
delete this.streams[i.name];
}
}
@ -131,10 +130,10 @@ export class Logger {
if (!Logger.__callback) {
const color = Logger.colorFunctions.get(level) ?? Logger.noColor;
const memoryData = process.memoryUsage();
const memoryData = process.memoryUsage?.();
const date = new Date();
log = [
brightBlack(formatMemoryUsage(memoryData.rss)),
brightBlack(formatMemoryUsage(memoryData?.rss ?? 0)),
bgBrightWhite(black(`[${date.toLocaleDateString()} ${date.toLocaleTimeString()}]`)),
color(Logger.prefixes.get(level) ?? 'DEBUG'),
this.name ? `${this.name} >` : '>',

View File

@ -1,6 +1,6 @@
import { readdir } from 'node:fs/promises';
import type fs from 'node:fs';
import { type Dirent, readFile, readdir, stat, unlink } from 'node:fs';
import { basename, join } from 'node:path';
import { setTimeout } from 'node:timers/promises';
import { EmbedColors, type ColorResolvable, type Logger, type ObjectToLower, type ObjectToSnake } from '..';
/**
@ -33,7 +33,7 @@ export function resolveColor(color: ColorResolvable): number {
* @returns A Promise that resolves after the specified time with the provided result.
*/
export function delay<T>(time: number, result?: T): Promise<T> {
return setTimeout(time, result);
return new Promise(r => setTimeout(r, time, result));
}
/**
@ -115,7 +115,7 @@ export class BaseHandler {
protected async getFiles(dir: string) {
const files: string[] = [];
for (const i of await readdir(dir, { withFileTypes: true })) {
for (const i of await promisesReaddir(dir)) {
if (i.isDirectory()) {
files.push(...(await this.getFiles(join(dir, i.name))));
} else {
@ -268,3 +268,54 @@ export function fakePromise<T = unknown | Promise<unknown>>(
then: callback => callback(value as Awaited<T>),
};
}
export function lazyLoadPackage<T>(mod: string): T | undefined {
try {
return require(mod);
} catch (e) {
console.log(`Cannot import ${mod}`);
return;
}
}
export function isCloudfareWorker() {
//@ts-expect-error
return process.platform === 'browser';
}
//cloudfare support
export function promisesReadFile(file: string) {
return new Promise<Buffer>((res, rej) =>
readFile(file, (err, result) => {
if (err) return rej(err);
res(result);
}),
);
}
export function promisesUnlink(file: string) {
return new Promise<void>((res, rej) =>
unlink(file, err => {
if (err) return rej(err);
res();
}),
);
}
export function promisesStat(file: string) {
return new Promise<fs.Stats>((res, rej) =>
stat(file, (err, result) => {
if (err) return rej(err);
res(result);
}),
);
}
export function promisesReaddir(file: string) {
return new Promise<Dirent[]>((res, rej) =>
readdir(file, { withFileTypes: true }, (err, result) => {
if (err) return rej(err);
res(result);
}),
);
}

View File

@ -15,7 +15,7 @@ import type {
WebhookMessage,
} from '..';
import type { ExtendContext, UsingClient } from '../commands';
import { BaseContext } from '../commands/basecontex';
import { BaseContext } from '../commands/basecontext';
import type {
ComponentInteractionMessageUpdate,
InteractionCreateBodyRequest,

View File

@ -145,10 +145,12 @@ export class ComponentHandler extends BaseHandler {
this.deleteValue(id, 'messageDelete');
}
async load(componentsDir: string) {
const paths = await this.loadFilesK<{ new (): ModalCommand | ComponentCommand }>(
await this.getFiles(componentsDir),
);
async load(componentsDir: string, instances?: { new (): ModalCommand | ComponentCommand }[]) {
const paths =
instances?.map(x => {
const i = new x();
return { file: x, path: i.__filePath ?? '*' };
}) ?? (await this.loadFilesK<{ new (): ModalCommand | ComponentCommand }>(await this.getFiles(componentsDir)));
for (let i = 0; i < paths.length; i++) {
let component;

View File

@ -20,8 +20,8 @@ export class EventHandler extends BaseHandler {
values: Partial<Record<GatewayEvents, EventValue>> = {};
async load(eventsDir: string) {
for (const i of await this.loadFilesK<ClientEvent>(await this.getFiles(eventsDir))) {
async load(eventsDir: string, instances?: { file: ClientEvent; path: string }[]) {
for (const i of instances ?? (await this.loadFilesK<ClientEvent>(await this.getFiles(eventsDir)))) {
const instance = this.callback(i.file);
if (!instance) continue;
if (typeof instance?.run !== 'function') {

View File

@ -1,13 +1,15 @@
import { GatewayIntentBits } from 'discord-api-types/gateway/v10';
import type {
BaseClientOptions,
InternalRuntimeConfig,
InternalRuntimeConfigHTTP,
RuntimeConfig,
RuntimeConfigHTTP,
import {
BaseClient,
type BaseClientOptions,
type InternalRuntimeConfig,
type InternalRuntimeConfigHTTP,
type RuntimeConfig,
type RuntimeConfigHTTP,
} from './client/base';
import type { ClientNameEvents, EventContext } from './events';
export { Logger, PermissionStrings, Watcher } from './common';
import { isCloudfareWorker } from './common';
export { Logger, PermissionStrings } from './common';
//
export { Collection, LimitedCollection } from './collection';
//
@ -80,10 +82,12 @@ export const config = {
* @returns The internal runtime configuration for HTTP.
*/
http(data: RuntimeConfigHTTP) {
return {
const obj = {
port: 8080,
...data,
} as InternalRuntimeConfigHTTP;
if (isCloudfareWorker()) BaseClient.seyfertConfig = obj;
return obj;
},
};

View File

@ -36,8 +36,8 @@ export class LangsHandler extends BaseHandler {
return LangRouter(locale, this.defaultLang ?? locale, this.values)();
}
async load(dir: string) {
const files = await this.loadFilesK<Record<string, any>>(await this.getFiles(dir));
async load(dir: string, instances?: { name: string; file: Record<string, any> }[]) {
const files = instances ?? (await this.loadFilesK<Record<string, any>>(await this.getFiles(dir)));
for (const i of files) {
const locale = i.name.split('.').slice(0, -1).join('.');
const result = this.callback(locale, i.file);

View File

@ -4,11 +4,11 @@ import {
type GatewayUpdatePresence,
type GatewayVoiceStateUpdate,
} from 'discord-api-types/v10';
import { parentPort, workerData } from 'node:worker_threads';
import {
LogLevels,
Logger,
MergeOptions,
lazyLoadPackage,
toSnakeCase,
type ObjectToLower,
type WatcherSendToShard,
@ -17,7 +17,10 @@ import { ShardManagerDefaults } from '../constants';
import { DynamicBucket } from '../structures';
import { ConnectQueue } from '../structures/timeout';
import { Shard } from './shard.js';
import type { ShardManagerOptions } from './shared';
import type { ShardManagerOptions, WorkerData } from './shared';
let parentPort: import('node:worker_threads').MessagePort;
let workerData: WorkerData;
export class ShardManager extends Map<number, Shard> {
connectQueue: ConnectQueue;
@ -36,6 +39,13 @@ export class ShardManager extends Map<number, Shard> {
logLevel: LogLevels.Debug,
});
}
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
if (worker_threads) {
workerData = worker_threads.workerData;
if (worker_threads.parentPort) parentPort = worker_threads.parentPort;
}
}
get totalShards() {

View File

@ -120,4 +120,5 @@ export interface WorkerData {
workerId: number;
debug: boolean;
workerProxy: boolean;
__USING_WATCHER__?: boolean;
}

View File

@ -1,11 +1,10 @@
import type { GatewayPresenceUpdateData, GatewaySendPayload } from 'discord-api-types/v10';
import cluster, { type Worker as ClusterWorker } from 'node:cluster';
import { randomUUID } from 'node:crypto';
import { Worker as ThreadWorker } from 'node:worker_threads';
import { ApiHandler, Logger, Router } from '../..';
import { MemoryAdapter, type Adapter } from '../../cache';
import { BaseClient, type InternalRuntimeConfig } from '../../client/base';
import { MergeOptions, type MakePartial } from '../../common';
import { MergeOptions, lazyLoadPackage, type MakePartial } from '../../common';
import { WorkerManagerDefaults } from '../constants';
import { DynamicBucket } from '../structures';
import { ConnectQueue } from '../structures/timeout';
@ -14,7 +13,10 @@ import { PresenceUpdateHandler } from './events/presenceUpdate';
import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared';
import type { WorkerInfo, WorkerMessage, WorkerShardInfo, WorkerStart } from './worker';
export class WorkerManager extends Map<number, (ClusterWorker | ThreadWorker) & { ready?: boolean }> {
export class WorkerManager extends Map<
number,
(ClusterWorker | import('node:worker_threads').Worker) & { ready?: boolean }
> {
options!: Required<WorkerManagerOptions>;
debugger?: Logger;
connectQueue!: ConnectQueue;
@ -124,7 +126,7 @@ export class WorkerManager extends Map<number, (ClusterWorker | ThreadWorker) &
(worker as ClusterWorker).send(body);
break;
case 'threads':
(worker as ThreadWorker).postMessage(body);
(worker as import('worker_threads').Worker).postMessage(body);
break;
}
}
@ -162,6 +164,8 @@ export class WorkerManager extends Map<number, (ClusterWorker | ThreadWorker) &
}
createWorker(workerData: WorkerData) {
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
if (!worker_threads) throw new Error('Cannot create worker without worker_threads.');
const env: Record<string, any> = {
SEYFERT_SPAWNING: 'true',
};
@ -170,7 +174,7 @@ export class WorkerManager extends Map<number, (ClusterWorker | ThreadWorker) &
}
switch (this.options.mode) {
case 'threads': {
const worker = new ThreadWorker(workerData.path, {
const worker = new worker_threads.Worker(workerData.path, {
env,
});
worker.on('message', data => this.handleWorkerMessage(data));