diff --git a/.config/example.yml b/.config/example.yml index 7afa56fbe..cd08f76d6 100644 --- a/.config/example.yml +++ b/.config/example.yml @@ -125,6 +125,14 @@ autoAdmin: true # deliverJobConcurrency: 128 # inboxJobConcurrency: 16 +# Job rate limiter +# deliverJobPerSec: 128 +# inboxJobPerSec: 16 + +# Job attempts +# deliverJobMaxAttempts: 12 +# inboxJobMaxAttempts: 8 + # IP address family used for outgoing request (ipv4, ipv6 or dual) #outgoingAddressFamily: ipv4 diff --git a/src/config/types.ts b/src/config/types.ts index 2bf94af74..aeb2c1233 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -47,6 +47,10 @@ export type Source = { deliverJobConcurrency?: number; inboxJobConcurrency?: number; + deliverJobPerSec?: number; + inboxJobPerSec?: number; + deliverJobMaxAttempts?: number; + inboxJobMaxAttempts?: number; syslog: { host: string; diff --git a/src/queue/index.ts b/src/queue/index.ts index ad7453353..b1437da8b 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -13,7 +13,7 @@ import { queueLogger } from './logger'; import { DriveFile } from '../models/entities/drive-file'; import { getJobInfo } from './get-job-info'; -function initializeQueue(name: string) { +function initializeQueue(name: string, limitPerSec = -1) { return new Queue(name, { redis: { port: config.redis.port, @@ -21,7 +21,11 @@ function initializeQueue(name: string) { password: config.redis.pass, db: config.redis.db || 0, }, - prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue' + prefix: config.redis.prefix ? `${config.redis.prefix}:queue` : 'queue', + limiter: limitPerSec > 0 ? { + max: limitPerSec * 5, + duration: 5000 + } : undefined }); } @@ -33,8 +37,8 @@ function renderError(e: Error): any { }; } -export const deliverQueue = initializeQueue('deliver'); -export const inboxQueue = initializeQueue('inbox'); +export const deliverQueue = initializeQueue('deliver', config.deliverJobPerSec || 128); +export const inboxQueue = initializeQueue('inbox', config.inboxJobPerSec || 16); export const dbQueue = initializeQueue('db'); export const objectStorageQueue = initializeQueue('objectStorage'); @@ -85,7 +89,7 @@ export function deliver(user: ILocalUser, content: any, to: any) { }; return deliverQueue.add(data, { - attempts: 8, + attempts: config.deliverJobMaxAttempts || 12, backoff: { type: 'exponential', delay: 60 * 1000 @@ -102,7 +106,7 @@ export function inbox(activity: any, signature: httpSignature.IParsedSignature) }; return inboxQueue.add(data, { - attempts: 8, + attempts: config.inboxJobMaxAttempts || 8, backoff: { type: 'exponential', delay: 1000