mirror of
https://github.com/tiramisulabs/seyfert.git
synced 2025-07-01 20:46:08 +00:00
153 lines
5.5 KiB
TypeScript
153 lines
5.5 KiB
TypeScript
import { DiscordGatewayPayload, DiscordHello, DiscordReady } from '../../types/discord.ts';
|
|
import { GatewayOpcodes } from '../../types/shared.ts';
|
|
import { createLeakyBucket } from '../../util/bucket.ts';
|
|
import { delay } from '../../util/delay.ts';
|
|
import { decompressWith } from './deps.ts';
|
|
import { GATEWAY_RATE_LIMIT_RESET_INTERVAL, Shard, ShardState } from './types.ts';
|
|
|
|
// Module-level TextDecoder reused by handleMessage to turn decompressed
// gateway payload bytes back into a string before JSON parsing.
const decoder = new TextDecoder();
|
|
|
|
/**
 * Resolves every pending entry of the shard's offline send queue and removes
 * the entries from the queue while doing so.
 *
 * The queue is emptied in place (`splice(0)`) so that the same resolvers are
 * not kept around and invoked again on a later READY / RESUMED.
 */
function flushOfflineSendQueue(shard: Shard): void {
  for (const resolve of shard.offlineSendQueue.splice(0)) {
    resolve();
  }
}

/**
 * Handles a single raw message received on the shard's gateway socket.
 *
 * Steps, in order:
 * 1. Decompresses the payload when compression is enabled and a `Blob` was received.
 * 2. Parses the payload as JSON.
 * 3. Reacts to the gateway opcode (heartbeat request, hello, heartbeat ack,
 *    reconnect request, invalid session).
 * 4. Reacts to the `RESUMED` and `READY` dispatch events.
 * 5. Stores the sequence number carried by the payload, if any.
 * 6. Forwards the payload to `shard.events.message`.
 *
 * The function returns early, without forwarding the payload, when the data is
 * not a string after the optional decompression, or when a heartbeat request
 * arrives while the shard is not open.
 *
 * @param shard - The shard whose socket received the message.
 * @param message_ - The raw message event; only its `data` is used.
 * @throws SyntaxError when the payload is not valid JSON (thrown by `JSON.parse`).
 */
export async function handleMessage(shard: Shard, message_: MessageEvent<any>): Promise<void> {
  let message = message_.data;

  // If message compression is enabled, Discord might send zlib compressed payloads.
  if (shard.gatewayConfig.compress && message instanceof Blob) {
    message = decoder.decode(decompressWith(new Uint8Array(await message.arrayBuffer())));
  }

  // Safeguard in case decompression failed to produce a string.
  if (typeof message !== 'string') return;

  const messageData = JSON.parse(message) as DiscordGatewayPayload;

  switch (messageData.op) {
    case GatewayOpcodes.Heartbeat: {
      // TODO: can this actually happen
      if (!shard.isOpen()) return;

      shard.heart.lastBeat = Date.now();
      // Discord randomly sends this requiring an immediate heartbeat back.
      // Using a direct socket.send call here because heartbeat requests are reserved by us.
      shard.socket?.send(
        JSON.stringify({
          op: GatewayOpcodes.Heartbeat,
          d: shard.previousSequenceNumber,
        }),
      );
      shard.events.heartbeat?.(shard);

      break;
    }
    case GatewayOpcodes.Hello: {
      const interval = (messageData.d as DiscordHello).heartbeat_interval;

      shard.startHeartbeating(interval);

      if (shard.state !== ShardState.Resuming) {
        // HELLO has been sent on a non-resume action.
        // This means that the shard starts a new session,
        // therefore the rate limit interval has been reset too.
        shard.bucket = createLeakyBucket({
          max: shard.calculateSafeRequests(),
          refillInterval: GATEWAY_RATE_LIMIT_RESET_INTERVAL,
          refillAmount: shard.calculateSafeRequests(),
          // Waiting acquires should not be lost on a re-identify.
          waiting: shard.bucket.waiting,
        });
      }

      shard.events.hello?.(shard);

      break;
    }
    case GatewayOpcodes.HeartbeatACK: {
      shard.heart.acknowledged = true;
      shard.heart.lastAck = Date.now();
      // Manually calculating the round trip time for users who need it.
      if (shard.heart.lastBeat) {
        shard.heart.rtt = shard.heart.lastAck - shard.heart.lastBeat;
      }

      shard.events.heartbeatAck?.(shard);

      break;
    }
    case GatewayOpcodes.Reconnect: {
      shard.events.requestedReconnect?.(shard);

      await shard.resume();

      break;
    }
    case GatewayOpcodes.InvalidSession: {
      const resumable = messageData.d as boolean;

      shard.events.invalidSession?.(shard, resumable);

      // We need to wait for a random amount of time between 1 and 5 seconds.
      // Reference: https://discord.com/developers/docs/topics/gateway#resuming
      await delay(Math.floor((Math.random() * 4 + 1) * 1000));

      shard.resolves.get('INVALID_SESSION')?.(messageData);
      shard.resolves.delete('INVALID_SESSION');

      if (resumable) {
        // The session is invalid but apparently it is resumable.
        await shard.resume();
      } else {
        // When resumable is false we need to re-identify.
        await shard.identify();
      }

      break;
    }
  }

  if (messageData.t === 'RESUMED') {
    shard.state = ShardState.Connected;
    shard.events.resumed?.(shard);

    // Continue the requests which have been queued since the shard went offline.
    flushOfflineSendQueue(shard);

    shard.resolves.get('RESUMED')?.(messageData);
    shard.resolves.delete('RESUMED');
  } else if (messageData.t === 'READY') {
    const payload = messageData.d as DiscordReady;

    // Important for future resumes.
    shard.sessionId = payload.session_id;
    shard.state = ShardState.Connected;

    // Continue the requests which have been queued since the shard went offline.
    // Important when this is a re-identify.
    flushOfflineSendQueue(shard);

    shard.resolves.get('READY')?.(messageData);
    shard.resolves.delete('READY');
  }

  // Update the sequence number if it is present.
  // `s` is checked with `!= null` so that a sequence number of `0` is still
  // stored, while both `null` and an absent (`undefined`) `s` leave the
  // previously stored sequence number untouched.
  if (messageData.s != null) {
    shard.previousSequenceNumber = messageData.s;
  }

  // The necessary handling required for the shard's connection has been finished.
  // Now the event can be safely forwarded.
  shard.events.message?.(shard, messageData);
}
|