Ignas Kiela
296c40c5b4
This resulted in endless ping-pong traffic on the websocket, happening every interval of network latency to the server (e.g. for me, with 40ms latency to my server, it was about every 40ms). On my server this ended up taking about 20% of foundkey's CPU usage. Now, just send pings every 30s, and check if we have received any pong's in last 60 seconds to check that the connection is still alive. Changelog: Fixed
85 lines
2.4 KiB
TypeScript
85 lines
2.4 KiB
TypeScript
import { EventEmitter } from 'events';
|
|
import * as http from 'node:http';
|
|
import { WebSocketServer } from 'ws';
|
|
|
|
import { SECOND, MINUTE } from '@/const.js';
|
|
import { subscriber as redisClient } from '@/db/redis.js';
|
|
import { Users } from '@/models/index.js';
|
|
import { Connection } from './stream/index.js';
|
|
import { authenticate } from './authenticate.js';
|
|
|
|
export const initializeStreamingServer = (server: http.Server): void => {
|
|
// Init websocket server
|
|
const ws = new WebSocketServer({ noServer: true });
|
|
|
|
server.on('upgrade', async (request, socket, head)=> {
|
|
if (!request.url.startsWith('/streaming?')) {
|
|
socket.write('HTTP/1.1 400 Bad Request\r\n\r\n', undefined, () => socket.destroy());
|
|
return;
|
|
}
|
|
const q = new URLSearchParams(request.url.slice(11));
|
|
|
|
const [user, app] = await authenticate(request.headers.authorization, q.get('i'))
|
|
.catch(err => {
|
|
socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n', undefined, () => socket.destroy());
|
|
return [];
|
|
});
|
|
if (typeof user === 'undefined') return;
|
|
|
|
if (user?.isSuspended) {
|
|
socket.write('HTTP/1.1 403 Forbidden\r\n\r\n', undefined, () => socket.destroy());
|
|
return;
|
|
}
|
|
|
|
ws.handleUpgrade(request, socket, head, (socket) => {
|
|
const ev = new EventEmitter();
|
|
|
|
async function onRedisMessage(_: string, data: string) {
|
|
const parsed = JSON.parse(data);
|
|
ev.emit(parsed.channel, parsed.message);
|
|
}
|
|
|
|
redisClient.on('message', onRedisMessage);
|
|
|
|
const main = new Connection(socket, ev, user, app);
|
|
|
|
// ping/pong mechanism
|
|
let pingTimeout: NodeJS.Timeout | null = null;
|
|
let disconnectTimeout = setTimeout(() => {
|
|
socket.terminate();
|
|
}, 60 * SECOND);;
|
|
function sendPing() {
|
|
socket.ping();
|
|
pingTimeout = setTimeout(() => {
|
|
sendPing();
|
|
}, 30 * SECOND);
|
|
}
|
|
function onPong() {
|
|
disconnectTimeout.refresh()
|
|
}
|
|
sendPing();
|
|
socket.on('pong', onPong);
|
|
|
|
// keep user "online" while a stream is connected
|
|
const intervalId = user ? setInterval(() => {
|
|
Users.update(user.id, {
|
|
lastActiveDate: new Date(),
|
|
});
|
|
}, 5 * MINUTE) : null;
|
|
if (user) {
|
|
Users.update(user.id, {
|
|
lastActiveDate: new Date(),
|
|
});
|
|
}
|
|
|
|
socket.once('close', () => {
|
|
ev.removeAllListeners();
|
|
main.dispose();
|
|
redisClient.off('message', onRedisMessage);
|
|
if (intervalId) clearInterval(intervalId);
|
|
if (pingTimeout) clearTimeout(pingTimeout);
|
|
if (disconnectTimeout) clearTimeout(disconnectTimeout);
|
|
});
|
|
});
|
|
});
|
|
};
|