server: switch websocket to ws

This commit is contained in:
Johann150 2023-01-11 22:16:01 +01:00
parent 80e2851378
commit 1319dc93d9
Signed by untrusted user: Johann150
GPG key ID: 9EE6577A2A06F8F1
3 changed files with 91 additions and 77 deletions

View file

@ -113,7 +113,6 @@
"unzipper": "0.10.11", "unzipper": "0.10.11",
"uuid": "8.3.2", "uuid": "8.3.2",
"web-push": "3.5.0", "web-push": "3.5.0",
"websocket": "1.0.34",
"ws": "8.8.0", "ws": "8.8.0",
"xev": "3.0.2" "xev": "3.0.2"
}, },
@ -164,7 +163,6 @@
"@types/tmp": "0.2.3", "@types/tmp": "0.2.3",
"@types/uuid": "8.3.4", "@types/uuid": "8.3.4",
"@types/web-push": "3.3.2", "@types/web-push": "3.3.2",
"@types/websocket": "1.0.5",
"@types/ws": "8.5.3", "@types/ws": "8.5.3",
"@typescript-eslint/eslint-plugin": "^5.46.1", "@typescript-eslint/eslint-plugin": "^5.46.1",
"@typescript-eslint/parser": "^5.46.1", "@typescript-eslint/parser": "^5.46.1",

View file

