server: switch websocket to ws #322

Manually merged
Johann150 merged 3 commits from websocket into main 2023-01-12 19:53:57 +00:00
6 changed files with 93 additions and 224 deletions

View file

@ -501,6 +501,7 @@ scratchpadDescription: "The Scratchpad provides an environment for AiScript expe
\ in it."
output: "Output"
updateRemoteUser: "Update remote user information"
deleteAllFiles: "Delete all files"
deleteAllFilesConfirm: "Are you sure that you want to delete all files?"
removeAllFollowing: "Unfollow all followed users"
removeAllFollowingDescription: "Executing this unfollows all accounts from {host}.\

View file

@ -113,7 +113,6 @@
"unzipper": "0.10.11",
"uuid": "8.3.2",
"web-push": "3.5.0",
"websocket": "1.0.34",
"ws": "8.8.0",
"xev": "3.0.2"
},
@ -164,7 +163,6 @@
"@types/tmp": "0.2.3",
"@types/uuid": "8.3.4",
"@types/web-push": "3.3.2",
"@types/websocket": "1.0.5",
"@types/ws": "8.5.3",
"@typescript-eslint/eslint-plugin": "^5.46.1",
"@typescript-eslint/parser": "^5.46.1",

View file

@ -1,5 +1,5 @@
import { EventEmitter } from 'events';
import * as websocket from 'websocket';
import { WebSocket } from 'ws';
import { readNote } from '@/services/note/read.js';
import { User } from '@/models/entities/user.js';
import { Channel as ChannelModel } from '@/models/entities/channel.js';
@ -26,29 +26,29 @@ export class Connection {
public blocking: Set<User['id']> = new Set(); // "被"blocking
public followingChannels: Set<ChannelModel['id']> = new Set();
public token?: AccessToken;
private wsConnection: websocket.connection;
private socket: WebSocket;
public subscriber: StreamEventEmitter;
private channels: Channel[] = [];
private subscribingNotes: any = {};
private cachedNotes: Packed<'Note'>[] = [];
constructor(
wsConnection: websocket.connection,
socket: WebSocket,
subscriber: EventEmitter,
user: User | null | undefined,
token: AccessToken | null | undefined,
) {
this.wsConnection = wsConnection;
this.socket = socket;
this.subscriber = subscriber;
if (user) this.user = user;
if (token) this.token = token;
this.onWsConnectionMessage = this.onWsConnectionMessage.bind(this);
this.onMessage = this.onMessage.bind(this);
this.onUserEvent = this.onUserEvent.bind(this);
this.onNoteStreamMessage = this.onNoteStreamMessage.bind(this);
this.onBroadcastMessage = this.onBroadcastMessage.bind(this);
this.wsConnection.on('message', this.onWsConnectionMessage);
this.socket.on('message', this.onMessage);
this.subscriber.on('broadcast', data => {
this.onBroadcastMessage(data);
@ -113,7 +113,7 @@ export class Connection {
break;
case 'terminate':
this.wsConnection.close();
this.socket.close();
this.dispose();
break;
@ -122,12 +122,8 @@ export class Connection {
}
}
/**
*
*/
private async onWsConnectionMessage(data: websocket.Message) {
if (data.type !== 'utf8') return;
if (data.utf8Data == null) return;
private async onMessage(data: WebSocket.RawData, isRaw: boolean) {
if (data.isRaw) return;
let obj: Record<string, any>;
@ -140,22 +136,40 @@ export class Connection {
const { type, body } = obj;
switch (type) {
case 'readNotification': this.onReadNotification(body); break;
case 'subNote': this.onSubscribeNote(body); break;
case 's': this.onSubscribeNote(body); break; // alias
case 'sr': this.onSubscribeNote(body); this.readNote(body); break;
case 'unsubNote': this.onUnsubscribeNote(body); break;
case 'un': this.onUnsubscribeNote(body); break; // alias
case 'connect': this.onChannelConnectRequested(body); break;
case 'disconnect': this.onChannelDisconnectRequested(body); break;
case 'channel': this.onChannelMessageRequested(body); break;
case 'ch': this.onChannelMessageRequested(body); break; // alias
case 'readNotification':
this.onReadNotification(body);
break;
case 'subNote': case 's':
this.onSubscribeNote(body);
break;
case 'sr':
this.onSubscribeNote(body);
this.readNote(body);
break;
case 'unsubNote': case 'un':
this.onUnsubscribeNote(body);
break;
case 'connect':
this.onChannelConnectRequested(body);
break;
case 'disconnect':
this.onChannelDisconnectRequested(body);
break;
case 'channel': case 'ch':
this.onChannelMessageRequested(body);
break;
// 個々のチャンネルではなくルートレベルでこれらのメッセージを受け取る理由は、
// クライアントの事情を考慮したとき、入力フォームはノートチャンネルやメッセージのメインコンポーネントとは別
// なこともあるため、それらのコンポーネントがそれぞれ各チャンネルに接続するようにするのは面倒なため。
case 'typingOnChannel': this.typingOnChannel(body.channel); break;
case 'typingOnMessaging': this.typingOnMessaging(body); break;
// The reason for receiving these messages at the root level rather than in
// individual channels is that when considering the client's circumstances, the
// input form may be separate from the main components of the note channel or
// message, and it would be cumbersome to have each of those components connect to
// each channel.
case 'typingOnChannel':
this.typingOnChannel(body.channel);
break;
case 'typingOnMessaging':
this.typingOnMessaging(body);
break;
}
}
@ -259,7 +273,7 @@ export class Connection {
*
*/
public sendMessageToWs(type: string, payload: any) {
this.wsConnection.send(JSON.stringify({
this.socket.send(JSON.stringify({
type,
body: payload,
}));

View file

@ -1,8 +1,8 @@
import { EventEmitter } from 'events';
import { ParsedUrlQuery } from 'querystring';
import * as http from 'node:http';
import * as websocket from 'websocket';
import { WebSocketServer } from 'ws';
import { MINUTE } from '@/const.js';
import { subscriber as redisClient } from '@/db/redis.js';
import { Users } from '@/models/index.js';
import { Connection } from './stream/index.js';
@ -10,62 +10,64 @@ import authenticate from './authenticate.js';
export const initializeStreamingServer = (server: http.Server): void => {
// Init websocket server
const ws = new websocket.server({
httpServer: server,
});
const ws = new WebSocketServer({ noServer: true });
ws.on('request', async (request): Promise<void> => {
const q = request.resourceURL.query as ParsedUrlQuery;
server.on('upgrade', async (request, socket, head)=> {
if (!request.url.startsWith('/streaming?')) {
socket.write('HTTP/1.1 400 Bad Request\r\n\r\n', undefined, () => socket.destroy());
return;
}
const q = new URLSearchParams(request.url.slice(11));
const [user, app] = await authenticate(request.httpRequest.headers.authorization, q.i)
const [user, app] = await authenticate(request.headers.authorization, q.get('i'))
.catch(err => {
request.reject(403, err.message);
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n', undefined, () => socket.destroy());
return [];
});
if (typeof user === 'undefined') {
return;
}
if (typeof user === 'undefined') return;
if (user?.isSuspended) {
request.reject(400);
socket.write('HTTP/1.1 403 Forbidden\r\n\r\n', undefined, () => socket.destroy());
return;
}
const connection = request.accept();
ws.handleUpgrade(request, socket, head, (socket) => {
const ev = new EventEmitter();
const ev = new EventEmitter();
async function onRedisMessage(_: string, data: string) {
const parsed = JSON.parse(data);
ev.emit(parsed.channel, parsed.message);
}
redisClient.on('message', onRedisMessage);
const main = new Connection(connection, ev, user, app);
const intervalId = user ? setInterval(() => {
Users.update(user.id, {
lastActiveDate: new Date(),
});
}, 1000 * 60 * 5) : null;
if (user) {
Users.update(user.id, {
lastActiveDate: new Date(),
});
}
connection.once('close', () => {
ev.removeAllListeners();
main.dispose();
redisClient.off('message', onRedisMessage);
if (intervalId) clearInterval(intervalId);
});
connection.on('message', async (data) => {
if (data.type === 'utf8' && data.utf8Data === 'ping') {
connection.send('pong');
async function onRedisMessage(_: string, data: string) {
const parsed = JSON.parse(data);
ev.emit(parsed.channel, parsed.message);
}
redisClient.on('message', onRedisMessage);
const main = new Connection(socket, ev, user, app);
// keep user "online" while a stream is connected
const intervalId = user ? setInterval(() => {
Users.update(user.id, {
lastActiveDate: new Date(),
});
}, 5 * MINUTE) : null;
if (user) {
Users.update(user.id, {
lastActiveDate: new Date(),
});
}
socket.once('close', () => {
ev.removeAllListeners();
main.dispose();
redisClient.off('message', onRedisMessage);
if (intervalId) clearInterval(intervalId);
});
// ping/pong mechanism
// TODO: the websocket protocol already specifies a ping/pong mechanism, why is this necessary?
socket.on('message', async (data) => {
if (data.type === 'utf8' && data.utf8Data === 'ping') {
socket.send('pong');
}
});
});
});
};

View file

@ -37,6 +37,7 @@
{{ i18n.ts.reflectMayTakeTime }}
<div class="_formBlock">
<MkButton v-if="user.host == null && iAmModerator" inline style="margin-right: 8px;" @click="resetPassword"><i class="fas fa-key"></i> {{ i18n.ts.resetPassword }}</MkButton>
<MkButton inline danger @click="deleteAllFiles">{{ i18n.ts.deleteAllFiles }}</MkButton>
<MkButton v-if="$i.isAdmin" inline danger @click="deleteAccount">{{ i18n.ts.deleteAccount }}</MkButton>
</div>
</FormSection>

147
yarn.lock
View file

@ -1853,16 +1853,6 @@ __metadata:
languageName: node
linkType: hard
"@types/glob@npm:7.2.0":
version: 7.2.0
resolution: "@types/glob@npm:7.2.0"
dependencies:
"@types/minimatch": "*"
"@types/node": "*"
checksum: 6ae717fedfdfdad25f3d5a568323926c64f52ef35897bcac8aca8e19bc50c0bd84630bbd063e5d52078b2137d8e7d3c26eabebd1a2f03ff350fff8a91e79fc19
languageName: node
linkType: hard
"@types/graceful-fs@npm:^4.1.2":
version: 4.1.5
resolution: "@types/graceful-fs@npm:4.1.5"
@ -2374,13 +2364,6 @@ __metadata:
languageName: node
linkType: hard
"@types/seedrandom@npm:3.0.2":
version: 3.0.2
resolution: "@types/seedrandom@npm:3.0.2"
checksum: 02e585601cb9764cb0eb3f92b384512f8e171422acea3d5a801a41a8a06d475b60ae520eba469bcedf0ed8ad650415919cf30a9cd6bc57090613e61bedc071ed
languageName: node
linkType: hard
"@types/semver@npm:7.3.12":
version: 7.3.12
resolution: "@types/semver@npm:7.3.12"
@ -2548,15 +2531,6 @@ __metadata:
languageName: node
linkType: hard
"@types/websocket@npm:1.0.5":
version: 1.0.5
resolution: "@types/websocket@npm:1.0.5"
dependencies:
"@types/node": "*"
checksum: 41c7a620f877a0165ff36e713455d888b7f5df9c51e71b5d0f47994f98cf22ccd339b8c6cfdc6bb417e950d40f405693974d393bd916971490553cc5e9e67a9d
languageName: node
linkType: hard
"@types/ws@npm:8.5.3":
version: 8.5.3
resolution: "@types/ws@npm:8.5.3"
@ -3740,7 +3714,6 @@ __metadata:
"@types/tmp": 0.2.3
"@types/uuid": 8.3.4
"@types/web-push": 3.3.2
"@types/websocket": 1.0.5
"@types/ws": 8.5.3
"@typescript-eslint/eslint-plugin": ^5.46.1
"@typescript-eslint/parser": ^5.46.1
@ -3840,7 +3813,6 @@ __metadata:
unzipper: 0.10.11
uuid: 8.3.2
web-push: 3.5.0
websocket: 1.0.34
ws: 8.8.0
xev: 3.0.2
languageName: unknown
@ -4202,16 +4174,6 @@ __metadata:
languageName: node
linkType: hard
"bufferutil@npm:^4.0.1":
version: 4.0.6
resolution: "bufferutil@npm:4.0.6"
dependencies:
node-gyp: latest
node-gyp-build: ^4.3.0
checksum: dd107560947445280af7820c3d0534127b911577d85d537e1d7e0aa30fd634853cef8a994d6e8aed3d81388ab1a20257de776164afe6a6af8e78f5f17968ebd6
languageName: node
linkType: hard
"bull@npm:4.8.4":
version: 4.8.4
resolution: "bull@npm:4.8.4"
@ -4741,26 +4703,15 @@ __metadata:
"@rollup/plugin-json": 4.1.0
"@rollup/pluginutils": ^4.2.1
"@syuilo/aiscript": 0.11.1
"@types/escape-regexp": 0.0.1
"@types/glob": 7.2.0
"@types/gulp": 4.0.9
"@types/gulp-rename": 2.0.1
"@types/is-url": 1.2.30
"@types/katex": 0.14.0
"@types/matter-js": 0.17.7
"@types/mocha": 9.1.1
"@types/punycode": 2.1.0
"@types/qrcode": 1.5.0
"@types/seedrandom": 3.0.2
"@types/throttle-debounce": 5.0.0
"@types/tinycolor2": 1.4.3
"@types/uuid": 8.3.4
"@types/websocket": 1.0.5
"@types/ws": 8.5.3
"@typescript-eslint/eslint-plugin": ^5.46.1
"@typescript-eslint/parser": ^5.46.1
"@vitejs/plugin-vue": ^3.1.0
abort-controller: 3.0.0
autobind-decorator: 2.4.0
autosize: 5.0.1
blurhash: 1.1.5
@ -4771,17 +4722,14 @@ __metadata:
chartjs-plugin-gradient: 0.5.0
chartjs-plugin-zoom: 1.2.1
compare-versions: 4.1.3
content-disposition: 0.5.4
cropperjs: 2.0.0-beta.1
cross-env: 7.0.3
cypress: 10.3.0
date-fns: 2.28.0
escape-regexp: 0.0.1
eslint: ^8.29.0
eslint-plugin-import: ^2.26.0
eslint-plugin-vue: ^9.8.0
eventemitter3: 4.0.7
feed: 4.2.2
foundkey-js: "workspace:*"
idb-keyval: 6.2.0
insert-text-at-cursor: 0.3.0
@ -4789,42 +4737,24 @@ __metadata:
katex: 0.16.0
matter-js: 0.18.0
mfm-js: 0.22.1
mocha: 10.0.0
ms: 2.1.3
nested-property: 4.0.0
photoswipe: 5.2.8
prismjs: 1.28.0
private-ip: 2.3.3
promise-limit: 2.7.0
pug: 3.0.2
punycode: 2.1.1
qrcode: 1.5.1
reflect-metadata: 0.1.13
rollup: 2.75.7
sass: 1.53.0
seedrandom: 3.0.5
start-server-and-test: 1.14.0
strict-event-emitter-types: 2.0.0
stringz: 2.1.0
syuilo-password-strength: 0.0.1
talisman: ^1.1.4
textarea-caret: 3.1.0
three: 0.142.0
throttle-debounce: 5.0.0
tinycolor2: 1.4.2
tsc-alias: 1.7.0
tsconfig-paths: 4.1.0
twemoji-parser: 14.0.0
typescript: ^4.9.4
uuid: 8.3.2
v-debounce: 0.1.2
vanilla-tilt: 1.7.2
vite: 3.1.0
vue: 3.2.45
vue-prism-editor: 2.0.0-alpha.2
vuedraggable: 4.0.1
websocket: 1.0.34
ws: 8.8.0
languageName: unknown
linkType: soft
@ -12445,17 +12375,6 @@ __metadata:
languageName: node
linkType: hard
"node-gyp-build@npm:^4.3.0":
version: 4.5.0
resolution: "node-gyp-build@npm:4.5.0"
bin:
node-gyp-build: bin.js
node-gyp-build-optional: optional.js
node-gyp-build-test: build-test.js
checksum: d888bae0fb88335f69af1b57a2294a931c5042f36e413d8d364c992c9ebfa0b96ffe773179a5a2c8f04b73856e8634e09cce108dbb9804396d3cc8c5455ff2db
languageName: node
linkType: hard
"node-gyp@npm:^9.3.0":
version: 9.3.0
resolution: "node-gyp@npm:9.3.0"
@ -15048,20 +14967,6 @@ __metadata:
languageName: node
linkType: hard
"rollup@npm:2.75.7":
version: 2.75.7
resolution: "rollup@npm:2.75.7"
dependencies:
fsevents: ~2.3.2
dependenciesMeta:
fsevents:
optional: true
bin:
rollup: dist/bin/rollup
checksum: a6331d46b01062b184efdcb42ce12caf2e1575f989963944534b02f5855f6f3bc239ed0c1a18893572d7695af6d83166f3aef59dbd0365084e1531cb67824674
languageName: node
linkType: hard
"rollup@npm:~2.78.0":
version: 2.78.1
resolution: "rollup@npm:2.78.1"
@ -16402,13 +16307,6 @@ __metadata:
languageName: node
linkType: hard
"three@npm:0.142.0":
version: 0.142.0
resolution: "three@npm:0.142.0"
checksum: b920987cab222bea5fd657a498f8203546c6b613cee01d2e1a355c7e044204bbff6ef051641b026743a6d3d5261a988c84285039a94e4c94a959f1b3789ac682
languageName: node
linkType: hard
"throat@npm:^6.0.1":
version: 6.0.1
resolution: "throat@npm:6.0.1"
@ -17327,16 +17225,6 @@ __metadata:
languageName: node
linkType: hard
"utf-8-validate@npm:^5.0.2":
version: 5.0.9
resolution: "utf-8-validate@npm:5.0.9"
dependencies:
node-gyp: latest
node-gyp-build: ^4.3.0
checksum: 90117f1b65e0a1256c83dfad529983617263b622f2379745311d0438c7ea31db0d134ebd0dca84c3f5847a3560a3d249644e478a9109c616f63c7ea19cac53dc
languageName: node
linkType: hard
"util-deprecate@npm:^1.0.1, util-deprecate@npm:^1.0.2, util-deprecate@npm:~1.0.1":
version: 1.0.2
resolution: "util-deprecate@npm:1.0.2"
@ -17371,13 +17259,6 @@ __metadata:
languageName: node
linkType: hard
"v-debounce@npm:0.1.2":
version: 0.1.2
resolution: "v-debounce@npm:0.1.2"
checksum: 58428fb783581854a46acbeb1cf14beaddf51467a1cac2b4fd2053f32e6243e6ca8ff6948eca1b8ce82b849dadf12695a21b8bb3062fecbed0db812c14c0a983
languageName: node
linkType: hard
"v8-compile-cache-lib@npm:^3.0.1":
version: 3.0.1
resolution: "v8-compile-cache-lib@npm:3.0.1"
@ -17429,13 +17310,6 @@ __metadata:
languageName: node
linkType: hard
"vanilla-tilt@npm:1.7.2":
version: 1.7.2
resolution: "vanilla-tilt@npm:1.7.2"
checksum: 6909bae39a32f7fa707f848468d0bd914020dd38931821f2ce7e1f0b546a123ebd76c64fec7398b351021cf72508bb7d8af5627c92435bec5892827f62127c67
languageName: node
linkType: hard
"vary@npm:^1.1.2":
version: 1.1.2
resolution: "vary@npm:1.1.2"
@ -17715,20 +17589,6 @@ __metadata:
languageName: node
linkType: hard
"websocket@npm:1.0.34":
version: 1.0.34
resolution: "websocket@npm:1.0.34"
dependencies:
bufferutil: ^4.0.1
debug: ^2.2.0
es5-ext: ^0.10.50
typedarray-to-buffer: ^3.1.5
utf-8-validate: ^5.0.2
yaeti: ^0.0.6
checksum: 8a0ce6d79cc1334bb6ea0d607f0092f3d32700b4dd19e4d5540f2a85f3b50e1f8110da0e4716737056584dde70bbebcb40bbd94bbb437d7468c71abfbfa077d8
languageName: node
linkType: hard
"whatwg-encoding@npm:^1.0.5":
version: 1.0.5
resolution: "whatwg-encoding@npm:1.0.5"
@ -18080,13 +17940,6 @@ __metadata:
languageName: node
linkType: hard
"yaeti@npm:^0.0.6":
version: 0.0.6
resolution: "yaeti@npm:0.0.6"
checksum: 6db12c152f7c363b80071086a3ebf5032e03332604eeda988872be50d6c8469e1f13316175544fa320f72edad696c2d83843ad0ff370659045c1a68bcecfcfea
languageName: node
linkType: hard
"yallist@npm:4.0.0, yallist@npm:^4.0.0":
version: 4.0.0
resolution: "yallist@npm:4.0.0"