diff --git a/packages/backend/src/server/api/streaming.ts b/packages/backend/src/server/api/streaming.ts index 825896b56..2be442f8f 100644 --- a/packages/backend/src/server/api/streaming.ts +++ b/packages/backend/src/server/api/streaming.ts @@ -2,7 +2,7 @@ import { EventEmitter } from 'events'; import * as http from 'node:http'; import { WebSocketServer } from 'ws'; -import { MINUTE } from '@/const.js'; +import { SECOND, MINUTE } from '@/const.js'; import { subscriber as redisClient } from '@/db/redis.js'; import { Users } from '@/models/index.js'; import { Connection } from './stream/index.js'; @@ -43,6 +43,20 @@ export const initializeStreamingServer = (server: http.Server): void => { const main = new Connection(socket, ev, user, app); + // ping/pong mechanism + let pingTimeout = null; + function startHeartbeat() { + if (pingTimeout) clearTimeout(pingTimeout); + + socket.ping(); + pingTimeout = setTimeout(() => { + socket.terminate(); + }, 30 * SECOND); + } + startHeartbeat(); + socket.on('ping', () => { startHeartbeat(); }); + socket.on('pong', () => { startHeartbeat(); }); + // keep user "online" while a stream is connected const intervalId = user ? setInterval(() => { Users.update(user.id, { @@ -54,11 +68,13 @@ export const initializeStreamingServer = (server: http.Server): void => { lastActiveDate: new Date(), }); } + socket.once('close', () => { ev.removeAllListeners(); main.dispose(); redisClient.off('message', onRedisMessage); if (intervalId) clearInterval(intervalId); + if (pingTimeout) clearTimeout(pingTimeout); }); // ping/pong mechanism