From 2aafe8fc9f893a631b5900d54bf19cb22fba2f79 Mon Sep 17 00:00:00 2001 From: Johann150 Date: Sat, 29 Oct 2022 03:13:22 +0200 Subject: [PATCH] server: avoid adding suspended instances to deliver queue This should reduce the performance hit when adding large numbers of instances to the deliver queue by making the check for suspended and dead instances a bulk operation. Changelog: Changed Reviewed-on: https://akkoma.dev/FoundKeyGang/FoundKey/pulls/215 --- .../backend/src/misc/skipped-instances.ts | 54 +++++++++++++++++++ .../backend/src/queue/processors/deliver.ts | 23 +------- .../src/remote/activitypub/deliver-manager.ts | 12 +++++ 3 files changed, 68 insertions(+), 21 deletions(-) create mode 100644 packages/backend/src/misc/skipped-instances.ts diff --git a/packages/backend/src/misc/skipped-instances.ts b/packages/backend/src/misc/skipped-instances.ts new file mode 100644 index 000000000..2d0d5a7ae --- /dev/null +++ b/packages/backend/src/misc/skipped-instances.ts @@ -0,0 +1,54 @@ +import { Brackets } from 'typeorm'; +import { fetchMeta } from '@/misc/fetch-meta.js'; +import { Instances } from '@/models/index.js'; +import { Instance } from '@/models/entities/instance.js'; +import { DAY } from '@/const.js'; + +// Threshold from last contact after which an instance will be considered +// "dead" and should no longer get activities delivered to it. +const deadThreshold = 30 * DAY; + +/** + * Returns the subset of hosts which should be skipped. + * + * @param hosts array of punycoded instance hosts + * @returns array of punycoed instance hosts that should be skipped (subset of hosts parameter) + */ +export async function skippedInstances(hosts: Array): Array { + // first check for blocked instances since that info may already be in memory + const { blockedHosts } = await fetchMeta(); + + const skipped = hosts.filter(host => blockedHosts.includes(host)); + // if possible return early and skip accessing the database + if (skipped.length === hosts.length) return hosts; + + const deadTime = new Date(Date.now() - deadThreshold); + + return skipped.concat( + await Instances.createQueryBuilder('instance') + .where('instance.host in (:...hosts)', { + // don't check hosts again that we already know are suspended + // also avoids adding duplicates to the list + hosts: hosts.filter(host => !skipped.includes(host)), + }) + .andWhere(new Brackets(qb => { qb + .where('instance.isSuspended') + .orWhere('instance.lastCommunicatedAt < :deadTime', { deadTime }); + })) + .select('host') + .getRawMany() + ); +} + +/** + * Returns whether a specific host (punycoded) should be skipped. + * Convenience wrapper around skippedInstances which should only be used if there is a single host to check. + * If you have multiple hosts, consider using skippedInstances instead to do a bulk check. + * + * @param host punycoded instance host + * @returns whether the given host should be skipped + */ +export async function shouldSkipInstance(host: Instance['host']): boolean { + const skipped = await skippedInstances([host]); + return skipped.length > 0; +} diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/processors/deliver.ts index 1f99b305d..3de0783e9 100644 --- a/packages/backend/src/queue/processors/deliver.ts +++ b/packages/backend/src/queue/processors/deliver.ts @@ -6,39 +6,20 @@ import Logger from '@/services/logger.js'; import { Instances } from '@/models/index.js'; import { apRequestChart, federationChart, instanceChart } from '@/services/chart/index.js'; import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata.js'; -import { fetchMeta } from '@/misc/fetch-meta.js'; import { toPuny } from '@/misc/convert-host.js'; -import { Cache } from '@/misc/cache.js'; -import { Instance } from '@/models/entities/instance.js'; import { StatusError } from '@/misc/fetch.js'; +import { shouldSkipInstance } from '@/misc/skipped-instances.js'; import { DeliverJobData } from '@/queue/types.js'; -import { LessThan } from 'typeorm'; -import { DAY } from '@/const.js'; const logger = new Logger('deliver'); let latest: string | null = null; -const deadThreshold = 30 * DAY; - export default async (job: Bull.Job) => { const { host } = new URL(job.data.to); const puny = toPuny(host); - // ブロックしてたら中断 - const meta = await fetchMeta(); - if (meta.blockedHosts.includes(puny)) { - return 'skip (blocked)'; - } - - const deadTime = new Date(Date.now() - deadThreshold); - const isSuspendedOrDead = await Instances.countBy([ - { host: puny, isSuspended: true }, - { host: puny, lastCommunicatedAt: LessThan(deadTime) }, - ]); - if (isSuspendedOrDead) { - return 'skip (suspended or dead)'; - } + if (await shouldSkipInstance(puny)) return 'skip'; try { if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) { diff --git a/packages/backend/src/remote/activitypub/deliver-manager.ts b/packages/backend/src/remote/activitypub/deliver-manager.ts index 1f5702ae7..55054bf35 100644 --- a/packages/backend/src/remote/activitypub/deliver-manager.ts +++ b/packages/backend/src/remote/activitypub/deliver-manager.ts @@ -2,6 +2,7 @@ import { IsNull, Not } from 'typeorm'; import { ILocalUser, IRemoteUser, User } from '@/models/entities/user.js'; import { Users, Followings } from '@/models/index.js'; import { deliver } from '@/queue/index.js'; +import { skippedInstances } from '@/misc/skipped-instances.js'; //#region types interface IRecipe { @@ -150,8 +151,19 @@ export default class DeliverManager { ) .forEach(recipe => inboxes.add(recipe.to.inbox!)); + const instancesToSkip = await skippedInstances( + // get (unique) list of hosts + Array.from(new Set( + Array.from(inboxes) + .map(inbox => new URL(inbox).host) + )) + ); + // deliver for (const inbox of inboxes) { + // skip instances as indicated + if (instancesToSkip.includes(new URL(inbox).host)) continue; + deliver(this.actor, this.activity, inbox); } }