forked from FoundKeyGang/FoundKey
backend: add automatic dead instance detection
It works by keeping a day-long cache of "when did we last successfully communicate with this instance?" Anything over a specified threshold (1 month) is treated as though the instance were suspended - all outgoing jobs to it are dropped during processing. The day-long cache is in place because the underlying query is necessarily a linear scan. Once an instance comes back online, we will detect that this is the case as soon as we receive an activity from it (which updates the "last communicated at" field). A condensed sketch of the mechanism follows below.

Potential future TODOs:
* Improve the caching system; it's actually pretty inefficient as it is. CacheBox with a call override?
* Think of ways to make it not a linear scan, since the instances table can get pretty big. It's around 4500 rows on toast cafe.

ChangeLog: Added
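A minimal, self-contained sketch of the mechanism described above. The cache class, field, and function names here are illustrative stand-ins rather than the actual FoundKey APIs; the real change is in the diff further down.

// Simplified sketch of the dead-instance check described in the commit message.
// SimpleCache stands in for the project's Cache<T>; all names are illustrative.
class SimpleCache<T> {
	private value: { fetchedAt: number; data: T } | null = null;
	constructor(private ttl: number) {}
	get(): T | null {
		if (this.value == null) return null;
		if (Date.now() - this.value.fetchedAt > this.ttl) return null;
		return this.value.data;
	}
	set(data: T): void {
		this.value = { fetchedAt: Date.now(), data };
	}
}

type InstanceRow = { host: string; lastCommunicatedAt: Date };

const deadThreshold = 1000 * 60 * 60 * 24 * 30; // 1 month
const deadHostsCache = new SimpleCache<string[]>(1000 * 60 * 60 * 24); // rebuild at most once a day

async function shouldSkipAsDead(host: string, fetchInstances: () => Promise<InstanceRow[]>): Promise<boolean> {
	let deadHosts = deadHostsCache.get();
	if (deadHosts == null) {
		const deadTime = Date.now() - deadThreshold;
		// the expensive part: a scan over all known instances, hence the day-long cache
		const all = await fetchInstances();
		deadHosts = all.filter(i => i.lastCommunicatedAt.getTime() < deadTime).map(i => i.host);
		deadHostsCache.set(deadHosts);
	}
	return deadHosts.includes(host);
}

The point of the long-lived cache is that the expensive scan runs at most once per day, while the per-delivery check stays a cheap in-memory lookup.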
This commit is contained in:
parent 756ecbb1f7
commit 91a4f38871
1 changed file with 23 additions and 7 deletions
@@ -12,36 +12,52 @@ import { Cache } from '@/misc/cache.js';
 import { Instance } from '@/models/entities/instance.js';
 import { StatusError } from '@/misc/fetch.js';
 import { DeliverJobData } from '@/queue/types.js';
+import { LessThan } from 'typeorm';
 
 const logger = new Logger('deliver');
 
 let latest: string | null = null;
 
 const suspendedHostsCache = new Cache<Instance[]>(1000 * 60 * 60);
+// dead host list is a linear scan, so cache it longer
+const deadHostsCache = new Cache<Instance[]>(1000 * 60 * 60 * 24);
+const deadThreshold = 1000 * 60 * 60 * 24 * 30; // 1 month
 
 export default async (job: Bull.Job<DeliverJobData>) => {
 	const { host } = new URL(job.data.to);
+	const puny = toPuny(host);
 
 	// ブロックしてたら中断
 	const meta = await fetchMeta();
-	if (meta.blockedHosts.includes(toPuny(host))) {
+	if (meta.blockedHosts.includes(puny)) {
 		return 'skip (blocked)';
 	}
 
-	// isSuspendedなら中断
+	// isSuspended
 	let suspendedHosts = suspendedHostsCache.get(null);
 	if (suspendedHosts == null) {
-		suspendedHosts = await Instances.find({
-			where: {
-				isSuspended: true,
-			},
+		suspendedHosts = await Instances.findBy({
+			isSuspended: true,
 		});
 		suspendedHostsCache.set(null, suspendedHosts);
 	}
-	if (suspendedHosts.map(x => x.host).includes(toPuny(host))) {
+	if (suspendedHosts.some(x => x.host == puny)) {
 		return 'skip (suspended)';
 	}
 
+	// dead
+	let deadHosts = deadHostsCache.get(null);
+	if (deadHosts == null) {
+		const deadTime = new Date(Date.now() - deadThreshold);
+		deadHosts = await Instances.findBy({
+			lastCommunicatedAt: LessThan(deadTime),
+		});
+		deadHostsCache.set(null, deadHosts);
+	}
+	if (deadHosts.some(x => x.host == puny)) {
+		return 'skip (dead instance)';
+	}
+
 	try {
 		if (latest !== (latest = JSON.stringify(job.data.content, null, 2))) {
			logger.debug(`delivering ${latest}`);
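For reference on the query change above: the switch from Instances.find({ where: ... }) to Instances.findBy(...) and the new LessThan(...) condition follow TypeORM's find-operator API, where findBy(where) is shorthand for find({ where }). A minimal sketch under that assumption, taking Instances to be the TypeORM repository for the Instance entity:

import { LessThan, Repository } from 'typeorm';
import { Instance } from '@/models/entities/instance.js';

// Roughly: SELECT * FROM "instance" WHERE "lastCommunicatedAt" < $1
async function findDeadInstances(Instances: Repository<Instance>, deadThreshold: number): Promise<Instance[]> {
	const deadTime = new Date(Date.now() - deadThreshold);
	return await Instances.findBy({
		lastCommunicatedAt: LessThan(deadTime),
	});
}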