diff --git a/packages/api-types/package.json b/packages/api-types/package.json index 25b7246..230be2b 100644 --- a/packages/api-types/package.json +++ b/packages/api-types/package.json @@ -1,6 +1,6 @@ { "name": "@biscuitland/api-types", - "version": "1.0.1", + "version": "1.0.2", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", diff --git a/packages/cache/package.json b/packages/cache/package.json index 01e8cc5..c0305a1 100644 --- a/packages/cache/package.json +++ b/packages/cache/package.json @@ -1,6 +1,6 @@ { "name": "@biscuitland/cache", - "version": "1.0.1", + "version": "1.0.2", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", diff --git a/packages/core/package.json b/packages/core/package.json index 21dd42c..10fb124 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@biscuitland/core", - "version": "1.0.1", + "version": "1.0.2", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", diff --git a/packages/core/src/biscuit.ts b/packages/core/src/biscuit.ts index 30f596e..64ec0ca 100644 --- a/packages/core/src/biscuit.ts +++ b/packages/core/src/biscuit.ts @@ -2,7 +2,7 @@ import type { AtLeastOne, ApplicationCommandPermissionTypes, DiscordApplicationCommand, - DiscordGatewayPayload, + // DiscordGatewayPayload, DiscordGuildApplicationCommandPermissions, DiscordUser, DiscordApplicationCommandOption, @@ -210,8 +210,8 @@ export class Session { // makeWs const defHandler: DiscordRawEventHandler = (shard, event) => { - let message = event.data; - let data = JSON.parse(message) as DiscordGatewayPayload; + let data = event as any; + // let data = JSON.parse(message) as DiscordGatewayPayload; Actions.raw(this, shard.id, data); diff --git a/packages/rest/package.json b/packages/rest/package.json index 0d0f900..eda90e9 100644 --- a/packages/rest/package.json +++ b/packages/rest/package.json @@ -1,6 +1,6 @@ { "name": "@biscuitland/rest", - "version": "1.0.1", + "version": "1.0.2", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", diff --git a/packages/ws/package.json b/packages/ws/package.json index c9f9e6f..649f103 100644 --- a/packages/ws/package.json +++ b/packages/ws/package.json @@ -1,6 +1,6 @@ { "name": "@biscuitland/ws", - "version": "1.0.1", + "version": "1.0.2", "main": "./dist/index.js", "module": "./dist/index.mjs", "types": "./dist/index.d.ts", diff --git a/packages/ws/src/services/shard.ts b/packages/ws/src/services/shard.ts index 46cc5a1..7e1b5e8 100644 --- a/packages/ws/src/services/shard.ts +++ b/packages/ws/src/services/shard.ts @@ -1,5 +1,7 @@ import type { DiscordGatewayPayload, + DiscordHello, + DiscordReady, PickPartial, } from '@biscuitland/api-types'; import type { LeakyBucket } from '../utils/bucket-util'; @@ -13,7 +15,7 @@ import { } from '@biscuitland/api-types'; import WebSocket from 'ws'; -import { checkOffline } from '../utils/shard-util'; +import { inflateSync } from 'node:zlib'; export const DEFAULT_HEARTBEAT_INTERVAL = 45000; @@ -26,6 +28,9 @@ export type PickOptions = Pick< > & Partial; +const decoder = new TextDecoder(); + + export interface ShardOptions { /** Id of the shard which should be created. */ id: number; @@ -363,6 +368,129 @@ export class Shard { clearTimeout(this.heart.timeoutId); } + /** + * @inheritDoc + */ + + async handleMessage (message: MessageEvent): Promise { + let data = message.data; + + if (this.options.gatewayConfig.compress && data instanceof Blob) { + // @ts-ignore + data = decoder.decode(inflateSync(new Uint8Array(await message.arrayBuffer()))); + } + + if (typeof data !== 'string') return; + + const messageData = JSON.parse(data) as DiscordGatewayPayload; + + + switch (messageData.op) { + case GatewayOpcodes.Heartbeat: { + if (!this.isOpen()) return; + + this.heart.lastBeat = Date.now(); + + this.socket?.send( + JSON.stringify({ + op: GatewayOpcodes.Heartbeat, + d: this.options.previousSequenceNumber, + }), + ); + this.events.heartbeat?.(this); + + break; + } + case GatewayOpcodes.Hello: { + const interval = (messageData.d as DiscordHello).heartbeat_interval; + + this.startHeartbeating(interval); + + if (this.state !== ShardState.Resuming) { + + this.bucket = createLeakyBucket({ + max: this.safe(), + refillInterval: GATEWAY_RATE_LIMIT_RESET_INTERVAL, + refillAmount: this.safe(), + waiting: this.bucket.waiting, + }); + } + + this.events.hello?.(this); + + break; + } + case GatewayOpcodes.HeartbeatACK: { + this.heart.acknowledged = true; + this.heart.lastAck = Date.now(); + + if (this.heart.lastBeat) { + this.heart.rtt = this.heart.lastAck - this.heart.lastBeat; + } + + this.events.heartbeatAck?.(this); + + break; + } + case GatewayOpcodes.Reconnect: { + this.events.requestedReconnect?.(this); + + await this.resume(); + + break; + } + case GatewayOpcodes.InvalidSession: { + const resumable = messageData.d as boolean; + + this.events.invalidSession?.(this, resumable); + + await this.delay(Math.floor((Math.random() * 4 + 1) * 1000)); + + this.resolves.get('INVALID_SESSION')?.(messageData); + this.resolves.delete('INVALID_SESSION'); + + if (!resumable) { + await this.identify(); + + break; + } + + await this.resume(); + + break; + } + } + + if (messageData.t === 'RESUMED') { + + this.state = ShardState.Connected; + this.events.resumed?.(this); + + this.offlineSendQueue.map((resolve) => resolve()); + + this.resolves.get('RESUMED')?.(messageData); + this.resolves.delete('RESUMED'); + } + else if (messageData.t === 'READY') { + const payload = messageData.d as DiscordReady; + + this.sessionId = payload.session_id; + this.state = ShardState.Connected; + + this.offlineSendQueue.map((resolve) => resolve()); + + this.resolves.get('READY')?.(messageData); + this.resolves.delete('READY'); + } + + if (messageData.s !== null) { + this.options.previousSequenceNumber = messageData.s; + } + + this.events.message?.(this, messageData); + this.options.handleMessage(this, messageData as any); + } + /** * @inheritDoc */ @@ -395,11 +523,11 @@ export class Shard { this.state = ShardState.Identifying; this.events.identifying?.(this); - if (!this.ready()) { + if (!this.isOpen()) { await this.connect(); } - // await this.operator(); + await this.handleIdentify(); this.send( { @@ -409,8 +537,7 @@ export class Shard { compress: this.options.gatewayConfig.compress, properties: this.options.gatewayConfig.properties, intents: this.options.gatewayConfig.intents, - shard: [this.id, this.options.totalShards], - // presence: await this.makePresence?.(this.id), + shard: [this.id, this.options.totalShards] }, }, true @@ -455,15 +582,14 @@ export class Shard { socket.onmessage = (message: any) => { hi = true; - this.options.handleMessage(this, message); - // this.handle(message); + this.handleMessage(message); }; return new Promise(resolve => { socket.onopen = () => { setTimeout(() => { if (!hi) { - this.options.handleMessage(this, { + this.handleMessage({ data: JSON.stringify({ t: null, s: null, @@ -494,7 +620,7 @@ export class Shard { */ async resume(): Promise { - if (this.ready()) { + if (this.isOpen()) { this.close( ShardSocketCloseCodes.ResumeClosingOldConnection, 'Reconnecting the shard, closing old connection.' @@ -536,11 +662,11 @@ export class Shard { */ async send(message: ShardSocketRequest, highPriority: boolean) { - await checkOffline(this, highPriority); + await this.checkOffline(highPriority); await this.bucket.acquire(1, highPriority); - await checkOffline(this, highPriority); + await this.checkOffline(highPriority); this.socket?.send(JSON.stringify(message)); } @@ -613,10 +739,26 @@ export class Shard { * @inheritDoc */ - close(code: number, reason: string): void { - if (!this.ready()) { - return; + async checkOffline(highPriority: boolean): Promise { + if (!this.isOpen()) { + await new Promise(resolve => { + if (highPriority) { + this.offlineSendQueue.unshift(resolve); + } else { + this.offlineSendQueue.push(resolve); + } + }); } + } + + /** + * @inheritDoc + */ + + close(code: number, reason: string): void { + if (this.socket?.readyState !== WebSocket.OPEN) { + return; + }; return this.socket?.close(code, reason); } @@ -625,10 +767,22 @@ export class Shard { * @inheritDoc */ - ready(): boolean { + isOpen(): boolean { return this.socket?.readyState === WebSocket.OPEN; } + /** + * @inheritDoc + */ + + async delay(ms: number): Promise { + return new Promise((res): any => + setTimeout((): void => { + res(); + }, ms) + ); + } + /** * @inheritDoc */