sharding replaces

This commit is contained in:
Dragurimu 2022-07-30 00:19:28 -05:00
parent f7c293aaf2
commit 025d4176c2
7 changed files with 177 additions and 23 deletions

View File

@ -1,6 +1,6 @@
{
"name": "@biscuitland/api-types",
"version": "1.0.1",
"version": "1.0.2",
"main": "./dist/index.js",
"module": "./dist/index.mjs",
"types": "./dist/index.d.ts",

View File

@ -1,6 +1,6 @@
{
"name": "@biscuitland/cache",
"version": "1.0.1",
"version": "1.0.2",
"main": "./dist/index.js",
"module": "./dist/index.mjs",
"types": "./dist/index.d.ts",

View File

@ -1,6 +1,6 @@
{
"name": "@biscuitland/core",
"version": "1.0.1",
"version": "1.0.2",
"main": "./dist/index.js",
"module": "./dist/index.mjs",
"types": "./dist/index.d.ts",

View File

@ -2,7 +2,7 @@ import type {
AtLeastOne,
ApplicationCommandPermissionTypes,
DiscordApplicationCommand,
DiscordGatewayPayload,
// DiscordGatewayPayload,
DiscordGuildApplicationCommandPermissions,
DiscordUser,
DiscordApplicationCommandOption,
@ -210,8 +210,8 @@ export class Session {
// makeWs
const defHandler: DiscordRawEventHandler = (shard, event) => {
let message = event.data;
let data = JSON.parse(message) as DiscordGatewayPayload;
let data = event as any;
// let data = JSON.parse(message) as DiscordGatewayPayload;
Actions.raw(this, shard.id, data);

View File

@ -1,6 +1,6 @@
{
"name": "@biscuitland/rest",
"version": "1.0.1",
"version": "1.0.2",
"main": "./dist/index.js",
"module": "./dist/index.mjs",
"types": "./dist/index.d.ts",

View File

@ -1,6 +1,6 @@
{
"name": "@biscuitland/ws",
"version": "1.0.1",
"version": "1.0.2",
"main": "./dist/index.js",
"module": "./dist/index.mjs",
"types": "./dist/index.d.ts",

View File

@ -1,5 +1,7 @@
import type {
DiscordGatewayPayload,
DiscordHello,
DiscordReady,
PickPartial,
} from '@biscuitland/api-types';
import type { LeakyBucket } from '../utils/bucket-util';
@ -13,7 +15,7 @@ import {
} from '@biscuitland/api-types';
import WebSocket from 'ws';
import { checkOffline } from '../utils/shard-util';
import { inflateSync } from 'node:zlib';
export const DEFAULT_HEARTBEAT_INTERVAL = 45000;
@ -26,6 +28,9 @@ export type PickOptions = Pick<
> &
Partial<ShardOptions>;
const decoder = new TextDecoder();
export interface ShardOptions {
/** Id of the shard which should be created. */
id: number;
@ -363,6 +368,129 @@ export class Shard {
clearTimeout(this.heart.timeoutId);
}
/**
* @inheritDoc
*/
async handleMessage (message: MessageEvent): Promise<void> {
let data = message.data;
if (this.options.gatewayConfig.compress && data instanceof Blob) {
// @ts-ignore
data = decoder.decode(inflateSync(new Uint8Array(await message.arrayBuffer())));
}
if (typeof data !== 'string') return;
const messageData = JSON.parse(data) as DiscordGatewayPayload;
switch (messageData.op) {
case GatewayOpcodes.Heartbeat: {
if (!this.isOpen()) return;
this.heart.lastBeat = Date.now();
this.socket?.send(
JSON.stringify({
op: GatewayOpcodes.Heartbeat,
d: this.options.previousSequenceNumber,
}),
);
this.events.heartbeat?.(this);
break;
}
case GatewayOpcodes.Hello: {
const interval = (messageData.d as DiscordHello).heartbeat_interval;
this.startHeartbeating(interval);
if (this.state !== ShardState.Resuming) {
this.bucket = createLeakyBucket({
max: this.safe(),
refillInterval: GATEWAY_RATE_LIMIT_RESET_INTERVAL,
refillAmount: this.safe(),
waiting: this.bucket.waiting,
});
}
this.events.hello?.(this);
break;
}
case GatewayOpcodes.HeartbeatACK: {
this.heart.acknowledged = true;
this.heart.lastAck = Date.now();
if (this.heart.lastBeat) {
this.heart.rtt = this.heart.lastAck - this.heart.lastBeat;
}
this.events.heartbeatAck?.(this);
break;
}
case GatewayOpcodes.Reconnect: {
this.events.requestedReconnect?.(this);
await this.resume();
break;
}
case GatewayOpcodes.InvalidSession: {
const resumable = messageData.d as boolean;
this.events.invalidSession?.(this, resumable);
await this.delay(Math.floor((Math.random() * 4 + 1) * 1000));
this.resolves.get('INVALID_SESSION')?.(messageData);
this.resolves.delete('INVALID_SESSION');
if (!resumable) {
await this.identify();
break;
}
await this.resume();
break;
}
}
if (messageData.t === 'RESUMED') {
this.state = ShardState.Connected;
this.events.resumed?.(this);
this.offlineSendQueue.map((resolve) => resolve());
this.resolves.get('RESUMED')?.(messageData);
this.resolves.delete('RESUMED');
}
else if (messageData.t === 'READY') {
const payload = messageData.d as DiscordReady;
this.sessionId = payload.session_id;
this.state = ShardState.Connected;
this.offlineSendQueue.map((resolve) => resolve());
this.resolves.get('READY')?.(messageData);
this.resolves.delete('READY');
}
if (messageData.s !== null) {
this.options.previousSequenceNumber = messageData.s;
}
this.events.message?.(this, messageData);
this.options.handleMessage(this, messageData as any);
}
/**
* @inheritDoc
*/
@ -395,11 +523,11 @@ export class Shard {
this.state = ShardState.Identifying;
this.events.identifying?.(this);
if (!this.ready()) {
if (!this.isOpen()) {
await this.connect();
}
// await this.operator();
await this.handleIdentify();
this.send(
{
@ -409,8 +537,7 @@ export class Shard {
compress: this.options.gatewayConfig.compress,
properties: this.options.gatewayConfig.properties,
intents: this.options.gatewayConfig.intents,
shard: [this.id, this.options.totalShards],
// presence: await this.makePresence?.(this.id),
shard: [this.id, this.options.totalShards]
},
},
true
@ -455,15 +582,14 @@ export class Shard {
socket.onmessage = (message: any) => {
hi = true;
this.options.handleMessage(this, message);
// this.handle(message);
this.handleMessage(message);
};
return new Promise(resolve => {
socket.onopen = () => {
setTimeout(() => {
if (!hi) {
this.options.handleMessage(this, {
this.handleMessage({
data: JSON.stringify({
t: null,
s: null,
@ -494,7 +620,7 @@ export class Shard {
*/
async resume(): Promise<void> {
if (this.ready()) {
if (this.isOpen()) {
this.close(
ShardSocketCloseCodes.ResumeClosingOldConnection,
'Reconnecting the shard, closing old connection.'
@ -536,11 +662,11 @@ export class Shard {
*/
async send(message: ShardSocketRequest, highPriority: boolean) {
await checkOffline(this, highPriority);
await this.checkOffline(highPriority);
await this.bucket.acquire(1, highPriority);
await checkOffline(this, highPriority);
await this.checkOffline(highPriority);
this.socket?.send(JSON.stringify(message));
}
@ -613,10 +739,26 @@ export class Shard {
* @inheritDoc
*/
close(code: number, reason: string): void {
if (!this.ready()) {
return;
async checkOffline(highPriority: boolean): Promise<void> {
if (!this.isOpen()) {
await new Promise(resolve => {
if (highPriority) {
this.offlineSendQueue.unshift(resolve);
} else {
this.offlineSendQueue.push(resolve);
}
});
}
}
/**
* @inheritDoc
*/
close(code: number, reason: string): void {
if (this.socket?.readyState !== WebSocket.OPEN) {
return;
};
return this.socket?.close(code, reason);
}
@ -625,10 +767,22 @@ export class Shard {
* @inheritDoc
*/
ready(): boolean {
isOpen(): boolean {
return this.socket?.readyState === WebSocket.OPEN;
}
/**
* @inheritDoc
*/
async delay(ms: number): Promise<void> {
return new Promise((res): any =>
setTimeout((): void => {
res();
}, ms)
);
}
/**
* @inheritDoc
*/