fix: gateway events (#305)

* fix: gateway events

* fix: oops

* fix: num of shards

* fix: xd

* fix: ok discord

* fix: worker_ready event

* fix: types & use message cache

* fix: resharding
This commit is contained in:
MARCROCK22 2024-12-12 23:46:20 -04:00 committed by GitHub
parent bad33536b9
commit c014ce6dc1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 149 additions and 145 deletions

2
src/cache/index.ts vendored
View File

@ -468,9 +468,11 @@ export class Cache {
break;
case 'GUILD_CREATE':
case 'GUILD_UPDATE':
case 'RAW_GUILD_CREATE':
await this.guilds?.patch(event.d.id, { unavailable: false, ...event.d });
break;
case 'GUILD_DELETE':
case 'RAW_GUILD_DELETE':
if (event.d.unavailable) {
await this.guilds?.patch(event.d.id, event.d);
} else {

View File

@ -7,7 +7,6 @@ import {
type WatcherPayload,
type WatcherSendToShard,
assertString,
hasIntent,
lazyLoadPackage,
} from '../common';
import { EventHandler } from '../events';
@ -23,7 +22,6 @@ import { type ClientUserStructure, type MessageStructure, Transformers } from '.
let parentPort: import('node:worker_threads').MessagePort;
export class Client<Ready extends boolean = boolean> extends BaseClient {
private __handleGuilds?: string[];
gateway!: ShardManager;
me!: If<Ready, ClientUserStructure>;
declare options: Omit<ClientOptions, 'commands'> & {
@ -155,21 +153,6 @@ export class Client<Ready extends boolean = boolean> extends BaseClient {
await this.events.execute(packet, this as Client<true>, shardId);
}
break;
case 'GUILD_DELETE':
case 'GUILD_CREATE': {
if (this.__handleGuilds?.includes(packet.d.id)) {
this.__handleGuilds?.splice(this.__handleGuilds!.indexOf(packet.d.id), 1);
if (!this.__handleGuilds?.length && [...this.gateway.values()].every(shard => shard.data.session_id)) {
delete this.__handleGuilds;
await this.cache.onPacket(packet);
return this.events.runEvent('BOT_READY', this, this.me, -1);
}
if (!this.__handleGuilds?.length) delete this.__handleGuilds;
return this.cache.onPacket(packet);
}
await this.events.execute(packet, this as Client<true>, shardId);
break;
}
//rest of the events
default: {
switch (packet.t) {
@ -186,23 +169,21 @@ export class Client<Ready extends boolean = boolean> extends BaseClient {
}
break;
case 'READY': {
const ids = packet.d.guilds.map(x => x.id);
if (hasIntent(this.gateway.options.intents, 'Guilds')) {
this.__handleGuilds = this.__handleGuilds?.concat(ids) ?? ids;
}
this.botId = packet.d.user.id;
this.applicationId = packet.d.application.id;
this.me = Transformers.ClientUser(this, packet.d.user, packet.d.application) as never;
if (!this.__handleGuilds?.length) {
if ([...this.gateway.values()].every(shard => shard.data.session_id)) {
await this.events.runEvent('BOT_READY', this, this.me, -1);
}
delete this.__handleGuilds;
}
this.debugger?.debug(`#${shardId}[${packet.d.user.username}](${this.botId}) is online...`);
await this.events.execute(packet, this as Client<true>, shardId);
break;
}
case 'GUILDS_READY':
{
await this.events.execute(packet, this as Client<true>, shardId);
if ([...this.gateway.values()].every(shard => shard.isReady)) {
await this.events.runEvent('BOT_READY', this, this.me, -1);
}
}
break;
default:
await this.events.execute(packet, this as Client<true>, shardId);
break;

View File

@ -1,7 +1,7 @@
import { type UUID, randomUUID } from 'node:crypto';
import { ApiHandler, Logger } from '..';
import { WorkerAdapter } from '../cache';
import { type DeepPartial, LogLevels, type MakeRequired, type When, hasIntent, lazyLoadPackage } from '../common';
import { type DeepPartial, LogLevels, type MakeRequired, type When, lazyLoadPackage } from '../common';
import { EventHandler } from '../events';
import type { GatewayDispatchPayload, GatewaySendPayload } from '../types';
import { Shard, type ShardManagerOptions, type WorkerData, properties } from '../websocket';
@ -57,9 +57,6 @@ try {
}
export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
private __handleGuilds?: string[];
private __handleGuildsResharding?: string[];
memberUpdateHandler = new MemberUpdateHandler();
presenceUpdateHandler = new PresenceUpdateHandler();
collectors = new Collectors();
@ -69,7 +66,6 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
shards = new Map<number, Shard>();
resharding = new Map<number, Shard>();
private _ready?: boolean;
declare options: WorkerClientOptions;
@ -150,15 +146,12 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
type: workerData.resharding ? 'WORKER_START_RESHARDING' : 'WORKER_START',
workerId: workerData.workerId,
} satisfies WorkerStart | WorkerStartResharding);
if (workerData.resharding) {
this.__handleGuildsResharding = [];
}
await super.start(options);
await this.loadEvents(options.eventsDir);
}
async loadEvents(dir?: string) {
dir ??= await this.getRC().then(x => ('events' in x.locations ? x.locations.events : undefined));
dir ??= await this.getRC().then(x => x.locations.events);
if (dir) {
await this.events.load(dir);
this.logger.info('EventHandler loaded');
@ -243,30 +236,9 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
...this.options.gateway?.properties,
},
handlePayload(_, payload) {
if (payload.t === 'GUILD_CREATE' || payload.t === 'GUILD_DELETE') {
const indexOf = self.__handleGuildsResharding!.indexOf(payload.d.id);
if (indexOf !== -1) {
self.__handleGuildsResharding!.splice(indexOf, 1);
if (!self.__handleGuildsResharding?.length && shardsConnected === workerData.shards.length) {
delete self.__handleGuildsResharding;
self.postMessage({
type: 'WORKER_READY_RESHARDING',
workerId: workerData.workerId,
} satisfies WorkerReadyResharding);
}
}
}
if (payload.t !== 'READY') return;
shardsConnected++;
const ids = payload.d.guilds.map(x => x.id);
if (hasIntent(workerData.intents, 'Guilds')) {
self.__handleGuildsResharding = self.__handleGuildsResharding?.concat(ids) ?? ids;
}
if (shardsConnected === workerData.shards.length && !self.__handleGuildsResharding?.length) {
delete self.__handleGuildsResharding;
if (payload.t === 'READY') {
shardsConnected++;
} else if (payload.t === 'GUILDS_READY' && shardsConnected === workerData.shards.length) {
self.postMessage({
type: 'WORKER_READY_RESHARDING',
workerId: workerData.workerId,
@ -370,7 +342,6 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
break;
case 'WORKER_ALREADY_EXISTS_RESHARDING':
{
this.__handleGuildsResharding = [];
this.postMessage({
type: 'WORKER_START_RESHARDING',
workerId: workerData.workerId,
@ -510,25 +481,6 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
await this.events.execute(packet, this as WorkerClient<true>, shardId);
}
break;
case 'GUILD_DELETE':
case 'GUILD_CREATE': {
if (this.__handleGuilds?.includes(packet.d.id)) {
this.__handleGuilds?.splice(this.__handleGuilds!.indexOf(packet.d.id), 1);
if (!this.__handleGuilds?.length && [...this.shards.values()].every(shard => shard.data.session_id)) {
delete this.__handleGuilds;
await this.cache.onPacket(packet);
this.postMessage({
type: 'WORKER_READY',
workerId: this.workerId,
} as WorkerReady);
return this.events.runEvent('WORKER_READY', this, this.me, -1);
}
if (!this.__handleGuilds?.length) delete this.__handleGuilds;
return this.cache.onPacket(packet);
}
await this.events.execute(packet, this, shardId);
break;
}
default: {
switch (packet.t) {
case 'INTERACTION_CREATE':
@ -543,35 +495,31 @@ export class WorkerClient<Ready extends boolean = boolean> extends BaseClient {
await this.handleCommand.message(packet.d, shardId);
}
break;
case 'READY':
case 'READY': {
this.botId = packet.d.user.id;
this.applicationId = packet.d.application.id;
this.me = Transformers.ClientUser(this, packet.d.user, packet.d.application) as never;
if ([...this.shards.values()].every(shard => shard.data.session_id)) {
this.postMessage({
type: 'WORKER_SHARDS_CONNECTED',
workerId: this.workerId,
} as WorkerShardsConnected);
await this.events.runEvent('WORKER_SHARDS_CONNECTED', this, this.me, -1);
}
await this.events.execute(packet, this, shardId);
this.debugger?.debug(`#${shardId}[${packet.d.user.username}](${this.botId}) is online...`);
break;
}
case 'GUILDS_READY':
{
const ids = packet.d.guilds.map(x => x.id);
if (hasIntent(this.workerData.intents, 'Guilds')) {
this.__handleGuilds = this.__handleGuilds?.concat(ids) ?? ids;
}
this.botId = packet.d.user.id;
this.applicationId = packet.d.application.id;
this.me = Transformers.ClientUser(this, packet.d.user, packet.d.application) as never;
await this.events.execute(packet, this, shardId);
if (!this._ready && [...this.shards.values()].every(shard => shard.data.session_id)) {
this._ready = true;
if ([...this.shards.values()].every(shard => shard.isReady)) {
this.postMessage({
type: 'WORKER_SHARDS_CONNECTED',
type: 'WORKER_READY',
workerId: this.workerId,
} as WorkerShardsConnected);
await this.events.runEvent('WORKER_SHARDS_CONNECTED', this, this.me, -1);
} as WorkerReady);
await this.events.runEvent('WORKER_READY', this, this.me, -1);
}
if (!this.__handleGuilds?.length) {
if ([...this.shards.values()].every(shard => shard.data.session_id)) {
this.postMessage({
type: 'WORKER_READY',
workerId: this.workerId,
} as WorkerReady);
await this.events.runEvent('WORKER_READY', this, this.me, -1);
}
delete this.__handleGuilds;
}
this.debugger?.debug(`#${shardId}[${packet.d.user.username}](${this.botId}) is online...`);
await this.events.execute(packet, this, shardId);
}
break;
default:

View File

@ -131,7 +131,7 @@ export class GuildShorter extends BaseShorter {
let channel: APIChannel | ReturnType<Channels['get']>;
if (!force) {
channel = await this.client.cache.channels?.get(channelId);
if (channel) return channel;
if (channel) return channel as ReturnType<typeof channelFrom>;
}
channel = await this.client.proxy.channels(channelId).get();

View File

@ -70,7 +70,12 @@ export class MessageShorter extends BaseShorter {
});
}
fetch(messageId: string, channelId: string) {
async fetch(messageId: string, channelId: string, force = false) {
if (!force) {
const message = await this.client.cache.messages?.get(messageId);
if (message) return message;
}
return this.client.proxy
.channels(channelId)
.messages(messageId)

View File

@ -1,5 +1,6 @@
import type { ClientUserStructure } from '../../client/transformers';
import { type ClientUserStructure, Transformers } from '../../client/transformers';
import type { UsingClient } from '../../commands';
import type { GatewayRawGuildCreateDispatch, GatewayRawGuildDeleteDispatch } from '../../types';
export const BOT_READY = (_self: UsingClient, me: ClientUserStructure) => {
return me;
@ -12,3 +13,15 @@ export const WORKER_READY = (_self: UsingClient, me: ClientUserStructure) => {
export const WORKER_SHARDS_CONNECTED = (_self: UsingClient, me: ClientUserStructure) => {
return me;
};
export const RAW_GUILD_CREATE = (self: UsingClient, data: GatewayRawGuildCreateDispatch['d']) => {
return Transformers.Guild<'create'>(self, data);
};
export const RAW_GUILD_DELETE = async (self: UsingClient, data: GatewayRawGuildDeleteDispatch['d']) => {
return (await self.cache.guilds?.get(data.id)) ?? data;
};
export const GUILDS_READY = () => {
return;
};

View File

@ -125,8 +125,8 @@ export class Message extends BaseMessage {
super(client, data);
}
fetch() {
return this.client.messages.fetch(this.id, this.channelId);
fetch(force = false) {
return this.client.messages.fetch(this.id, this.channelId, force);
}
reply(body: Omit<MessageCreateBodyRequest, 'message_reference'>, fail = true) {

View File

@ -271,7 +271,7 @@ export class MessagesMethods extends DiscordBase {
crosspost: (messageId: string, reason?: string) =>
ctx.client.messages.crosspost(messageId, ctx.channelId, reason),
delete: (messageId: string, reason?: string) => ctx.client.messages.delete(messageId, ctx.channelId, reason),
fetch: (messageId: string) => ctx.client.messages.fetch(messageId, ctx.channelId),
fetch: (messageId: string, force = false) => ctx.client.messages.fetch(messageId, ctx.channelId, force),
purge: (messages: string[], reason?: string) => ctx.client.messages.purge(messages, ctx.channelId, reason),
list: (fetchOptions: RESTGetAPIChannelMessagesQuery) => ctx.client.messages.list(ctx.channelId, fetchOptions),
};

View File

@ -1,5 +1,6 @@
// https://github.com/discordjs/discord-api-types/blob/main/gateway/v10.ts
import type { OmitInsert } from '../common';
import type { ChannelType, GatewayDispatchEvents, GatewayOpcodes, Snowflake } from './index';
import type { GatewayPresenceUpdate } from './payloads/gateway';
import type {
@ -81,7 +82,10 @@ export type GatewayDispatchPayload =
| GatewayGuildAuditLogEntryCreateDispatch
| GatewayGuildBanModifyDispatch
| GatewayGuildCreateDispatch
| GatewayRawGuildCreateDispatch
| GatewayGuildDeleteDispatch
| GatewayRawGuildDeleteDispatch
| GatewayGuildsReadyDispatch
| GatewayGuildEmojisUpdateDispatch
| GatewayGuildIntegrationsUpdateDispatch
| GatewayGuildMemberAddDispatch
@ -531,6 +535,12 @@ export type GatewayGuildModifyDispatchData = APIGuild;
*/
export type GatewayGuildCreateDispatch = DataPayload<GatewayDispatchEvents.GuildCreate, GatewayGuildCreateDispatchData>;
export type GatewayRawGuildCreateDispatch = OmitInsert<
GatewayGuildCreateDispatch,
't',
{ t: GatewayDispatchEvents.RawGuildCreate }
>;
/**
* https://discord.com/developers/docs/topics/gateway-events#guild-create
* https://discord.com/developers/docs/topics/gateway-events#guild-create-guild-create-extra-fields
@ -639,6 +649,18 @@ export type GatewayGuildUpdateDispatchData = GatewayGuildModifyDispatchData;
*/
export type GatewayGuildDeleteDispatch = DataPayload<GatewayDispatchEvents.GuildDelete, GatewayGuildDeleteDispatchData>;
export type GatewayRawGuildDeleteDispatch = OmitInsert<
GatewayGuildDeleteDispatch,
't',
{ t: GatewayDispatchEvents.RawGuildDelete }
>;
export type GatewayGuildsReadyDispatch = OmitInsert<
GatewayReadyDispatch,
't' | 'd',
{ t: GatewayDispatchEvents.GuildsReady; d?: never }
>;
/**
* https://discord.com/developers/docs/topics/gateway-events#guild-delete
*/

View File

@ -325,7 +325,10 @@ export enum GatewayDispatchEvents {
GuildBanAdd = 'GUILD_BAN_ADD',
GuildBanRemove = 'GUILD_BAN_REMOVE',
GuildCreate = 'GUILD_CREATE',
RawGuildCreate = 'RAW_GUILD_CREATE',
GuildDelete = 'GUILD_DELETE',
RawGuildDelete = 'RAW_GUILD_DELETE',
GuildsReady = 'GUILDS_READY',
GuildEmojisUpdate = 'GUILD_EMOJIS_UPDATE',
GuildIntegrationsUpdate = 'GUILD_INTEGRATIONS_UPDATE',
GuildMemberAdd = 'GUILD_MEMBER_ADD',

View File

@ -1,5 +1,5 @@
import { inflateSync } from 'node:zlib';
import { LogLevels, Logger, type MakeRequired, MergeOptions } from '../../common';
import { LogLevels, Logger, type MakeRequired, MergeOptions, hasIntent } from '../../common';
import {
GatewayCloseCodes,
GatewayDispatchEvents,
@ -39,8 +39,9 @@ export class Shard {
bucket: DynamicBucket;
offlineSendQueue: ((_?: unknown) => void)[] = [];
pendingGuilds = new Set<string>();
options: MakeRequired<ShardOptions, 'properties' | 'ratelimitOptions'>;
isReady = false;
constructor(
public id: number,
@ -175,13 +176,13 @@ export class Shard {
});
}
async heartbeat(requested: boolean) {
heartbeat(requested: boolean) {
this.debugger?.debug(
`[Shard #${this.id}] Sending ${requested ? '' : 'un'}requested heartbeat (Ack=${this.heart.ack})`,
);
if (!requested) {
if (!this.heart.ack) {
await this.close(ShardSocketCloseCodes.ZombiedConnection, 'Zombied connection');
this.close(ShardSocketCloseCodes.ZombiedConnection, 'Zombied connection');
return;
}
this.heart.ack = false;
@ -197,18 +198,18 @@ export class Shard {
);
}
async disconnect() {
disconnect() {
this.debugger?.info(`[Shard #${this.id}] Disconnecting`);
await this.close(ShardSocketCloseCodes.Shutdown, 'Shard down request');
this.close(ShardSocketCloseCodes.Shutdown, 'Shard down request');
}
async reconnect() {
this.debugger?.info(`[Shard #${this.id}] Reconnecting`);
await this.disconnect();
this.disconnect();
await this.connect();
}
async onpacket(packet: GatewayReceivePayload) {
onpacket(packet: GatewayReceivePayload) {
if (packet.s !== null) {
this.data.resume_seq = packet.s;
}
@ -216,21 +217,19 @@ export class Shard {
this.debugger?.debug(`[Shard #${this.id}]`, packet.t ? packet.t : GatewayOpcodes[packet.op], this.data.resume_seq);
switch (packet.op) {
case GatewayOpcodes.Hello:
{
clearInterval(this.heart.nodeInterval);
case GatewayOpcodes.Hello: {
clearInterval(this.heart.nodeInterval);
this.heart.interval = packet.d.heartbeat_interval;
this.heart.interval = packet.d.heartbeat_interval;
await this.heartbeat(false);
this.heart.nodeInterval = setInterval(() => this.heartbeat(false), this.heart.interval);
this.heartbeat(false);
this.heart.nodeInterval = setInterval(() => this.heartbeat(false), this.heart.interval);
if (this.resumable) {
return this.resume();
}
await this.identify();
if (this.resumable) {
return this.resume();
}
break;
return this.identify();
}
case GatewayOpcodes.HeartbeatAck:
{
this.heart.ack = true;
@ -241,36 +240,66 @@ export class Shard {
this.heartbeat(true);
break;
case GatewayOpcodes.Reconnect:
await this.reconnect();
break;
case GatewayOpcodes.InvalidSession:
return this.reconnect();
case GatewayOpcodes.InvalidSession: {
if (packet.d) {
if (!this.resumable) {
return this.logger.fatal('This is a completely unexpected error message.');
}
await this.resume();
} else {
this.data.resume_seq = 0;
this.data.session_id = undefined;
await this.identify();
return this.resume();
}
break;
this.data.resume_seq = 0;
this.data.session_id = undefined;
return this.identify();
}
case GatewayOpcodes.Dispatch:
{
switch (packet.t) {
case GatewayDispatchEvents.Resumed:
{
this.offlineSendQueue.map((resolve: () => any) => resolve());
this.isReady = true;
this.offlineSendQueue.map(resolve => resolve());
this.options.handlePayload(this.id, packet);
}
break;
case GatewayDispatchEvents.Ready: {
if (hasIntent(this.options.intents, 'Guilds')) {
for (let i = 0; i < packet.d.guilds.length; i++) {
this.pendingGuilds.add(packet.d.guilds.at(i)!.id);
}
}
this.data.resume_gateway_url = packet.d.resume_gateway_url;
this.data.session_id = packet.d.session_id;
this.offlineSendQueue.map((resolve: () => any) => resolve());
this.offlineSendQueue.map(resolve => resolve());
this.options.handlePayload(this.id, packet);
if (this.pendingGuilds.size === 0) {
this.isReady = true;
this.options.handlePayload(this.id, {
t: GatewayDispatchEvents.GuildsReady,
op: packet.op,
s: packet.s,
});
}
break;
}
case GatewayDispatchEvents.GuildCreate:
case GatewayDispatchEvents.GuildDelete:
if (this.pendingGuilds.delete(packet.d.id)) {
(packet as any).t = `RAW_${packet.t}`;
this.options.handlePayload(this.id, packet);
if (this.pendingGuilds.size === 0) {
this.isReady = true;
this.options.handlePayload(this.id, {
t: GatewayDispatchEvents.GuildsReady,
op: packet.op,
s: packet.s,
});
}
} else {
this.options.handlePayload(this.id, packet);
}
break;
default:
this.options.handlePayload(this.id, packet);
break;
@ -281,6 +310,7 @@ export class Shard {
}
protected async handleClosed(close: { code: number; reason: string }) {
this.isReady = false;
clearInterval(this.heart.nodeInterval);
this.logger.warn(
`${ShardSocketCloseCodes[close.code] ?? GatewayCloseCodes[close.code] ?? close.code} (${close.code})`,
@ -332,7 +362,7 @@ export class Shard {
}
}
async close(code: number, reason: string) {
close(code: number, reason: string) {
clearInterval(this.heart.nodeInterval);
if (!this.isOpen) {
return this.debugger?.warn(`[Shard #${this.id}] Is not open, reason:`, reason);

View File

@ -28,7 +28,7 @@ export class WorkerManager extends Map<
},
logger?: Logger,
) {
logger?.info('Preparing buckets', options);
logger?.info('Preparing buckets');
const chunks = DynamicBucket.chunk<number>(
new Array(options.shardEnd - options.shardStart),
@ -391,7 +391,7 @@ export class WorkerManager extends Map<
case 'WORKER_READY':
{
this.get(message.workerId)!.ready = true;
if ([...this.values()].every(w => w.ready)) {
if (this.size === this.totalWorkers && [...this.values()].every(w => w.ready)) {
this.postMessage(this.keys().next().value!, {
type: 'BOT_READY',
} satisfies ManagerSendBotReady);
@ -614,7 +614,7 @@ export class WorkerManager extends Map<
`Percentage is not enough to reshard ${percentage}/${this.options.resharding.percentage}`,
);
this.debugger?.info('Starting resharding process');
this.debugger?.info(`Starting resharding process to ${info.shards}`);
this._info = info;
this.connectQueue.concurrency = info.session_start_limit.max_concurrency;