From e6a8173378bb60ff34e3ab2112e55a205500dee3 Mon Sep 17 00:00:00 2001 From: Johann150 Date: Tue, 23 May 2023 19:48:48 +0200 Subject: [PATCH] refactor onlyQueue and onlyServer configuration Instead of checking this configuration in the respective component (queue) or not at all (server), the configuration can be checked when starting the respective workers. --- packages/backend/src/boot/master.ts | 17 +++++++++++++---- packages/backend/src/queue/index.ts | 3 --- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/packages/backend/src/boot/master.ts b/packages/backend/src/boot/master.ts index 26303d140..c53dc12b1 100644 --- a/packages/backend/src/boot/master.ts +++ b/packages/backend/src/boot/master.ts @@ -141,15 +141,24 @@ async function connectDb(): Promise { async function spawnWorkers(clusterLimits: Required): Promise { const modes = ['web' as const, 'queue' as const]; + + const clusters = structuredClone(clusterLimits); + + if (envOption.onlyQueue) { + clusters.web = 0; + } else if (envOption.onlyServer) { + clusters.queue = 0; + } + const cpus = os.cpus().length; - for (const mode of modes.filter(mode => clusterLimits[mode] > cpus)) { + for (const mode of modes.filter(mode => clusters[mode] > cpus)) { bootLogger.warn(`configuration warning: cluster limit for ${mode} exceeds number of cores (${cpus})`); } - const total = modes.reduce((acc, mode) => acc + clusterLimits[mode], 0); + const total = modes.reduce((acc, mode) => acc + clusters[mode], 0); const workers = new Array(total); - workers.fill('web', 0, clusterLimits.web); - workers.fill('queue', clusterLimits.web); + workers.fill('web', 0, clusters.web); + workers.fill('queue', clusters.web); bootLogger.info(`Starting ${total} workers...`); await Promise.all(workers.map(mode => spawnWorker(mode))); diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index 558f54ff0..0d0a89f14 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -5,7 +5,6 @@ import config from '@/config/index.js'; import { DriveFile } from '@/models/entities/drive-file.js'; import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js'; import { IActivity } from '@/remote/activitypub/type.js'; -import { envOption } from '@/env.js'; import { MINUTE } from '@/const.js'; import processDeliver from './processors/deliver.js'; @@ -289,8 +288,6 @@ export function webhookDeliver(webhook: Webhook, type: typeof webhookEventTypes[ } export default function() { - if (envOption.onlyServer) return; - deliverQueue.process(config.deliverJobConcurrency, processDeliver); inboxQueue.process(config.inboxJobConcurrency, processInbox); endedPollNotificationQueue.process(endedPollNotification);