diff --git a/.config/example.yml b/.config/example.yml index 86d970eaa..75be5157d 100644 --- a/.config/example.yml +++ b/.config/example.yml @@ -78,8 +78,11 @@ redis: # Whether disable HSTS #disableHsts: true -# Number of worker processes -#clusterLimit: 1 +# Number of worker processes by type. +# The sum must not exceed the number of available cores. +#clusterLimits: +# web: 1 +# queue: 1 # Job concurrency per worker # deliverJobConcurrency: 128 diff --git a/packages/backend/src/boot/master.ts b/packages/backend/src/boot/master.ts index d558ff739..f59e72ab3 100644 --- a/packages/backend/src/boot/master.ts +++ b/packages/backend/src/boot/master.ts @@ -66,7 +66,7 @@ export async function masterMain(): Promise { bootLogger.succ('FoundKey initialized'); if (!envOption.disableClustering) { - await spawnWorkers(config.clusterLimit); + await spawnWorkers(config.clusterLimits); } bootLogger.succ(`Now listening on port ${config.port} on ${config.url}`, null, true); @@ -139,16 +139,26 @@ async function connectDb(): Promise { } } -async function spawnWorkers(limit = 1): Promise { - const workers = Math.min(limit, os.cpus().length); - bootLogger.info(`Starting ${workers} worker${workers === 1 ? '' : 's'}...`); - await Promise.all([...Array(workers)].map(spawnWorker)); - bootLogger.succ('All workers started'); +async function spawnWorkers(clusterLimits: Required): Promise { + const modes = ['web', 'queue']; + const total = modes.reduce((acc, mode) => acc + clusterLimits[mode], 0); + if (total > os.cpus().length) { + bootLogger.error(`configuration error: cluster limits total (${total}) must not exceed number of cores (${os.cpus().length})`); + process.exit(78); + } + + const workers = new Array(total); + workers.fill('web', 0, clusterLimits.web); + workers.fill('queue', clusterLimits.web); + + bootLogger.info(`Starting ${total} workers...`); + await Promise.all(workers.map(mode => spawnWorker(mode))); + bootLogger.succ(`All workers started`); } -function spawnWorker(): Promise { +function spawnWorker(mode: 'web' | 'queue'): Promise { return new Promise(res => { - const worker = cluster.fork(); + const worker = cluster.fork({ mode }); worker.on('message', message => { if (message === 'listenFailed') { bootLogger.error('The server Listen failed due to the previous error.'); diff --git a/packages/backend/src/boot/worker.ts b/packages/backend/src/boot/worker.ts index 7c4885126..567e929e0 100644 --- a/packages/backend/src/boot/worker.ts +++ b/packages/backend/src/boot/worker.ts @@ -1,3 +1,4 @@ +import os from 'node:os'; import cluster from 'node:cluster'; import { initDb } from '@/db/postgre.js'; @@ -7,11 +8,20 @@ import { initDb } from '@/db/postgre.js'; export async function workerMain(): Promise { await initDb(); - // start server - await import('../server/index.js').then(x => x.default()); + if (!process.env.mode || process.env.mode === 'web') { + // start server + await import('../server/index.js').then(x => x.default()); + } - // start job queue - import('../queue/index.js').then(x => x.default()); + if (!process.env.mode || process.env.mode === 'queue') { + // start job queue + import('../queue/index.js').then(x => x.default()); + + if (process.env.mode === 'queue') { + // if this is an exclusive queue worker, renice to have higher priority + os.setPriority(os.constants.priority.PRIORITY_BELOW_NORMAL); + } + } if (cluster.isWorker) { // Send a 'ready' message to parent process diff --git a/packages/backend/src/config/load.ts b/packages/backend/src/config/load.ts index bfdc11d7e..231f7c20b 100644 --- a/packages/backend/src/config/load.ts +++ b/packages/backend/src/config/load.ts @@ -54,6 +54,23 @@ export default function load(): Config { if (!config.redis.prefix) config.redis.prefix = mixin.host; + if (!config.clusterLimits) { + config.clusterLimits = { + web: 1, + queue: 1, + }; + } else { + config.clusterLimits = { + web: 1, + queue: 1, + ...config.clusterLimits, + }; + + if (config.clusterLimits.web < 1 || config.clusterLimits.queue < 1) { + throw new Error('invalid cluster limits'); + } + } + return Object.assign(config, mixin); } diff --git a/packages/backend/src/config/types.ts b/packages/backend/src/config/types.ts index ff49fb04e..7308fd7c7 100644 --- a/packages/backend/src/config/types.ts +++ b/packages/backend/src/config/types.ts @@ -45,7 +45,10 @@ export type Source = { accesslog?: string; - clusterLimit?: number; + clusterLimits?: { + web?: number; + queue?: number; + }; id: string;