FoundKey/packages/backend/src/server/api/streaming.ts
Ignas Kiela f7df989679
All checks were successful
ci/woodpecker/pr/lint-backend Pipeline was successful
ci/woodpecker/pr/lint-foundkey-js Pipeline was successful
ci/woodpecker/pr/build Pipeline was successful
ci/woodpecker/pr/lint-client Pipeline was successful
ci/woodpecker/pr/lint-sw Pipeline was successful
ci/woodpecker/pr/test Pipeline was successful
fix: stop sending pings on every pong
This resulted in endless ping-pong traffic on the websocket, with a new
ping sent once per network round trip to the server (e.g. for me, with 40ms
latency to my server, it was about every 40ms). On my server this ended
up taking about 20% of foundkey's CPU usage. Now, just send pings every
30s, and check whether we have received any pongs in the last 60 seconds
to verify that the connection is still alive.

Changelog: Fixed
2023-05-23 11:19:30 +03:00

86 lines
2.4 KiB
TypeScript

import { EventEmitter } from 'events';
import * as http from 'node:http';
import { WebSocketServer } from 'ws';
import { SECOND, MINUTE } from '@/const.js';
import { subscriber as redisClient } from '@/db/redis.js';
import { Users } from '@/models/index.js';
import { Connection } from './stream/index.js';
import { authenticate } from './authenticate.js';
/**
 * Attaches the websocket streaming endpoint to an existing HTTP server.
 *
 * Upgrade requests whose URL starts with `/streaming?` are authenticated with
 * the `Authorization` header and the `i` query parameter. Requests for any
 * other URL are answered with 400, a rejected authentication with 401 and a
 * suspended user with 403. Accepted sockets are wrapped in a `Connection` that
 * receives redis messages through a per-connection event emitter.
 *
 * @param server HTTP server whose `upgrade` events should be handled.
 */
export const initializeStreamingServer = (server: http.Server): void => {
	// Init websocket server
	const ws = new WebSocketServer({ noServer: true });

	server.on('upgrade', async (request, socket, head) => {
		// Nothing listens for 'error' on the raw socket until `handleUpgrade`
		// is called. Without this listener, a socket error raised while
		// authentication is awaited would be an unhandled 'error' event.
		const onSocketError = (): void => {
			socket.destroy();
		};
		socket.on('error', onSocketError);

		// `request.url` may be undefined; treat that like any other non-streaming URL.
		const url = request.url ?? '';
		if (!url.startsWith('/streaming?')) {
			socket.write('HTTP/1.1 400 Bad Request\r\n\r\n', undefined, () => socket.destroy());
			return;
		}

		// Drop the leading '/streaming?' (11 characters) to get the query string.
		const q = new URLSearchParams(url.slice(11));

		// On rejection the 401 is written and an empty array is returned, which
		// leaves `user` undefined and ends the handler below.
		const [user, app] = await authenticate(request.headers.authorization, q.get('i'))
			.catch(() => {
				socket.write('HTTP/1.1 401 Unauthorized\r\n\r\n', undefined, () => socket.destroy());
				return [];
			});
		if (typeof user === 'undefined') return;

		if (user?.isSuspended) {
			socket.write('HTTP/1.1 403 Forbidden\r\n\r\n', undefined, () => socket.destroy());
			return;
		}

		// The temporary listener is only needed until the websocket server takes over.
		socket.off('error', onSocketError);

		ws.handleUpgrade(request, socket, head, (wsSocket) => {
			const ev = new EventEmitter();

			// Re-emit each redis message on the per-connection emitter, keyed by its channel.
			async function onRedisMessage(_: string, data: string) {
				const parsed = JSON.parse(data);
				ev.emit(parsed.channel, parsed.message);
			}
			redisClient.on('message', onRedisMessage);

			const main = new Connection(wsSocket, ev, user, app);

			// ping/pong mechanism: ping immediately and then every 30 seconds.
			// Each pong restarts the 60 second timer; if it ever fires, the
			// connection is terminated.
			const disconnectTimeout = setTimeout(() => {
				wsSocket.terminate();
			}, 60 * SECOND);
			const sendPing = (): void => {
				wsSocket.ping();
			};
			sendPing();
			const pingInterval = setInterval(sendPing, 30 * SECOND);
			wsSocket.on('pong', () => {
				disconnectTimeout.refresh();
			});

			// keep user "online" while a stream is connected
			let activityInterval: NodeJS.Timeout | null = null;
			if (user) {
				const markActive = (): void => {
					Users.update(user.id, {
						lastActiveDate: new Date(),
					});
				};
				markActive();
				activityInterval = setInterval(markActive, 5 * MINUTE);
			}

			// Release everything tied to this connection once the socket closes.
			wsSocket.once('close', () => {
				ev.removeAllListeners();
				main.dispose();
				redisClient.off('message', onRedisMessage);
				if (activityInterval) clearInterval(activityInterval);
				clearInterval(pingInterval);
				clearTimeout(disconnectTimeout);
			});
		});
	});
};