diff --git a/packages/backend/src/queue/processors/deliver.ts b/packages/backend/src/queue/processors/deliver.ts index 108379aa8..a518c4116 100644 --- a/packages/backend/src/queue/processors/deliver.ts +++ b/packages/backend/src/queue/processors/deliver.ts @@ -12,36 +12,52 @@ import { Cache } from '@/misc/cache.js'; import { Instance } from '@/models/entities/instance.js'; import { StatusError } from '@/misc/fetch.js'; import { DeliverJobData } from '@/queue/types.js'; +import { LessThan } from 'typeorm'; const logger = new Logger('deliver'); let latest: string | null = null; const suspendedHostsCache = new Cache(1000 * 60 * 60); +// dead host list is a linear scan, so cache it longer +const deadHostsCache = new Cache(1000 * 60 * 60 * 24); +const deadThreshold = 1000 * 60 * 60 * 24 * 30; // 1 month export default async (job: Bull.Job) => { const { host } = new URL(job.data.to); + const puny = toPuny(host); // ブロックしてたら中断 const meta = await fetchMeta(); - if (meta.blockedHosts.includes(toPuny(host))) { + if (meta.blockedHosts.includes(puny)) { return 'skip (blocked)'; } - // isSuspendedなら中断 + // isSuspended let suspendedHosts = suspendedHostsCache.get(null); if (suspendedHosts == null) { - suspendedHosts = await Instances.find({ - where: { - isSuspended: true, - }, + suspendedHosts = await Instances.findBy({ + isSuspended: true, }); suspendedHostsCache.set(null, suspendedHosts); } - if (suspendedHosts.map(x => x.host).includes(toPuny(host))) { + if (suspendedHosts.some(x => x.host == puny)) { return 'skip (suspended)'; } + // dead + let deadHosts = deadHostsCache.get(null); + if (deadHosts == null) { + const deadTime = new Date(Date.now() - deadThreshold); + deadHosts = await Instances.findBy({ + lastCommunicatedAt: LessThan(deadTime), + }); + deadHostsCache.set(null, deadHosts); + } + if (deadHosts.some(x => x.host == puny)) { + return 'skip (dead instance)'; + } + try { if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) { logger.debug(`delivering ${latest}`);