From 1319dc93d9df09715253c90ec086200fa0fd7cc9 Mon Sep 17 00:00:00 2001 From: Johann150 Date: Wed, 11 Jan 2023 22:16:01 +0100 Subject: [PATCH] server: switch websocket to ws --- packages/backend/package.json | 2 - .../backend/src/server/api/stream/index.ts | 72 ++++++++------ packages/backend/src/server/api/streaming.ts | 94 ++++++++++--------- 3 files changed, 91 insertions(+), 77 deletions(-) diff --git a/packages/backend/package.json b/packages/backend/package.json index cea0018e1..5d4eca072 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -113,7 +113,6 @@ "unzipper": "0.10.11", "uuid": "8.3.2", "web-push": "3.5.0", - "websocket": "1.0.34", "ws": "8.8.0", "xev": "3.0.2" }, @@ -164,7 +163,6 @@ "@types/tmp": "0.2.3", "@types/uuid": "8.3.4", "@types/web-push": "3.3.2", - "@types/websocket": "1.0.5", "@types/ws": "8.5.3", "@typescript-eslint/eslint-plugin": "^5.46.1", "@typescript-eslint/parser": "^5.46.1", diff --git a/packages/backend/src/server/api/stream/index.ts b/packages/backend/src/server/api/stream/index.ts index c39a33c74..145c318fa 100644 --- a/packages/backend/src/server/api/stream/index.ts +++ b/packages/backend/src/server/api/stream/index.ts @@ -1,5 +1,5 @@ import { EventEmitter } from 'events'; -import * as websocket from 'websocket'; +import { WebSocket } from 'ws'; import { readNote } from '@/services/note/read.js'; import { User } from '@/models/entities/user.js'; import { Channel as ChannelModel } from '@/models/entities/channel.js'; @@ -26,29 +26,29 @@ export class Connection { public blocking: Set = new Set(); // "被"blocking public followingChannels: Set = new Set(); public token?: AccessToken; - private wsConnection: websocket.connection; + private socket: WebSocket; public subscriber: StreamEventEmitter; private channels: Channel[] = []; private subscribingNotes: any = {}; private cachedNotes: Packed<'Note'>[] = []; constructor( - wsConnection: websocket.connection, + socket: WebSocket, subscriber: EventEmitter, user: User | null | undefined, token: AccessToken | null | undefined, ) { - this.wsConnection = wsConnection; + this.socket = socket; this.subscriber = subscriber; if (user) this.user = user; if (token) this.token = token; - this.onWsConnectionMessage = this.onWsConnectionMessage.bind(this); + this.onMessage = this.onMessage.bind(this); this.onUserEvent = this.onUserEvent.bind(this); this.onNoteStreamMessage = this.onNoteStreamMessage.bind(this); this.onBroadcastMessage = this.onBroadcastMessage.bind(this); - this.wsConnection.on('message', this.onWsConnectionMessage); + this.socket.on('message', this.onMessage); this.subscriber.on('broadcast', data => { this.onBroadcastMessage(data); @@ -113,7 +113,7 @@ export class Connection { break; case 'terminate': - this.wsConnection.close(); + this.socket.close(); this.dispose(); break; @@ -122,12 +122,8 @@ export class Connection { } } - /** - * クライアントからメッセージ受信時 - */ - private async onWsConnectionMessage(data: websocket.Message) { - if (data.type !== 'utf8') return; - if (data.utf8Data == null) return; + private async onMessage(data: WebSocket.RawData, isRaw: boolean) { + if (data.isRaw) return; let obj: Record; @@ -140,22 +136,40 @@ export class Connection { const { type, body } = obj; switch (type) { - case 'readNotification': this.onReadNotification(body); break; - case 'subNote': this.onSubscribeNote(body); break; - case 's': this.onSubscribeNote(body); break; // alias - case 'sr': this.onSubscribeNote(body); this.readNote(body); break; - case 'unsubNote': this.onUnsubscribeNote(body); break; - case 'un': this.onUnsubscribeNote(body); break; // alias - case 'connect': this.onChannelConnectRequested(body); break; - case 'disconnect': this.onChannelDisconnectRequested(body); break; - case 'channel': this.onChannelMessageRequested(body); break; - case 'ch': this.onChannelMessageRequested(body); break; // alias + case 'readNotification': + this.onReadNotification(body); + break; + case 'subNote': case 's': + this.onSubscribeNote(body); + break; + case 'sr': + this.onSubscribeNote(body); + this.readNote(body); + break; + case 'unsubNote': case 'un': + this.onUnsubscribeNote(body); + break; + case 'connect': + this.onChannelConnectRequested(body); + break; + case 'disconnect': + this.onChannelDisconnectRequested(body); + break; + case 'channel': case 'ch': + this.onChannelMessageRequested(body); + break; - // 個々のチャンネルではなくルートレベルでこれらのメッセージを受け取る理由は、 - // クライアントの事情を考慮したとき、入力フォームはノートチャンネルやメッセージのメインコンポーネントとは別 - // なこともあるため、それらのコンポーネントがそれぞれ各チャンネルに接続するようにするのは面倒なため。 - case 'typingOnChannel': this.typingOnChannel(body.channel); break; - case 'typingOnMessaging': this.typingOnMessaging(body); break; + // The reason for receiving these messages at the root level rather than in + // individual channels is that when considering the client's circumstances, the + // input form may be separate from the main components of the note channel or + // message, and it would be cumbersome to have each of those components connect to + // each channel. + case 'typingOnChannel': + this.typingOnChannel(body.channel); + break; + case 'typingOnMessaging': + this.typingOnMessaging(body); + break; } } @@ -259,7 +273,7 @@ export class Connection { * クライアントにメッセージ送信 */ public sendMessageToWs(type: string, payload: any) { - this.wsConnection.send(JSON.stringify({ + this.socket.send(JSON.stringify({ type, body: payload, })); diff --git a/packages/backend/src/server/api/streaming.ts b/packages/backend/src/server/api/streaming.ts index 63b731770..825896b56 100644 --- a/packages/backend/src/server/api/streaming.ts +++ b/packages/backend/src/server/api/streaming.ts @@ -1,8 +1,8 @@ import { EventEmitter } from 'events'; -import { ParsedUrlQuery } from 'querystring'; import * as http from 'node:http'; -import * as websocket from 'websocket'; +import { WebSocketServer } from 'ws'; +import { MINUTE } from '@/const.js'; import { subscriber as redisClient } from '@/db/redis.js'; import { Users } from '@/models/index.js'; import { Connection } from './stream/index.js'; @@ -10,62 +10,64 @@ import authenticate from './authenticate.js'; export const initializeStreamingServer = (server: http.Server): void => { // Init websocket server - const ws = new websocket.server({ - httpServer: server, - }); + const ws = new WebSocketServer({ noServer: true }); - ws.on('request', async (request): Promise => { - const q = request.resourceURL.query as ParsedUrlQuery; + server.on('upgrade', async (request, socket, head)=> { + if (!request.url.startsWith('/streaming?')) { + socket.write('HTTP/1.1 400 Bad Request\r\n\r\n', undefined, () => socket.destroy()); + return; + } + const q = new URLSearchParams(request.url.slice(11)); - const [user, app] = await authenticate(request.httpRequest.headers.authorization, q.i) + const [user, app] = await authenticate(request.headers.authorization, q.get('i')) .catch(err => { - request.reject(403, err.message); + socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n', undefined, () => socket.destroy()); return []; }); - if (typeof user === 'undefined') { - return; - } + if (typeof user === 'undefined') return; if (user?.isSuspended) { - request.reject(400); + socket.write('HTTP/1.1 403 Forbidden\r\n\r\n', undefined, () => socket.destroy()); return; } - const connection = request.accept(); + ws.handleUpgrade(request, socket, head, (socket) => { + const ev = new EventEmitter(); - const ev = new EventEmitter(); - - async function onRedisMessage(_: string, data: string) { - const parsed = JSON.parse(data); - ev.emit(parsed.channel, parsed.message); - } - - redisClient.on('message', onRedisMessage); - - const main = new Connection(connection, ev, user, app); - - const intervalId = user ? setInterval(() => { - Users.update(user.id, { - lastActiveDate: new Date(), - }); - }, 1000 * 60 * 5) : null; - if (user) { - Users.update(user.id, { - lastActiveDate: new Date(), - }); - } - - connection.once('close', () => { - ev.removeAllListeners(); - main.dispose(); - redisClient.off('message', onRedisMessage); - if (intervalId) clearInterval(intervalId); - }); - - connection.on('message', async (data) => { - if (data.type === 'utf8' && data.utf8Data === 'ping') { - connection.send('pong'); + async function onRedisMessage(_: string, data: string) { + const parsed = JSON.parse(data); + ev.emit(parsed.channel, parsed.message); } + + redisClient.on('message', onRedisMessage); + + const main = new Connection(socket, ev, user, app); + + // keep user "online" while a stream is connected + const intervalId = user ? setInterval(() => { + Users.update(user.id, { + lastActiveDate: new Date(), + }); + }, 5 * MINUTE) : null; + if (user) { + Users.update(user.id, { + lastActiveDate: new Date(), + }); + } + socket.once('close', () => { + ev.removeAllListeners(); + main.dispose(); + redisClient.off('message', onRedisMessage); + if (intervalId) clearInterval(intervalId); + }); + + // ping/pong mechanism + // TODO: the websocket protocol already specifies a ping/pong mechanism, why is this necessary? + socket.on('message', async (data) => { + if (data.type === 'utf8' && data.utf8Data === 'ping') { + socket.send('pong'); + } + }); }); }); };