BREAKING: implement separate web workers

There are now separate web and queue workers.

The configuration entry `clusterLimit` has been replaced by
`clusterLimits` which allows separate configuration of web and
queue workers.

Changelog: Changed
This commit is contained in:
Johann150 2022-11-25 12:56:49 +01:00
parent c7255dbea0
commit 48a60b03ea
Signed by untrusted user: Johann150
GPG key ID: 9EE6577A2A06F8F1
5 changed files with 58 additions and 15 deletions

View file

@ -78,8 +78,11 @@ redis:
# Whether disable HSTS
#disableHsts: true
# Number of worker processes
#clusterLimit: 1
# Number of worker processes by type.
# The sum must not exceed the number of available cores.
#clusterLimits:
# web: 1
# queue: 1
# Job concurrency per worker
# deliverJobConcurrency: 128

View file

@ -66,7 +66,7 @@ export async function masterMain(): Promise<void> {
bootLogger.succ('FoundKey initialized');
if (!envOption.disableClustering) {
await spawnWorkers(config.clusterLimit);
await spawnWorkers(config.clusterLimits);
}
bootLogger.succ(`Now listening on port ${config.port} on ${config.url}`, null, true);
@ -139,16 +139,26 @@ async function connectDb(): Promise<void> {
}
}
async function spawnWorkers(limit = 1): Promise<void> {
const workers = Math.min(limit, os.cpus().length);
bootLogger.info(`Starting ${workers} worker${workers === 1 ? '' : 's'}...`);
await Promise.all([...Array(workers)].map(spawnWorker));
bootLogger.succ('All workers started');
async function spawnWorkers(clusterLimits: Required<Config['clusterLimits']>): Promise<void> {
const modes = ['web', 'queue'];
const total = modes.reduce((acc, mode) => acc + clusterLimits[mode], 0);
if (total > os.cpus().length) {
bootLogger.error(`configuration error: cluster limits total (${total}) must not exceed number of cores (${os.cpus().length})`);
process.exit(78);
}
const workers = new Array(total);
workers.fill('web', 0, clusterLimits.web);
workers.fill('queue', clusterLimits.web);
bootLogger.info(`Starting ${total} workers...`);
await Promise.all(workers.map(mode => spawnWorker(mode)));
bootLogger.succ(`All workers started`);
}
function spawnWorker(): Promise<void> {
function spawnWorker(mode: 'web' | 'queue'): Promise<void> {
return new Promise(res => {
const worker = cluster.fork();
const worker = cluster.fork({ mode });
worker.on('message', message => {
if (message === 'listenFailed') {
bootLogger.error('The server Listen failed due to the previous error.');

View file

@ -1,3 +1,4 @@
import os from 'node:os';
import cluster from 'node:cluster';
import { initDb } from '@/db/postgre.js';
@ -7,12 +8,21 @@ import { initDb } from '@/db/postgre.js';
export async function workerMain(): Promise<void> {
await initDb();
if (!process.env.mode || process.env.mode === 'web') {
// start server
await import('../server/index.js').then(x => x.default());
}
if (!process.env.mode || process.env.mode === 'queue') {
// start job queue
import('../queue/index.js').then(x => x.default());
if (process.env.mode === 'queue') {
// if this is an exclusive queue worker, renice to have higher priority
os.setPriority(os.constants.priority.PRIORITY_BELOW_NORMAL);
}
}
if (cluster.isWorker) {
// Send a 'ready' message to parent process
process.send?.('ready');

View file

@ -54,6 +54,23 @@ export default function load(): Config {
if (!config.redis.prefix) config.redis.prefix = mixin.host;
if (!config.clusterLimits) {
config.clusterLimits = {
web: 1,
queue: 1,
};
} else {
config.clusterLimits = {
web: 1,
queue: 1,
...config.clusterLimits,
};
if (config.clusterLimits.web < 1 || config.clusterLimits.queue < 1) {
throw new Error('invalid cluster limits');
}
}
return Object.assign(config, mixin);
}

View file

@ -45,7 +45,10 @@ export type Source = {
accesslog?: string;
clusterLimit?: number;
clusterLimits?: {
web?: number;
queue?: number;
};
id: string;