From 431239316983ced7ffde03cacd7b7a87371b5f92 Mon Sep 17 00:00:00 2001 From: Johann150 Date: Wed, 1 Feb 2023 23:22:26 +0100 Subject: [PATCH] Revert 'Revert "server: fix user deletion race condition"' This reverts commit bb3ec8bafe5d16eea061929271f72032c8aeb6f3. --- .../1673201544000-deletion-progress.js | 18 +++++++++++++ packages/backend/src/models/entities/user.ts | 8 +++--- .../backend/src/models/repositories/user.ts | 2 +- packages/backend/src/queue/index.ts | 26 +++++++++++++++---- .../queue/processors/system/check-expired.ts | 7 ++++- packages/backend/src/queue/types.ts | 2 ++ .../src/remote/activitypub/deliver-manager.ts | 18 ++++++++----- .../remote/activitypub/kernel/delete/actor.ts | 2 +- .../api/endpoints/admin/users/delete.ts | 1 + .../server/api/endpoints/i/delete-account.ts | 2 +- .../backend/src/services/delete-account.ts | 2 +- packages/backend/src/services/suspend-user.ts | 5 +++- 12 files changed, 71 insertions(+), 22 deletions(-) create mode 100644 packages/backend/migration/1673201544000-deletion-progress.js diff --git a/packages/backend/migration/1673201544000-deletion-progress.js b/packages/backend/migration/1673201544000-deletion-progress.js new file mode 100644 index 000000000..90aa5cbf8 --- /dev/null +++ b/packages/backend/migration/1673201544000-deletion-progress.js @@ -0,0 +1,18 @@ +export class deletionProgress1673201544000 { + name = 'deletionProgress1673201544000'; + + async up(queryRunner) { + await queryRunner.query(`ALTER TABLE "user" RENAME COLUMN "isDeleted" TO "isDeletedOld"`); + await queryRunner.query(`ALTER TABLE "user" ADD "isDeleted" integer`); + await queryRunner.query(`UPDATE "user" SET "isDeleted" = CASE WHEN "host" IS NULL THEN -1 ELSE 0 END WHERE "isDeletedOld"`); + await queryRunner.query(`ALTER TABLE "user" DROP COLUMN "isDeletedOld"`); + } + + async down(queryRunner) { + await queryRunner.query(`ALTER TABLE "user" RENAME COLUMN "isDeleted" TO "isDeletedOld"`); + await queryRunner.query(`ALTER TABLE "user" ADD "isDeleted" boolean NOT NULL DEFAULT false`); + await queryRunner.query(`UPDATE "user" SET "isDeleted" = "isDeletedOld" IS NOT NULL`); + await queryRunner.query(`ALTER TABLE "user" DROP COLUMN "isDeletedOld"`); + } +} + diff --git a/packages/backend/src/models/entities/user.ts b/packages/backend/src/models/entities/user.ts index 0c1e8092f..f2d53a984 100644 --- a/packages/backend/src/models/entities/user.ts +++ b/packages/backend/src/models/entities/user.ts @@ -163,11 +163,11 @@ export class User { // Indicates the user was deleted by an admin. // The users' data is not deleted from the database to keep them from reappearing. // A hard delete of the record may follow if we receive a matching Delete activity. - @Column('boolean', { - default: false, - comment: 'Whether the User is deleted.', + @Column('integer', { + nullable: true, + comment: 'How many delivery jobs are outstanding before the deletion is completed.', }) - public isDeleted: boolean; + public isDeleted: number | null; @Column('varchar', { length: 128, array: true, default: '{}', diff --git a/packages/backend/src/models/repositories/user.ts b/packages/backend/src/models/repositories/user.ts index aea591229..6549c24c6 100644 --- a/packages/backend/src/models/repositories/user.ts +++ b/packages/backend/src/models/repositories/user.ts @@ -381,7 +381,7 @@ export const UserRepository = db.getRepository(User).extend({ autoAcceptFollowed: profile!.autoAcceptFollowed, noCrawle: profile!.noCrawle, isExplorable: user.isExplorable, - isDeleted: user.isDeleted, + isDeleted: user.isDeleted != null, hideOnlineStatus: user.hideOnlineStatus, hasUnreadSpecifiedNotes: NoteUnreads.count({ where: { userId: user.id, isSpecified: true }, diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index 0d0a89f14..4d4d9257d 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -1,7 +1,9 @@ import httpSignature from '@peertube/http-signature'; import { v4 as uuid } from 'uuid'; +import Bull from 'bull'; import config from '@/config/index.js'; +import { Users } from '@/models/index.js'; import { DriveFile } from '@/models/entities/drive-file.js'; import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js'; import { IActivity } from '@/remote/activitypub/type.js'; @@ -17,7 +19,7 @@ import { endedPollNotification } from './processors/ended-poll-notification.js'; import { queueLogger } from './logger.js'; import { getJobInfo } from './get-job-info.js'; import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue, endedPollNotificationQueue, webhookDeliverQueue } from './queues.js'; -import { ThinUser } from './types.js'; +import { DeliverJobData, ThinUser } from './types.js'; function renderError(e: Error): any { return { @@ -34,6 +36,12 @@ const inboxLogger = queueLogger.createSubLogger('inbox'); const dbLogger = queueLogger.createSubLogger('db'); const objectStorageLogger = queueLogger.createSubLogger('objectStorage'); +async function deletionRefCount(job: Bull.Job): Promise { + if (job.data.deletingUserId) { + await Users.decrement({ id: job.data.deletingUserId }, 'isDeleted', 1); + } +} + systemQueue .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`)) .on('active', (job) => systemLogger.debug(`active id=${job.id}`)) @@ -45,8 +53,14 @@ systemQueue deliverQueue .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`)) .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`)) - .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`)) + .on('completed', async (job, result) => { + deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`); + await deletionRefCount(job); + }) + .on('failed', async (job, err) => { + deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`); + await deletionRefCount(job); + }) .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`)) .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); @@ -82,7 +96,7 @@ webhookDeliverQueue .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`)) .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); -export function deliver(user: ThinUser, content: unknown, to: string | null) { +export function deliver(user: ThinUser, content: unknown, to: string | null, deletingUserId?: string) { if (content == null) return null; if (to == null) return null; @@ -92,6 +106,7 @@ export function deliver(user: ThinUser, content: unknown, to: string | null) { }, content, to, + deletingUserId, }; return deliverQueue.add(data, { @@ -323,8 +338,9 @@ export default function() { } export function destroy() { - deliverQueue.once('cleaned', (jobs, status) => { + deliverQueue.once('cleaned', async (jobs, status) => { deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); + await Promise.all(jobs.map(job => deletionRefCount(job)); }); deliverQueue.clean(0, 'delayed'); diff --git a/packages/backend/src/queue/processors/system/check-expired.ts b/packages/backend/src/queue/processors/system/check-expired.ts index eeb6149bb..71bd498e9 100644 --- a/packages/backend/src/queue/processors/system/check-expired.ts +++ b/packages/backend/src/queue/processors/system/check-expired.ts @@ -1,6 +1,6 @@ import Bull from 'bull'; import { In, LessThan } from 'typeorm'; -import { AttestationChallenges, AuthSessions, Mutings, Notifications, PasswordResetRequests, Signins } from '@/models/index.js'; +import { AttestationChallenges, AuthSessions, Mutings, Notifications, PasswordResetRequests, Signins, Users } from '@/models/index.js'; import { publishUserEvent } from '@/services/stream.js'; import { MINUTE, MONTH } from '@/const.js'; import { queueLogger } from '@/queue/logger.js'; @@ -52,6 +52,11 @@ export async function checkExpired(job: Bull.Job>, done: createdAt: OlderThan(3 * MONTH), }); + await Users.delete({ + // delete users where the deletion status reference count has come down to zero + isDeleted: 0, + }); + logger.succ('Deleted expired data.'); done(); diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 82bd28703..d745a1fc7 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -12,6 +12,8 @@ export type DeliverJobData = { content: unknown; /** inbox URL to deliver */ to: string; + /** set if this job is part of a user deletion, on completion or failure the isDeleted field needs to be decremented */ + deletingUserId?: string; }; export type InboxJobData = { diff --git a/packages/backend/src/remote/activitypub/deliver-manager.ts b/packages/backend/src/remote/activitypub/deliver-manager.ts index 4bc651c98..dfadef150 100644 --- a/packages/backend/src/remote/activitypub/deliver-manager.ts +++ b/packages/backend/src/remote/activitypub/deliver-manager.ts @@ -88,10 +88,10 @@ export class DeliverManager { /** * Execute delivers */ - public async execute() { + public async execute(deletingUserId?: string) { if (!Users.isLocalUser(this.actor)) return; - const inboxes = new Set(); + let inboxes = new Set(); /* build inbox list @@ -150,13 +150,17 @@ export class DeliverManager { )), ); - // deliver - for (const inbox of inboxes) { - // skip instances as indicated - if (instancesToSkip.includes(new URL(inbox).host)) continue; + inboxes = inboxes.entries() + .filter(inbox => !instancesToSkip.includes(new URL(inbox).host)); - deliver(this.actor, this.activity, inbox); + if (deletingUserId) { + await Users.update(deletingUserId, { + // set deletion job count for reference counting before queueing jobs + isDeleted: inboxes.length, + }); } + + inboxes.forEach(inbox => deliver(this.actor, this.activity, inbox, deletingUserId)); } } diff --git a/packages/backend/src/remote/activitypub/kernel/delete/actor.ts b/packages/backend/src/remote/activitypub/kernel/delete/actor.ts index 9513ea22f..a191c626a 100644 --- a/packages/backend/src/remote/activitypub/kernel/delete/actor.ts +++ b/packages/backend/src/remote/activitypub/kernel/delete/actor.ts @@ -16,7 +16,7 @@ export async function deleteActor(actor: IRemoteUser, uri: string): Promise { Users.findOneByOrFail({ id: user.id }), ]); - if (userDetailed.isDeleted) { + if (userDetailed.isDeleted != null) { return; } diff --git a/packages/backend/src/services/delete-account.ts b/packages/backend/src/services/delete-account.ts index 2fa6e004b..a52b5b5d4 100644 --- a/packages/backend/src/services/delete-account.ts +++ b/packages/backend/src/services/delete-account.ts @@ -9,7 +9,7 @@ export async function deleteAccount(user: { }): Promise { await Promise.all([ Users.update(user.id, { - isDeleted: true, + isDeleted: -1, }), // revoke all of the users access tokens to block API access AccessTokens.delete({ diff --git a/packages/backend/src/services/suspend-user.ts b/packages/backend/src/services/suspend-user.ts index 11e6266a0..7ed6bff80 100644 --- a/packages/backend/src/services/suspend-user.ts +++ b/packages/backend/src/services/suspend-user.ts @@ -6,6 +6,9 @@ import { User } from '@/models/entities/user.js'; import { Users } from '@/models/index.js'; import { publishInternalEvent } from '@/services/stream.js'; +/** + * Sends an internal event and for local users queues the delete activites. + */ export async function doPostSuspend(user: { id: User['id']; host: User['host'] }): Promise { publishInternalEvent('userChangeSuspendedState', { id: user.id, isSuspended: true }); @@ -15,6 +18,6 @@ export async function doPostSuspend(user: { id: User['id']; host: User['host'] } // deliver to all of known network const dm = new DeliverManager(user, content); dm.addEveryone(); - await dm.execute(); + await dm.execute(user.id); } }