forked from FoundKeyGang/FoundKey
server: avoid adding suspended instances to deliver queue
This should reduce the performance hit when adding large numbers of instances to the deliver queue by making the check for suspended and dead instances a bulk operation. Changelog: Changed Reviewed-on: FoundKeyGang/FoundKey#215
This commit is contained in:
parent
7a64a3858d
commit
2aafe8fc9f
3 changed files with 68 additions and 21 deletions
54
packages/backend/src/misc/skipped-instances.ts
Normal file
54
packages/backend/src/misc/skipped-instances.ts
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
import { Brackets } from 'typeorm';
|
||||||
|
import { fetchMeta } from '@/misc/fetch-meta.js';
|
||||||
|
import { Instances } from '@/models/index.js';
|
||||||
|
import { Instance } from '@/models/entities/instance.js';
|
||||||
|
import { DAY } from '@/const.js';
|
||||||
|
|
||||||
|
// Threshold from last contact after which an instance will be considered
|
||||||
|
// "dead" and should no longer get activities delivered to it.
|
||||||
|
const deadThreshold = 30 * DAY;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the subset of hosts which should be skipped.
|
||||||
|
*
|
||||||
|
* @param hosts array of punycoded instance hosts
|
||||||
|
* @returns array of punycoed instance hosts that should be skipped (subset of hosts parameter)
|
||||||
|
*/
|
||||||
|
export async function skippedInstances(hosts: Array<Instace['host']>): Array<Instance['host']> {
|
||||||
|
// first check for blocked instances since that info may already be in memory
|
||||||
|
const { blockedHosts } = await fetchMeta();
|
||||||
|
|
||||||
|
const skipped = hosts.filter(host => blockedHosts.includes(host));
|
||||||
|
// if possible return early and skip accessing the database
|
||||||
|
if (skipped.length === hosts.length) return hosts;
|
||||||
|
|
||||||
|
const deadTime = new Date(Date.now() - deadThreshold);
|
||||||
|
|
||||||
|
return skipped.concat(
|
||||||
|
await Instances.createQueryBuilder('instance')
|
||||||
|
.where('instance.host in (:...hosts)', {
|
||||||
|
// don't check hosts again that we already know are suspended
|
||||||
|
// also avoids adding duplicates to the list
|
||||||
|
hosts: hosts.filter(host => !skipped.includes(host)),
|
||||||
|
})
|
||||||
|
.andWhere(new Brackets(qb => { qb
|
||||||
|
.where('instance.isSuspended')
|
||||||
|
.orWhere('instance.lastCommunicatedAt < :deadTime', { deadTime });
|
||||||
|
}))
|
||||||
|
.select('host')
|
||||||
|
.getRawMany()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns whether a specific host (punycoded) should be skipped.
|
||||||
|
* Convenience wrapper around skippedInstances which should only be used if there is a single host to check.
|
||||||
|
* If you have multiple hosts, consider using skippedInstances instead to do a bulk check.
|
||||||
|
*
|
||||||
|
* @param host punycoded instance host
|
||||||
|
* @returns whether the given host should be skipped
|
||||||
|
*/
|
||||||
|
export async function shouldSkipInstance(host: Instance['host']): boolean {
|
||||||
|
const skipped = await skippedInstances([host]);
|
||||||
|
return skipped.length > 0;
|
||||||
|
}
|
|
@ -6,39 +6,20 @@ import Logger from '@/services/logger.js';
|
||||||
import { Instances } from '@/models/index.js';
|
import { Instances } from '@/models/index.js';
|
||||||
import { apRequestChart, federationChart, instanceChart } from '@/services/chart/index.js';
|
import { apRequestChart, federationChart, instanceChart } from '@/services/chart/index.js';
|
||||||
import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata.js';
|
import { fetchInstanceMetadata } from '@/services/fetch-instance-metadata.js';
|
||||||
import { fetchMeta } from '@/misc/fetch-meta.js';
|
|
||||||
import { toPuny } from '@/misc/convert-host.js';
|
import { toPuny } from '@/misc/convert-host.js';
|
||||||
import { Cache } from '@/misc/cache.js';
|
|
||||||
import { Instance } from '@/models/entities/instance.js';
|
|
||||||
import { StatusError } from '@/misc/fetch.js';
|
import { StatusError } from '@/misc/fetch.js';
|
||||||
|
import { shouldSkipInstance } from '@/misc/skipped-instances.js';
|
||||||
import { DeliverJobData } from '@/queue/types.js';
|
import { DeliverJobData } from '@/queue/types.js';
|
||||||
import { LessThan } from 'typeorm';
|
|
||||||
import { DAY } from '@/const.js';
|
|
||||||
|
|
||||||
const logger = new Logger('deliver');
|
const logger = new Logger('deliver');
|
||||||
|
|
||||||
let latest: string | null = null;
|
let latest: string | null = null;
|
||||||
|
|
||||||
const deadThreshold = 30 * DAY;
|
|
||||||
|
|
||||||
export default async (job: Bull.Job<DeliverJobData>) => {
|
export default async (job: Bull.Job<DeliverJobData>) => {
|
||||||
const { host } = new URL(job.data.to);
|
const { host } = new URL(job.data.to);
|
||||||
const puny = toPuny(host);
|
const puny = toPuny(host);
|
||||||
|
|
||||||
// ブロックしてたら中断
|
if (await shouldSkipInstance(puny)) return 'skip';
|
||||||
const meta = await fetchMeta();
|
|
||||||
if (meta.blockedHosts.includes(puny)) {
|
|
||||||
return 'skip (blocked)';
|
|
||||||
}
|
|
||||||
|
|
||||||
const deadTime = new Date(Date.now() - deadThreshold);
|
|
||||||
const isSuspendedOrDead = await Instances.countBy([
|
|
||||||
{ host: puny, isSuspended: true },
|
|
||||||
{ host: puny, lastCommunicatedAt: LessThan(deadTime) },
|
|
||||||
]);
|
|
||||||
if (isSuspendedOrDead) {
|
|
||||||
return 'skip (suspended or dead)';
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
|
if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
|
||||||
|
|
|
@ -2,6 +2,7 @@ import { IsNull, Not } from 'typeorm';
|
||||||
import { ILocalUser, IRemoteUser, User } from '@/models/entities/user.js';
|
import { ILocalUser, IRemoteUser, User } from '@/models/entities/user.js';
|
||||||
import { Users, Followings } from '@/models/index.js';
|
import { Users, Followings } from '@/models/index.js';
|
||||||
import { deliver } from '@/queue/index.js';
|
import { deliver } from '@/queue/index.js';
|
||||||
|
import { skippedInstances } from '@/misc/skipped-instances.js';
|
||||||
|
|
||||||
//#region types
|
//#region types
|
||||||
interface IRecipe {
|
interface IRecipe {
|
||||||
|
@ -150,8 +151,19 @@ export default class DeliverManager {
|
||||||
)
|
)
|
||||||
.forEach(recipe => inboxes.add(recipe.to.inbox!));
|
.forEach(recipe => inboxes.add(recipe.to.inbox!));
|
||||||
|
|
||||||
|
const instancesToSkip = await skippedInstances(
|
||||||
|
// get (unique) list of hosts
|
||||||
|
Array.from(new Set(
|
||||||
|
Array.from(inboxes)
|
||||||
|
.map(inbox => new URL(inbox).host)
|
||||||
|
))
|
||||||
|
);
|
||||||
|
|
||||||
// deliver
|
// deliver
|
||||||
for (const inbox of inboxes) {
|
for (const inbox of inboxes) {
|
||||||
|
// skip instances as indicated
|
||||||
|
if (instancesToSkip.includes(new URL(inbox).host)) continue;
|
||||||
|
|
||||||
deliver(this.actor, this.activity, inbox);
|
deliver(this.actor, this.activity, inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue