mirror of
https://github.com/tiramisulabs/seyfert.git
synced 2025-07-03 05:26:07 +00:00
feat: workerManager & workerclient adapter (#217)
This commit is contained in:
parent
0be7c73d4a
commit
8e655ee1c7
2
src/cache/adapters/workeradapter.ts
vendored
2
src/cache/adapters/workeradapter.ts
vendored
@ -15,7 +15,7 @@ export class WorkerAdapter implements Adapter {
|
|||||||
if (worker_threads?.parentPort) parentPort = worker_threads.parentPort;
|
if (worker_threads?.parentPort) parentPort = worker_threads.parentPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
postMessage(body: any) {
|
postMessage(body: any): unknown {
|
||||||
if (parentPort) return parentPort.postMessage(body);
|
if (parentPort) return parentPort.postMessage(body);
|
||||||
return process.send!(body);
|
return process.send!(body);
|
||||||
}
|
}
|
||||||
|
@ -25,6 +25,8 @@ import type { Client, ClientOptions } from './client';
|
|||||||
|
|
||||||
import { Collectors } from './collectors';
|
import { Collectors } from './collectors';
|
||||||
import { type ClientUserStructure, Transformers } from './transformers';
|
import { type ClientUserStructure, Transformers } from './transformers';
|
||||||
|
import { MemberUpdateHandler } from '../websocket/discord/events/memberUpdate';
|
||||||
|
import { PresenceUpdateHandler } from '../websocket/discord/events/presenceUpdate';
|
||||||
|
|
||||||
let workerData: WorkerData;
|
let workerData: WorkerData;
|
||||||
let manager: import('node:worker_threads').MessagePort;
|
let manager: import('node:worker_threads').MessagePort;
|
||||||
@ -37,15 +39,16 @@ try {
|
|||||||
token: process.env.SEYFERT_WORKER_TOKEN!,
|
token: process.env.SEYFERT_WORKER_TOKEN!,
|
||||||
workerId: Number.parseInt(process.env.SEYFERT_WORKER_WORKERID!),
|
workerId: Number.parseInt(process.env.SEYFERT_WORKER_WORKERID!),
|
||||||
workerProxy: process.env.SEYFERT_WORKER_WORKERPROXY === 'true',
|
workerProxy: process.env.SEYFERT_WORKER_WORKERPROXY === 'true',
|
||||||
|
totalShards: Number(process.env.SEYFERT_WORKER_TOTALSHARDS),
|
||||||
|
mode: process.env.SEYFERT_WORKER_MODE,
|
||||||
} as WorkerData;
|
} as WorkerData;
|
||||||
} catch {}
|
} catch {}
|
||||||
|
|
||||||
export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
|
export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
|
||||||
private __handleGuilds?: Set<string> = new Set();
|
private __handleGuilds?: Set<string> = new Set();
|
||||||
logger = new Logger({
|
|
||||||
name: `[Worker #${workerData.workerId}]`,
|
|
||||||
});
|
|
||||||
|
|
||||||
|
memberUpdateHandler = new MemberUpdateHandler();
|
||||||
|
presenceUpdateHandler = new PresenceUpdateHandler();
|
||||||
collectors = new Collectors();
|
collectors = new Collectors();
|
||||||
events? = new EventHandler(this);
|
events? = new EventHandler(this);
|
||||||
me!: When<Ready, ClientUserStructure>;
|
me!: When<Ready, ClientUserStructure>;
|
||||||
@ -57,41 +60,8 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
|
|||||||
|
|
||||||
constructor(options?: WorkerClientOptions) {
|
constructor(options?: WorkerClientOptions) {
|
||||||
super(options);
|
super(options);
|
||||||
|
if (options?.postMessage) {
|
||||||
if (!process.env.SEYFERT_SPAWNING) {
|
this.postMessage = options.postMessage;
|
||||||
throw new Error('WorkerClient cannot spawn without manager');
|
|
||||||
}
|
|
||||||
this.postMessage({
|
|
||||||
type: 'WORKER_START',
|
|
||||||
workerId: workerData.workerId,
|
|
||||||
} satisfies WorkerStart);
|
|
||||||
|
|
||||||
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
|
|
||||||
if (worker_threads?.parentPort) {
|
|
||||||
manager = worker_threads?.parentPort;
|
|
||||||
}
|
|
||||||
(manager ?? process).on('message', (data: ManagerMessages) => this.handleManagerMessages(data));
|
|
||||||
|
|
||||||
this.setServices({
|
|
||||||
cache: {
|
|
||||||
adapter: new WorkerAdapter(workerData),
|
|
||||||
disabledCache: options?.disabledCache,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
if (workerData.debug) {
|
|
||||||
this.debugger = new Logger({
|
|
||||||
name: `[Worker #${workerData.workerId}]`,
|
|
||||||
logLevel: LogLevels.Debug,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
if (workerData.workerProxy) {
|
|
||||||
this.setServices({
|
|
||||||
rest: new ApiHandler({
|
|
||||||
token: workerData.token,
|
|
||||||
workerProxy: true,
|
|
||||||
debug: workerData.debug,
|
|
||||||
}),
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -126,7 +96,54 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setWorkerData(data: WorkerData) {
|
||||||
|
workerData = data;
|
||||||
|
}
|
||||||
|
|
||||||
async start(options: Omit<DeepPartial<StartOptions>, 'httpConnection' | 'token' | 'connection'> = {}) {
|
async start(options: Omit<DeepPartial<StartOptions>, 'httpConnection' | 'token' | 'connection'> = {}) {
|
||||||
|
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
|
||||||
|
|
||||||
|
if (worker_threads?.parentPort) {
|
||||||
|
manager = worker_threads?.parentPort;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (workerData.mode !== 'custom')
|
||||||
|
(manager ?? process).on('message', (data: ManagerMessages) => this.handleManagerMessages(data));
|
||||||
|
|
||||||
|
this.logger = new Logger({
|
||||||
|
name: `[Worker #${workerData.workerId}]`,
|
||||||
|
});
|
||||||
|
|
||||||
|
const adapter = new WorkerAdapter(workerData);
|
||||||
|
if (this.options.postMessage) {
|
||||||
|
adapter.postMessage = this.options.postMessage;
|
||||||
|
}
|
||||||
|
this.setServices({
|
||||||
|
cache: {
|
||||||
|
adapter,
|
||||||
|
disabledCache: this.options.disabledCache,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
if (workerData.debug) {
|
||||||
|
this.debugger = new Logger({
|
||||||
|
name: `[Worker #${workerData.workerId}]`,
|
||||||
|
logLevel: LogLevels.Debug,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
if (workerData.workerProxy) {
|
||||||
|
this.setServices({
|
||||||
|
rest: new ApiHandler({
|
||||||
|
token: workerData.token,
|
||||||
|
workerProxy: true,
|
||||||
|
debug: workerData.debug,
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
this.postMessage({
|
||||||
|
type: 'WORKER_START',
|
||||||
|
workerId: workerData.workerId,
|
||||||
|
} satisfies WorkerStart);
|
||||||
await super.start(options);
|
await super.start(options);
|
||||||
await this.loadEvents(options.eventsDir);
|
await this.loadEvents(options.eventsDir);
|
||||||
this.cache.intents = workerData.intents;
|
this.cache.intents = workerData.intents;
|
||||||
@ -140,12 +157,12 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
postMessage(body: any) {
|
postMessage(body: unknown): unknown {
|
||||||
if (manager) return manager.postMessage(body);
|
if (manager) return manager.postMessage(body);
|
||||||
return process.send!(body);
|
return process.send!(body);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected async handleManagerMessages(data: ManagerMessages) {
|
async handleManagerMessages(data: ManagerMessages) {
|
||||||
switch (data.type) {
|
switch (data.type) {
|
||||||
case 'CACHE_RESULT':
|
case 'CACHE_RESULT':
|
||||||
if (this.cache.adapter instanceof WorkerAdapter && this.cache.adapter.promises.has(data.nonce)) {
|
if (this.cache.adapter instanceof WorkerAdapter && this.cache.adapter.promises.has(data.nonce)) {
|
||||||
@ -329,6 +346,23 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
|
|||||||
this.collectors.run('RAW', packet),
|
this.collectors.run('RAW', packet),
|
||||||
]); //ignore promise
|
]); //ignore promise
|
||||||
switch (packet.t) {
|
switch (packet.t) {
|
||||||
|
//// Cases where we must obtain the old data before updating
|
||||||
|
case 'GUILD_MEMBER_UPDATE':
|
||||||
|
{
|
||||||
|
if (!this.memberUpdateHandler.check(packet.d)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await this.events?.execute(packet.t, packet, this as WorkerClient<true>, shardId);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case 'PRESENCE_UPDATE':
|
||||||
|
{
|
||||||
|
if (!this.presenceUpdateHandler.check(packet.d)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
await this.events?.execute(packet.t, packet, this as WorkerClient<true>, shardId);
|
||||||
|
}
|
||||||
|
break;
|
||||||
case 'GUILD_CREATE': {
|
case 'GUILD_CREATE': {
|
||||||
if (this.__handleGuilds?.has(packet.d.id)) {
|
if (this.__handleGuilds?.has(packet.d.id)) {
|
||||||
this.__handleGuilds.delete(packet.d.id);
|
this.__handleGuilds.delete(packet.d.id);
|
||||||
@ -348,6 +382,12 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
|
|||||||
default: {
|
default: {
|
||||||
await this.events?.execute(packet.t as never, packet, this, shardId);
|
await this.events?.execute(packet.t as never, packet, this, shardId);
|
||||||
switch (packet.t) {
|
switch (packet.t) {
|
||||||
|
case 'INTERACTION_CREATE':
|
||||||
|
await this.handleCommand.interaction(packet.d, shardId);
|
||||||
|
break;
|
||||||
|
case 'MESSAGE_CREATE':
|
||||||
|
await this.handleCommand.message(packet.d, shardId);
|
||||||
|
break;
|
||||||
case 'READY':
|
case 'READY':
|
||||||
if (!this.__handleGuilds) this.__handleGuilds = new Set();
|
if (!this.__handleGuilds) this.__handleGuilds = new Set();
|
||||||
for (const g of packet.d.guilds) {
|
for (const g of packet.d.guilds) {
|
||||||
@ -370,13 +410,7 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
|
|||||||
}
|
}
|
||||||
delete this.__handleGuilds;
|
delete this.__handleGuilds;
|
||||||
}
|
}
|
||||||
this.debugger?.debug(`#${shardId} [${packet.d.user.username}](${this.botId}) is online...`);
|
this.debugger?.debug(`#${shardId}[${packet.d.user.username}](${this.botId}) is online...`);
|
||||||
break;
|
|
||||||
case 'INTERACTION_CREATE':
|
|
||||||
await this.handleCommand.interaction(packet.d, shardId);
|
|
||||||
break;
|
|
||||||
case 'MESSAGE_CREATE':
|
|
||||||
await this.handleCommand.message(packet.d, shardId);
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -395,8 +429,9 @@ export function generateShardInfo(shard: Shard): WorkerShardInfo {
|
|||||||
}
|
}
|
||||||
|
|
||||||
interface WorkerClientOptions extends BaseClientOptions {
|
interface WorkerClientOptions extends BaseClientOptions {
|
||||||
disabledCache: Cache['disabledCache'];
|
disabledCache?: Cache['disabledCache'];
|
||||||
commands?: NonNullable<Client['options']>['commands'];
|
commands?: NonNullable<Client['options']>['commands'];
|
||||||
handlePayload?: ShardManagerOptions['handlePayload'];
|
handlePayload?: ShardManagerOptions['handlePayload'];
|
||||||
gateway?: ClientOptions['gateway'];
|
gateway?: ClientOptions['gateway'];
|
||||||
|
postMessage?: (body: unknown) => unknown;
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@ export class MemberUpdateHandler {
|
|||||||
|
|
||||||
membersEquals(old: GatewayGuildMemberUpdateDispatchData, member: GatewayGuildMemberUpdateDispatchData) {
|
membersEquals(old: GatewayGuildMemberUpdateDispatchData, member: GatewayGuildMemberUpdateDispatchData) {
|
||||||
return (
|
return (
|
||||||
|
old.guild_id === member.guild_id &&
|
||||||
old.joined_at === member.joined_at &&
|
old.joined_at === member.joined_at &&
|
||||||
old.nick === member.nick &&
|
old.nick === member.nick &&
|
||||||
old.avatar === member.avatar &&
|
old.avatar === member.avatar &&
|
||||||
|
@ -33,7 +33,6 @@ export class PresenceUpdateHandler {
|
|||||||
|
|
||||||
presenceEquals(oldPresence: GatewayPresenceUpdateDispatchData, newPresence: GatewayPresenceUpdateDispatchData) {
|
presenceEquals(oldPresence: GatewayPresenceUpdateDispatchData, newPresence: GatewayPresenceUpdateDispatchData) {
|
||||||
return (
|
return (
|
||||||
newPresence &&
|
|
||||||
oldPresence.status === newPresence.status &&
|
oldPresence.status === newPresence.status &&
|
||||||
oldPresence.activities?.length === newPresence.activities?.length &&
|
oldPresence.activities?.length === newPresence.activities?.length &&
|
||||||
oldPresence.activities?.every((activity, index) =>
|
oldPresence.activities?.every((activity, index) =>
|
||||||
|
@ -4,7 +4,7 @@ import type {
|
|||||||
GatewayIntentBits,
|
GatewayIntentBits,
|
||||||
GatewayPresenceUpdateData,
|
GatewayPresenceUpdateData,
|
||||||
} from 'discord-api-types/v10';
|
} from 'discord-api-types/v10';
|
||||||
import type { Logger } from '../../common';
|
import type { Awaitable, DeepPartial, Logger } from '../../common';
|
||||||
import type { IdentifyProperties } from '../constants';
|
import type { IdentifyProperties } from '../constants';
|
||||||
|
|
||||||
export interface ShardManagerOptions extends ShardDetails {
|
export interface ShardManagerOptions extends ShardDetails {
|
||||||
@ -38,8 +38,15 @@ export interface ShardManagerOptions extends ShardDetails {
|
|||||||
compress?: boolean;
|
compress?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface WorkerManagerOptions extends Omit<ShardManagerOptions, 'handlePayload'> {
|
export interface CustomManagerAdapter {
|
||||||
mode: 'threads' | 'clusters';
|
postMessage(workerId: number, body: unknown): Awaitable<unknown>;
|
||||||
|
spawn(workerData: WorkerData, env: Record<string, any>): Awaitable<unknown>;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface WorkerManagerOptions extends Omit<ShardManagerOptions, 'handlePayload' | 'properties'> {
|
||||||
|
mode: 'threads' | 'clusters' | 'custom';
|
||||||
|
|
||||||
|
adapter?: CustomManagerAdapter;
|
||||||
|
|
||||||
workers?: number;
|
workers?: number;
|
||||||
|
|
||||||
@ -53,6 +60,8 @@ export interface WorkerManagerOptions extends Omit<ShardManagerOptions, 'handleP
|
|||||||
path: string;
|
path: string;
|
||||||
|
|
||||||
handlePayload(shardId: number, workerId: number, packet: GatewayDispatchPayload): unknown;
|
handlePayload(shardId: number, workerId: number, packet: GatewayDispatchPayload): unknown;
|
||||||
|
|
||||||
|
properties?: DeepPartial<NonNullable<ShardManagerOptions['properties']>>;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface ShardData {
|
export interface ShardData {
|
||||||
@ -117,6 +126,8 @@ export interface WorkerData {
|
|||||||
token: string;
|
token: string;
|
||||||
path: string;
|
path: string;
|
||||||
shards: number[];
|
shards: number[];
|
||||||
|
totalShards: number;
|
||||||
|
mode: 'custom' | 'clusters' | 'threads';
|
||||||
workerId: number;
|
workerId: number;
|
||||||
debug: boolean;
|
debug: boolean;
|
||||||
workerProxy: boolean;
|
workerProxy: boolean;
|
||||||
|
@ -5,25 +5,21 @@ import { ApiHandler, Logger, Router } from '../..';
|
|||||||
import { MemoryAdapter, type Adapter } from '../../cache';
|
import { MemoryAdapter, type Adapter } from '../../cache';
|
||||||
import { BaseClient, type InternalRuntimeConfig } from '../../client/base';
|
import { BaseClient, type InternalRuntimeConfig } from '../../client/base';
|
||||||
import { MergeOptions, lazyLoadPackage, type MakePartial } from '../../common';
|
import { MergeOptions, lazyLoadPackage, type MakePartial } from '../../common';
|
||||||
import { WorkerManagerDefaults } from '../constants';
|
import { WorkerManagerDefaults, properties } from '../constants';
|
||||||
import { DynamicBucket } from '../structures';
|
import { DynamicBucket } from '../structures';
|
||||||
import { ConnectQueue } from '../structures/timeout';
|
import { ConnectQueue } from '../structures/timeout';
|
||||||
import { MemberUpdateHandler } from './events/memberUpdate';
|
|
||||||
import { PresenceUpdateHandler } from './events/presenceUpdate';
|
|
||||||
import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared';
|
import type { ShardOptions, WorkerData, WorkerManagerOptions } from './shared';
|
||||||
import type { WorkerInfo, WorkerMessage, WorkerShardInfo, WorkerStart } from './worker';
|
import type { WorkerInfo, WorkerMessage, WorkerShardInfo } from './worker';
|
||||||
|
|
||||||
export class WorkerManager extends Map<
|
export class WorkerManager extends Map<
|
||||||
number,
|
number,
|
||||||
(ClusterWorker | import('node:worker_threads').Worker) & { ready?: boolean }
|
(ClusterWorker | import('node:worker_threads').Worker | { ready: boolean }) & { ready?: boolean }
|
||||||
> {
|
> {
|
||||||
options!: Required<WorkerManagerOptions>;
|
options!: MakePartial<Required<WorkerManagerOptions>, 'adapter'>;
|
||||||
debugger?: Logger;
|
debugger?: Logger;
|
||||||
connectQueue!: ConnectQueue;
|
connectQueue!: ConnectQueue;
|
||||||
cacheAdapter: Adapter;
|
cacheAdapter: Adapter;
|
||||||
promises = new Map<string, { resolve: (value: any) => void; timeout: NodeJS.Timeout }>();
|
promises = new Map<string, { resolve: (value: any) => void; timeout: NodeJS.Timeout }>();
|
||||||
memberUpdateHandler = new MemberUpdateHandler();
|
|
||||||
presenceUpdateHandler = new PresenceUpdateHandler();
|
|
||||||
rest!: ApiHandler;
|
rest!: ApiHandler;
|
||||||
constructor(options: MakePartial<WorkerManagerOptions, 'token' | 'intents' | 'info' | 'handlePayload'>) {
|
constructor(options: MakePartial<WorkerManagerOptions, 'token' | 'intents' | 'info' | 'handlePayload'>) {
|
||||||
super();
|
super();
|
||||||
@ -128,10 +124,16 @@ export class WorkerManager extends Map<
|
|||||||
case 'threads':
|
case 'threads':
|
||||||
(worker as import('worker_threads').Worker).postMessage(body);
|
(worker as import('worker_threads').Worker).postMessage(body);
|
||||||
break;
|
break;
|
||||||
|
case 'custom':
|
||||||
|
this.options.adapter!.postMessage(id, body);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async prepareWorkers(shards: number[][]) {
|
async prepareWorkers(shards: number[][]) {
|
||||||
|
const worker_threads = lazyLoadPackage<typeof import('node:worker_threads')>('node:worker_threads');
|
||||||
|
if (!worker_threads) throw new Error('Cannot prepare workers without worker_threads.');
|
||||||
|
|
||||||
for (let i = 0; i < shards.length; i++) {
|
for (let i = 0; i < shards.length; i++) {
|
||||||
let worker = this.get(i);
|
let worker = this.get(i);
|
||||||
if (!worker) {
|
if (!worker) {
|
||||||
@ -143,23 +145,11 @@ export class WorkerManager extends Map<
|
|||||||
intents: this.options.intents,
|
intents: this.options.intents,
|
||||||
workerId: i,
|
workerId: i,
|
||||||
workerProxy: this.options.workerProxy,
|
workerProxy: this.options.workerProxy,
|
||||||
|
totalShards: this.totalShards,
|
||||||
|
mode: this.options.mode,
|
||||||
});
|
});
|
||||||
this.set(i, worker);
|
this.set(i, worker);
|
||||||
}
|
}
|
||||||
const listener = (message: WorkerStart) => {
|
|
||||||
if (message.type !== 'WORKER_START') return;
|
|
||||||
worker!.removeListener('message', listener);
|
|
||||||
this.postMessage(i, {
|
|
||||||
type: 'SPAWN_SHARDS',
|
|
||||||
compress: this.options.compress ?? false,
|
|
||||||
info: {
|
|
||||||
...this.options.info,
|
|
||||||
shards: this.totalShards,
|
|
||||||
},
|
|
||||||
properties: this.options.properties,
|
|
||||||
} satisfies ManagerSpawnShards);
|
|
||||||
};
|
|
||||||
worker.on('message', listener);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -188,12 +178,17 @@ export class WorkerManager extends Map<
|
|||||||
worker.on('message', data => this.handleWorkerMessage(data));
|
worker.on('message', data => this.handleWorkerMessage(data));
|
||||||
return worker;
|
return worker;
|
||||||
}
|
}
|
||||||
|
case 'custom':
|
||||||
|
this.options.adapter!.spawn(workerData, env);
|
||||||
|
return {
|
||||||
|
ready: false,
|
||||||
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
spawn(workerId: number, shardId: number) {
|
spawn(workerId: number, shardId: number) {
|
||||||
this.connectQueue.push(() => {
|
this.connectQueue.push(() => {
|
||||||
const worker = this.get(workerId);
|
const worker = this.has(workerId);
|
||||||
if (!worker) {
|
if (!worker) {
|
||||||
this.debugger?.fatal("Trying spawn with worker doesn't exist");
|
this.debugger?.fatal("Trying spawn with worker doesn't exist");
|
||||||
return;
|
return;
|
||||||
@ -208,12 +203,28 @@ export class WorkerManager extends Map<
|
|||||||
|
|
||||||
async handleWorkerMessage(message: WorkerMessage) {
|
async handleWorkerMessage(message: WorkerMessage) {
|
||||||
switch (message.type) {
|
switch (message.type) {
|
||||||
|
case 'WORKER_START':
|
||||||
|
{
|
||||||
|
this.postMessage(message.workerId, {
|
||||||
|
type: 'SPAWN_SHARDS',
|
||||||
|
compress: this.options.compress ?? false,
|
||||||
|
info: {
|
||||||
|
...this.options.info,
|
||||||
|
shards: this.totalShards,
|
||||||
|
},
|
||||||
|
properties: {
|
||||||
|
...properties,
|
||||||
|
...this.options.properties,
|
||||||
|
},
|
||||||
|
} satisfies ManagerSpawnShards);
|
||||||
|
}
|
||||||
|
break;
|
||||||
case 'CONNECT_QUEUE':
|
case 'CONNECT_QUEUE':
|
||||||
this.spawn(message.workerId, message.shardId);
|
this.spawn(message.workerId, message.shardId);
|
||||||
break;
|
break;
|
||||||
case 'CACHE_REQUEST':
|
case 'CACHE_REQUEST':
|
||||||
{
|
{
|
||||||
const worker = this.get(message.workerId);
|
const worker = this.has(message.workerId);
|
||||||
if (!worker) {
|
if (!worker) {
|
||||||
throw new Error('Invalid request from unavailable worker');
|
throw new Error('Invalid request from unavailable worker');
|
||||||
}
|
}
|
||||||
@ -227,21 +238,7 @@ export class WorkerManager extends Map<
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case 'RECEIVE_PAYLOAD':
|
case 'RECEIVE_PAYLOAD':
|
||||||
{
|
await this.options.handlePayload(message.shardId, message.workerId, message.payload);
|
||||||
switch (message.payload.t) {
|
|
||||||
case 'GUILD_MEMBER_UPDATE':
|
|
||||||
if (!this.memberUpdateHandler.check(message.payload.d)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case 'PRESENCE_UPDATE':
|
|
||||||
if (!this.presenceUpdateHandler.check(message.payload.d)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
this.options.handlePayload(message.shardId, message.workerId, message.payload);
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
case 'RESULT_PAYLOAD':
|
case 'RESULT_PAYLOAD':
|
||||||
{
|
{
|
||||||
@ -353,7 +350,7 @@ export class WorkerManager extends Map<
|
|||||||
|
|
||||||
async send(data: GatewaySendPayload, shardId: number) {
|
async send(data: GatewaySendPayload, shardId: number) {
|
||||||
const workerId = this.calculateWorkerId(shardId);
|
const workerId = this.calculateWorkerId(shardId);
|
||||||
const worker = this.get(workerId);
|
const worker = this.has(workerId);
|
||||||
|
|
||||||
if (!worker) {
|
if (!worker) {
|
||||||
throw new Error(`Worker #${workerId} doesnt exist`);
|
throw new Error(`Worker #${workerId} doesnt exist`);
|
||||||
@ -373,7 +370,7 @@ export class WorkerManager extends Map<
|
|||||||
|
|
||||||
async getShardInfo(shardId: number) {
|
async getShardInfo(shardId: number) {
|
||||||
const workerId = this.calculateWorkerId(shardId);
|
const workerId = this.calculateWorkerId(shardId);
|
||||||
const worker = this.get(workerId);
|
const worker = this.has(workerId);
|
||||||
|
|
||||||
if (!worker) {
|
if (!worker) {
|
||||||
throw new Error(`Worker #${workerId} doesnt exist`);
|
throw new Error(`Worker #${workerId} doesnt exist`);
|
||||||
@ -387,7 +384,7 @@ export class WorkerManager extends Map<
|
|||||||
}
|
}
|
||||||
|
|
||||||
async getWorkerInfo(workerId: number) {
|
async getWorkerInfo(workerId: number) {
|
||||||
const worker = this.get(workerId);
|
const worker = this.has(workerId);
|
||||||
|
|
||||||
if (!worker) {
|
if (!worker) {
|
||||||
throw new Error(`Worker #${workerId} doesnt exist`);
|
throw new Error(`Worker #${workerId} doesnt exist`);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user