forked from FoundKeyGang/FoundKey
refactor onlyQueue and onlyServer configuration
Instead of checking this configuration in the respective component (queue) or not at all (server), the configuration can be checked when starting the respective workers.
This commit is contained in:
parent
239a52eb99
commit
e6a8173378
2 changed files with 13 additions and 7 deletions
|
@ -141,15 +141,24 @@ async function connectDb(): Promise<void> {
|
||||||
|
|
||||||
async function spawnWorkers(clusterLimits: Required<Config['clusterLimits']>): Promise<void> {
|
async function spawnWorkers(clusterLimits: Required<Config['clusterLimits']>): Promise<void> {
|
||||||
const modes = ['web' as const, 'queue' as const];
|
const modes = ['web' as const, 'queue' as const];
|
||||||
|
|
||||||
|
const clusters = structuredClone(clusterLimits);
|
||||||
|
|
||||||
|
if (envOption.onlyQueue) {
|
||||||
|
clusters.web = 0;
|
||||||
|
} else if (envOption.onlyServer) {
|
||||||
|
clusters.queue = 0;
|
||||||
|
}
|
||||||
|
|
||||||
const cpus = os.cpus().length;
|
const cpus = os.cpus().length;
|
||||||
for (const mode of modes.filter(mode => clusterLimits[mode] > cpus)) {
|
for (const mode of modes.filter(mode => clusters[mode] > cpus)) {
|
||||||
bootLogger.warn(`configuration warning: cluster limit for ${mode} exceeds number of cores (${cpus})`);
|
bootLogger.warn(`configuration warning: cluster limit for ${mode} exceeds number of cores (${cpus})`);
|
||||||
}
|
}
|
||||||
|
|
||||||
const total = modes.reduce((acc, mode) => acc + clusterLimits[mode], 0);
|
const total = modes.reduce((acc, mode) => acc + clusters[mode], 0);
|
||||||
const workers = new Array(total);
|
const workers = new Array(total);
|
||||||
workers.fill('web', 0, clusterLimits.web);
|
workers.fill('web', 0, clusters.web);
|
||||||
workers.fill('queue', clusterLimits.web);
|
workers.fill('queue', clusters.web);
|
||||||
|
|
||||||
bootLogger.info(`Starting ${total} workers...`);
|
bootLogger.info(`Starting ${total} workers...`);
|
||||||
await Promise.all(workers.map(mode => spawnWorker(mode)));
|
await Promise.all(workers.map(mode => spawnWorker(mode)));
|
||||||
|
|
|
@ -5,7 +5,6 @@ import config from '@/config/index.js';
|
||||||
import { DriveFile } from '@/models/entities/drive-file.js';
|
import { DriveFile } from '@/models/entities/drive-file.js';
|
||||||
import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js';
|
import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js';
|
||||||
import { IActivity } from '@/remote/activitypub/type.js';
|
import { IActivity } from '@/remote/activitypub/type.js';
|
||||||
import { envOption } from '@/env.js';
|
|
||||||
import { MINUTE } from '@/const.js';
|
import { MINUTE } from '@/const.js';
|
||||||
|
|
||||||
import processDeliver from './processors/deliver.js';
|
import processDeliver from './processors/deliver.js';
|
||||||
|
@ -289,8 +288,6 @@ export function webhookDeliver(webhook: Webhook, type: typeof webhookEventTypes[
|
||||||
}
|
}
|
||||||
|
|
||||||
export default function() {
|
export default function() {
|
||||||
if (envOption.onlyServer) return;
|
|
||||||
|
|
||||||
deliverQueue.process(config.deliverJobConcurrency, processDeliver);
|
deliverQueue.process(config.deliverJobConcurrency, processDeliver);
|
||||||
inboxQueue.process(config.inboxJobConcurrency, processInbox);
|
inboxQueue.process(config.inboxJobConcurrency, processInbox);
|
||||||
endedPollNotificationQueue.process(endedPollNotification);
|
endedPollNotificationQueue.process(endedPollNotification);
|
||||||
|
|
Loading…
Reference in a new issue