@ -1,5 +1,5 @@
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import * as websocket from 'websocket'; import { WebSocket } from 'ws';
import { readNote } from '@/services/note/read.js'; import { readNote } from '@/services/note/read.js';
import { User } from '@/models/entities/user.js'; import { User } from '@/models/entities/user.js';
import { Channel as ChannelModel } from '@/models/entities/channel.js'; import { Channel as ChannelModel } from '@/models/entities/channel.js';
@ -26,29 +26,29 @@ export class Connection {
public blocking: Set<User['id']> = new Set(); // "被"blocking public blocking: Set<User['id']> = new Set(); // "被"blocking
public followingChannels: Set<ChannelModel['id']> = new Set(); public followingChannels: Set<ChannelModel['id']> = new Set();
public token?: AccessToken; public token?: AccessToken;
private wsConnection: websocket.connection; private socket: WebSocket;
public subscriber: StreamEventEmitter; public subscriber: StreamEventEmitter;
private channels: Channel[] = []; private channels: Channel[] = [];
private subscribingNotes: any = {}; private subscribingNotes: any = {};
private cachedNotes: Packed<'Note'>[] = []; private cachedNotes: Packed<'Note'>[] = [];
constructor( constructor(
wsConnection: websocket.connection, socket: WebSocket,
subscriber: EventEmitter, subscriber: EventEmitter,
user: User | null | undefined, user: User | null | undefined,
token: AccessToken | null | undefined, token: AccessToken | null | undefined,
) { ) {
this.wsConnection = wsConnection; this.socket = socket;
this.subscriber = subscriber; this.subscriber = subscriber;
if (user) this.user = user; if (user) this.user = user;
if (token) this.token = token; if (token) this.token = token;
this.onWsConnectionMessage = this.onWsConnectionMessage.bind(this); this.onMessage = this.onMessage.bind(this);
this.onUserEvent = this.onUserEvent.bind(this); this.onUserEvent = this.onUserEvent.bind(this);
this.onNoteStreamMessage = this.onNoteStreamMessage.bind(this); this.onNoteStreamMessage = this.onNoteStreamMessage.bind(this);
this.onBroadcastMessage = this.onBroadcastMessage.bind(this); this.onBroadcastMessage = this.onBroadcastMessage.bind(this);
this.wsConnection.on('message', this.onWsConnectionMessage); this.socket.on('message', this.onMessage);
this.subscriber.on('broadcast', data => { this.subscriber.on('broadcast', data => {
this.onBroadcastMessage(data); this.onBroadcastMessage(data);
@ -113,7 +113,7 @@ export class Connection {
break; break;
case 'terminate': case 'terminate':
this.wsConnection.close(); this.socket.close();
this.dispose(); this.dispose();
break; break;
@ -122,12 +122,8 @@ export class Connection {
} }
} }
/** private async onMessage(data: WebSocket.RawData, isRaw: boolean) {
* if (data.isRaw) return;
*/
private async onWsConnectionMessage(data: websocket.Message) {
if (data.type !== 'utf8') return;
if (data.utf8Data == null) return;
let obj: Record<string, any>; let obj: Record<string, any>;
@ -140,22 +136,40 @@ export class Connection {
const { type, body } = obj; const { type, body } = obj;
switch (type) { switch (type) {
case 'readNotification': this.onReadNotification(body); break; case 'readNotification':
case 'subNote': this.onSubscribeNote(body); break; this.onReadNotification(body);
case 's': this.onSubscribeNote(body); break; // alias break;
case 'sr': this.onSubscribeNote(body); this.readNote(body); break; case 'subNote': case 's':
case 'unsubNote': this.onUnsubscribeNote(body); break; this.onSubscribeNote(body);
case 'un': this.onUnsubscribeNote(body); break; // alias break;
case 'connect': this.onChannelConnectRequested(body); break; case 'sr':
case 'disconnect': this.onChannelDisconnectRequested(body); break; this.onSubscribeNote(body);
case 'channel': this.onChannelMessageRequested(body); break; this.readNote(body);
case 'ch': this.onChannelMessageRequested(body); break; // alias break;
case 'unsubNote': case 'un':
this.onUnsubscribeNote(body);
break;
case 'connect':
this.onChannelConnectRequested(body);
break;
case 'disconnect':
this.onChannelDisconnectRequested(body);
break;
case 'channel': case 'ch':
this.onChannelMessageRequested(body);
break;
// 個々のチャンネルではなくルートレベルでこれらのメッセージを受け取る理由は、 // The reason for receiving these messages at the root level rather than in
// クライアントの事情を考慮したとき、入力フォームはノートチャンネルやメッセージのメインコンポーネントとは別 // individual channels is that when considering the client's circumstances, the
// なこともあるため、それらのコンポーネントがそれぞれ各チャンネルに接続するようにするのは面倒なため。 // input form may be separate from the main components of the note channel or
case 'typingOnChannel': this.typingOnChannel(body.channel); break; // message, and it would be cumbersome to have each of those components connect to
case 'typingOnMessaging': this.typingOnMessaging(body); break; // each channel.
case 'typingOnChannel':
this.typingOnChannel(body.channel);
break;
case 'typingOnMessaging':
this.typingOnMessaging(body);
break;
} }
} }
@ -259,7 +273,7 @@ export class Connection {
* *
*/ */
public sendMessageToWs(type: string, payload: any) { public sendMessageToWs(type: string, payload: any) {
this.wsConnection.send(JSON.stringify({ this.socket.send(JSON.stringify({
type, type,
body: payload, body: payload,
})); }));

View file

@ -1,8 +1,8 @@
import { EventEmitter } from 'events'; import { EventEmitter } from 'events';
import { ParsedUrlQuery } from 'querystring';
import * as http from 'node:http'; import * as http from 'node:http';
import * as websocket from 'websocket'; import { WebSocketServer } from 'ws';
import { MINUTE } from '@/const.js';
import { subscriber as redisClient } from '@/db/redis.js'; import { subscriber as redisClient } from '@/db/redis.js';
import { Users } from '@/models/index.js'; import { Users } from '@/models/index.js';
import { Connection } from './stream/index.js'; import { Connection } from './stream/index.js';
@ -10,62 +10,64 @@ import authenticate from './authenticate.js';
export const initializeStreamingServer = (server: http.Server): void => { export const initializeStreamingServer = (server: http.Server): void => {
// Init websocket server // Init websocket server
const ws = new websocket.server({ const ws = new WebSocketServer({ noServer: true });
httpServer: server,
});
ws.on('request', async (request): Promise<void> => { server.on('upgrade', async (request, socket, head)=> {
const q = request.resourceURL.query as ParsedUrlQuery; if (!request.url.startsWith('/streaming?')) {
socket.write('HTTP/1.1 400 Bad Request\r\n\r\n', undefined, () => socket.destroy());
return;
}
const q = new URLSearchParams(request.url.slice(11));
const [user, app] = await authenticate(request.httpRequest.headers.authorization, q.i) const [user, app] = await authenticate(request.headers.authorization, q.get('i'))
.catch(err => { .catch(err => {
request.reject(403, err.message); socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n', undefined, () => socket.destroy());
return []; return [];
}); });
if (typeof user === 'undefined') { if (typeof user === 'undefined') return;
return;
}
if (user?.isSuspended) { if (user?.isSuspended) {
request.reject(400); socket.write('HTTP/1.1 403 Forbidden\r\n\r\n', undefined, () => socket.destroy());
return; return;
} }
const connection = request.accept(); ws.handleUpgrade(request, socket, head, (socket) => {
const ev = new EventEmitter();
const ev = new EventEmitter(); async function onRedisMessage(_: string, data: string) {
const parsed = JSON.parse(data);
async function onRedisMessage(_: string, data: string) { ev.emit(parsed.channel, parsed.message);
const parsed = JSON.parse(data);
ev.emit(parsed.channel, parsed.message);
}
redisClient.on('message', onRedisMessage);
const main = new Connection(connection, ev, user, app);
const intervalId = user ? setInterval(() => {
Users.update(user.id, {
lastActiveDate: new Date(),
});
}, 1000 * 60 * 5) : null;
if (user) {
Users.update(user.id, {
lastActiveDate: new Date(),
});
}
connection.once('close', () => {
ev.removeAllListeners();
main.dispose();
redisClient.off('message', onRedisMessage);
if (intervalId) clearInterval(intervalId);
});
connection.on('message', async (data) => {
if (data.type === 'utf8' && data.utf8Data === 'ping') {
connection.send('pong');
} }
redisClient.on('message', onRedisMessage);
const main = new Connection(socket, ev, user, app);
// keep user "online" while a stream is connected
const intervalId = user ? setInterval(() => {
Users.update(user.id, {
lastActiveDate: new Date(),
});
}, 5 * MINUTE) : null;
if (user) {
Users.update(user.id, {
lastActiveDate: new Date(),
});
}
socket.once('close', () => {
ev.removeAllListeners();
main.dispose();
redisClient.off('message', onRedisMessage);
if (intervalId) clearInterval(intervalId);
});
// ping/pong mechanism
// TODO: the websocket protocol already specifies a ping/pong mechanism, why is this necessary?
socket.on('message', async (data) => {
if (data.type === 'utf8' && data.utf8Data === 'ping') {
socket.send('pong');
}
});
}); });
}); });
}; };