server: track deletion completion

This commit is contained in:
Johann150 2023-01-08 18:50:35 +01:00
parent 85e985d13f
commit 80f72e21cd
Signed by untrusted user: Johann150
GPG key ID: 9EE6577A2A06F8F1
4 changed files with 38 additions and 13 deletions

View file

@ -1,7 +1,9 @@
import httpSignature from '@peertube/http-signature';
import { v4 as uuid } from 'uuid';
import Bull from 'bull';
import config from '@/config/index.js';
import { Users } from '@/models/index.js';
import { DriveFile } from '@/models/entities/drive-file.js';
import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js';
import { IActivity } from '@/remote/activitypub/type.js';
@ -18,7 +20,7 @@ import { endedPollNotification } from './processors/ended-poll-notification.js';
import { queueLogger } from './logger.js';
import { getJobInfo } from './get-job-info.js';
import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue, endedPollNotificationQueue, webhookDeliverQueue } from './queues.js';
import { ThinUser } from './types.js';
import { DeliverJobData, ThinUser } from './types.js';
function renderError(e: Error): any {
return {
@ -35,6 +37,12 @@ const inboxLogger = queueLogger.createSubLogger('inbox');
const dbLogger = queueLogger.createSubLogger('db');
const objectStorageLogger = queueLogger.createSubLogger('objectStorage');
async function deletionRefCount(job: Bull.Job<DeliverJobData>): Promise<void> {
if (job.data.deletingUserId) {
await Users.decrement({ id: job.data.deletingUserId }, 'isDeleted', 1);
}
}
systemQueue
.on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => systemLogger.debug(`active id=${job.id}`))
@ -46,8 +54,14 @@ systemQueue
deliverQueue
.on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
.on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
.on('completed', async (job, result) => {
deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`);
await deletionRefCount(job);
})
.on('failed', async (job, err) => {
deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`);
await deletionRefCount(job);
})
.on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
@ -83,7 +97,7 @@ webhookDeliverQueue
.on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
export function deliver(user: ThinUser, content: unknown, to: string | null) {
export function deliver(user: ThinUser, content: unknown, to: string | null, deletingUserId?: string) {
if (content == null) return null;
if (to == null) return null;
@ -93,6 +107,7 @@ export function deliver(user: ThinUser, content: unknown, to: string | null) {
},
content,
to,
deletingUserId,
};
return deliverQueue.add(data, {
@ -326,8 +341,9 @@ export default function() {
}
export function destroy() {
deliverQueue.once('cleaned', (jobs, status) => {
deliverQueue.once('cleaned', async (jobs, status) => {
deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
await Promise.all(jobs.map(job => deletionRefCount(job));
});
deliverQueue.clean(0, 'delayed');

View file

@ -12,6 +12,8 @@ export type DeliverJobData = {
content: unknown;
/** inbox URL to deliver */
to: string;
/** set if this job is part of a user deletion, on completion or failure the isDeleted field needs to be decremented */
deletingUserId?: string;
};
export type InboxJobData = {

View file

@ -88,10 +88,10 @@ export class DeliverManager {
/**
* Execute delivers
*/
public async execute() {
public async execute(deletingUserId?: string) {
if (!Users.isLocalUser(this.actor)) return;
const inboxes = new Set<string>();
let inboxes = new Set<string>();
/*
build inbox list
@ -150,13 +150,17 @@ export class DeliverManager {
)),
);
// deliver
for (const inbox of inboxes) {
// skip instances as indicated
if (instancesToSkip.includes(new URL(inbox).host)) continue;
inboxes = inboxes.entries()
.filter(inbox => !instancesToSkip.includes(new URL(inbox).host));
deliver(this.actor, this.activity, inbox);
if (deletingUserId) {
await Users.update(deletingUserId, {
// set deletion job count for reference counting before queueing jobs
isDeleted: inboxes.length,
});
}
inboxes.forEach(inbox => deliver(this.actor, this.activity, inbox, deletingUserId));
}
}

View file

@ -6,6 +6,9 @@ import { User } from '@/models/entities/user.js';
import { Users } from '@/models/index.js';
import { publishInternalEvent } from '@/services/stream.js';
/**
* Sends an internal event and for local users queues the delete activites.
*/
export async function doPostSuspend(user: { id: User['id']; host: User['host'] }): Promise<void> {
publishInternalEvent('userChangeSuspendedState', { id: user.id, isSuspended: true });
@ -15,6 +18,6 @@ export async function doPostSuspend(user: { id: User['id']; host: User['host'] }
// deliver to all of known network
const dm = new DeliverManager(user, content);
dm.addEveryone();
await dm.execute();
await dm.execute(user.id);
}
}