implement separate web workers #252
5 changed files with 58 additions and 15 deletions
|
@ -78,8 +78,11 @@ redis:
|
||||||
# Whether disable HSTS
|
# Whether disable HSTS
|
||||||
#disableHsts: true
|
#disableHsts: true
|
||||||
|
|
||||||
# Number of worker processes
|
# Number of worker processes by type.
|
||||||
#clusterLimit: 1
|
# The sum must not exceed the number of available cores.
|
||||||
|
#clusterLimits:
|
||||||
|
# web: 1
|
||||||
|
# queue: 1
|
||||||
|
|
||||||
# Job concurrency per worker
|
# Job concurrency per worker
|
||||||
# deliverJobConcurrency: 128
|
# deliverJobConcurrency: 128
|
||||||
|
|
|
@ -66,7 +66,7 @@ export async function masterMain(): Promise<void> {
|
||||||
bootLogger.succ('FoundKey initialized');
|
bootLogger.succ('FoundKey initialized');
|
||||||
|
|
||||||
if (!envOption.disableClustering) {
|
if (!envOption.disableClustering) {
|
||||||
await spawnWorkers(config.clusterLimit);
|
await spawnWorkers(config.clusterLimits);
|
||||||
}
|
}
|
||||||
|
|
||||||
bootLogger.succ(`Now listening on port ${config.port} on ${config.url}`, null, true);
|
bootLogger.succ(`Now listening on port ${config.port} on ${config.url}`, null, true);
|
||||||
|
@ -139,16 +139,26 @@ async function connectDb(): Promise<void> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async function spawnWorkers(limit = 1): Promise<void> {
|
async function spawnWorkers(clusterLimits: Required<Config['clusterLimits']>): Promise<void> {
|
||||||
const workers = Math.min(limit, os.cpus().length);
|
const modes = ['web', 'queue'];
|
||||||
bootLogger.info(`Starting ${workers} worker${workers === 1 ? '' : 's'}...`);
|
const cpus = os.cpus().length;
|
||||||
await Promise.all([...Array(workers)].map(spawnWorker));
|
for (const mode of modes.filter(mode => clusterLimits[mode] > cpus)) {
|
||||||
bootLogger.succ('All workers started');
|
bootLogger.warn(`configuration warning: cluster limit for ${mode} exceeds number of cores (${cpus})`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const total = modes.reduce((acc, mode) => acc + clusterLimits[mode], 0);
|
||||||
|
const workers = new Array(total);
|
||||||
|
workers.fill('web', 0, clusterLimits.web);
|
||||||
|
workers.fill('queue', clusterLimits.web);
|
||||||
toast marked this conversation as resolved
|
|||||||
|
|
||||||
|
bootLogger.info(`Starting ${total} workers...`);
|
||||||
|
await Promise.all(workers.map(mode => spawnWorker(mode)));
|
||||||
|
bootLogger.succ(`All workers started`);
|
||||||
}
|
}
|
||||||
|
|
||||||
function spawnWorker(): Promise<void> {
|
function spawnWorker(mode: 'web' | 'queue'): Promise<void> {
|
||||||
return new Promise(res => {
|
return new Promise(res => {
|
||||||
const worker = cluster.fork();
|
const worker = cluster.fork({ mode });
|
||||||
worker.on('message', message => {
|
worker.on('message', message => {
|
||||||
if (message === 'listenFailed') {
|
if (message === 'listenFailed') {
|
||||||
bootLogger.error('The server Listen failed due to the previous error.');
|
bootLogger.error('The server Listen failed due to the previous error.');
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import os from 'node:os';
|
||||||
import cluster from 'node:cluster';
|
import cluster from 'node:cluster';
|
||||||
import { initDb } from '@/db/postgre.js';
|
import { initDb } from '@/db/postgre.js';
|
||||||
|
|
||||||
|
@ -7,11 +8,20 @@ import { initDb } from '@/db/postgre.js';
|
||||||
export async function workerMain(): Promise<void> {
|
export async function workerMain(): Promise<void> {
|
||||||
await initDb();
|
await initDb();
|
||||||
|
|
||||||
// start server
|
if (!process.env.mode || process.env.mode === 'web') {
|
||||||
await import('../server/index.js').then(x => x.default());
|
// start server
|
||||||
|
await import('../server/index.js').then(x => x.default());
|
||||||
|
}
|
||||||
|
|
||||||
// start job queue
|
if (!process.env.mode || process.env.mode === 'queue') {
|
||||||
import('../queue/index.js').then(x => x.default());
|
// start job queue
|
||||||
|
import('../queue/index.js').then(x => x.default());
|
||||||
|
|
||||||
|
if (process.env.mode === 'queue') {
|
||||||
|
// if this is an exclusive queue worker, renice to have higher priority
|
||||||
|
os.setPriority(os.constants.priority.PRIORITY_BELOW_NORMAL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (cluster.isWorker) {
|
if (cluster.isWorker) {
|
||||||
// Send a 'ready' message to parent process
|
// Send a 'ready' message to parent process
|
||||||
|
|
|
@ -54,6 +54,23 @@ export default function load(): Config {
|
||||||
|
|
||||||
if (!config.redis.prefix) config.redis.prefix = mixin.host;
|
if (!config.redis.prefix) config.redis.prefix = mixin.host;
|
||||||
|
|
||||||
|
if (!config.clusterLimits) {
|
||||||
|
config.clusterLimits = {
|
||||||
|
web: 1,
|
||||||
|
queue: 1,
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
config.clusterLimits = {
|
||||||
|
web: 1,
|
||||||
|
queue: 1,
|
||||||
|
...config.clusterLimits,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (config.clusterLimits.web < 1 || config.clusterLimits.queue < 1) {
|
||||||
|
throw new Error('invalid cluster limits');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return Object.assign(config, mixin);
|
return Object.assign(config, mixin);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,10 @@ export type Source = {
|
||||||
|
|
||||||
accesslog?: string;
|
accesslog?: string;
|
||||||
|
|
||||||
clusterLimit?: number;
|
clusterLimits?: {
|
||||||
|
web?: number;
|
||||||
|
queue?: number;
|
||||||
|
};
|
||||||
|
|
||||||
id: string;
|
id: string;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue
This construction is a little odd and seems to me to be prone to off-by-ones.
I would write it as this:
Your example code wouldn't work because
new Array(n)
generates an array withn
empty slots andmap
basically ignores those.I've thought about it a bit and it's fine.
Could be a TODO for cleanup later, but this is fine and works as expected.