Revert "server: fix user deletion race condition"

This reverts commit cc83cbe523, reversing
changes made to 8abd3ebec7.

This changeset contains:
* multiple type errors
* a foreign key incompatibility
* breaks outgoing note federation (in at least two ways)
This commit is contained in:
Chloe Kudryavtsev 2023-01-30 14:59:24 +01:00
parent 6fd80816fa
commit bb3ec8bafe
12 changed files with 23 additions and 72 deletions

View file

@ -1,18 +0,0 @@
export class deletionProgress1673201544000 {
name = 'deletionProgress1673201544000';
async up(queryRunner) {
await queryRunner.query(`ALTER TABLE "user" RENAME COLUMN "isDeleted" TO "isDeletedOld"`);
await queryRunner.query(`ALTER TABLE "user" ADD "isDeleted" integer`);
await queryRunner.query(`UPDATE "user" SET "isDeleted" = CASE WHEN "host" IS NULL THEN -1 ELSE 0 END WHERE "isDeletedOld"`);
await queryRunner.query(`ALTER TABLE "user" DROP COLUMN "isDeletedOld"`);
}
async down(queryRunner) {
await queryRunner.query(`ALTER TABLE "user" RENAME COLUMN "isDeleted" TO "isDeletedOld"`);
await queryRunner.query(`ALTER TABLE "user" ADD "isDeleted" boolean NOT NULL DEFAULT false`);
await queryRunner.query(`UPDATE "user" SET "isDeleted" = "isDeletedOld" IS NOT NULL`);
await queryRunner.query(`ALTER TABLE "user" DROP COLUMN "isDeletedOld"`);
}
}

View file

@ -163,11 +163,11 @@ export class User {
// Indicates the user was deleted by an admin. // Indicates the user was deleted by an admin.
// The users' data is not deleted from the database to keep them from reappearing. // The users' data is not deleted from the database to keep them from reappearing.
// A hard delete of the record may follow if we receive a matching Delete activity. // A hard delete of the record may follow if we receive a matching Delete activity.
@Column('integer', { @Column('boolean', {
nullable: true, default: false,
comment: 'How many delivery jobs are outstanding before the deletion is completed.', comment: 'Whether the User is deleted.',
}) })
public isDeleted: number | null; public isDeleted: boolean;
@Column('varchar', { @Column('varchar', {
length: 128, array: true, default: '{}', length: 128, array: true, default: '{}',

View file

@ -349,7 +349,7 @@ export const UserRepository = db.getRepository(User).extend({
autoAcceptFollowed: profile!.autoAcceptFollowed, autoAcceptFollowed: profile!.autoAcceptFollowed,
noCrawle: profile!.noCrawle, noCrawle: profile!.noCrawle,
isExplorable: user.isExplorable, isExplorable: user.isExplorable,
isDeleted: user.isDeleted != null, isDeleted: user.isDeleted,
hideOnlineStatus: user.hideOnlineStatus, hideOnlineStatus: user.hideOnlineStatus,
hasUnreadSpecifiedNotes: NoteUnreads.count({ hasUnreadSpecifiedNotes: NoteUnreads.count({
where: { userId: user.id, isSpecified: true }, where: { userId: user.id, isSpecified: true },

View file

@ -1,9 +1,7 @@
import httpSignature from '@peertube/http-signature'; import httpSignature from '@peertube/http-signature';
import { v4 as uuid } from 'uuid'; import { v4 as uuid } from 'uuid';
import Bull from 'bull';
import config from '@/config/index.js'; import config from '@/config/index.js';
import { Users } from '@/models/index.js';
import { DriveFile } from '@/models/entities/drive-file.js'; import { DriveFile } from '@/models/entities/drive-file.js';
import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js'; import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js';
import { IActivity } from '@/remote/activitypub/type.js'; import { IActivity } from '@/remote/activitypub/type.js';
@ -20,7 +18,7 @@ import { endedPollNotification } from './processors/ended-poll-notification.js';
import { queueLogger } from './logger.js'; import { queueLogger } from './logger.js';
import { getJobInfo } from './get-job-info.js'; import { getJobInfo } from './get-job-info.js';
import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue, endedPollNotificationQueue, webhookDeliverQueue } from './queues.js'; import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue, endedPollNotificationQueue, webhookDeliverQueue } from './queues.js';
import { DeliverJobData, ThinUser } from './types.js'; import { ThinUser } from './types.js';
function renderError(e: Error): any { function renderError(e: Error): any {
return { return {
@ -37,12 +35,6 @@ const inboxLogger = queueLogger.createSubLogger('inbox');
const dbLogger = queueLogger.createSubLogger('db'); const dbLogger = queueLogger.createSubLogger('db');
const objectStorageLogger = queueLogger.createSubLogger('objectStorage'); const objectStorageLogger = queueLogger.createSubLogger('objectStorage');
async function deletionRefCount(job: Bull.Job<DeliverJobData>): Promise<void> {
if (job.data.deletingUserId) {
await Users.decrement({ id: job.data.deletingUserId }, 'isDeleted', 1);
}
}
systemQueue systemQueue
.on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`)) .on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => systemLogger.debug(`active id=${job.id}`)) .on('active', (job) => systemLogger.debug(`active id=${job.id}`))
@ -54,14 +46,8 @@ systemQueue
deliverQueue deliverQueue
.on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`)) .on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`)) .on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
.on('completed', async (job, result) => { .on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`); .on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
await deletionRefCount(job);
})
.on('failed', async (job, err) => {
deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`);
await deletionRefCount(job);
})
.on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) })) .on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); .on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
@ -97,7 +83,7 @@ webhookDeliverQueue
.on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) })) .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
export function deliver(user: ThinUser, content: unknown, to: string | null, deletingUserId?: string) { export function deliver(user: ThinUser, content: unknown, to: string | null) {
if (content == null) return null; if (content == null) return null;
if (to == null) return null; if (to == null) return null;
@ -107,7 +93,6 @@ export function deliver(user: ThinUser, content: unknown, to: string | null, del
}, },
content, content,
to, to,
deletingUserId,
}; };
return deliverQueue.add(data, { return deliverQueue.add(data, {
@ -341,9 +326,8 @@ export default function() {
} }
export function destroy() { export function destroy() {
deliverQueue.once('cleaned', async (jobs, status) => { deliverQueue.once('cleaned', (jobs, status) => {
deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`); deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
await Promise.all(jobs.map(job => deletionRefCount(job));
}); });
deliverQueue.clean(0, 'delayed'); deliverQueue.clean(0, 'delayed');

View file

@ -1,6 +1,6 @@
import Bull from 'bull'; import Bull from 'bull';
import { In, LessThan } from 'typeorm'; import { In, LessThan } from 'typeorm';
import { AttestationChallenges, AuthSessions, Mutings, Notifications, PasswordResetRequests, Signins, Users } from '@/models/index.js'; import { AttestationChallenges, AuthSessions, Mutings, Notifications, PasswordResetRequests, Signins } from '@/models/index.js';
import { publishUserEvent } from '@/services/stream.js'; import { publishUserEvent } from '@/services/stream.js';
import { MINUTE, MONTH } from '@/const.js'; import { MINUTE, MONTH } from '@/const.js';
import { queueLogger } from '@/queue/logger.js'; import { queueLogger } from '@/queue/logger.js';
@ -52,11 +52,6 @@ export async function checkExpired(job: Bull.Job<Record<string, unknown>>, done:
createdAt: OlderThan(3 * MONTH), createdAt: OlderThan(3 * MONTH),
}); });
await Users.delete({
// delete users where the deletion status reference count has come down to zero
isDeleted: 0,
});
logger.succ('Deleted expired data.'); logger.succ('Deleted expired data.');
done(); done();

View file

@ -12,8 +12,6 @@ export type DeliverJobData = {
content: unknown; content: unknown;
/** inbox URL to deliver */ /** inbox URL to deliver */
to: string; to: string;
/** set if this job is part of a user deletion, on completion or failure the isDeleted field needs to be decremented */
deletingUserId?: string;
}; };
export type InboxJobData = { export type InboxJobData = {

View file

@ -88,10 +88,10 @@ export class DeliverManager {
/** /**
* Execute delivers * Execute delivers
*/ */
public async execute(deletingUserId?: string) { public async execute() {
if (!Users.isLocalUser(this.actor)) return; if (!Users.isLocalUser(this.actor)) return;
let inboxes = new Set<string>(); const inboxes = new Set<string>();
/* /*
build inbox list build inbox list
@ -150,17 +150,13 @@ export class DeliverManager {
)), )),
); );
inboxes = inboxes.entries() // deliver
.filter(inbox => !instancesToSkip.includes(new URL(inbox).host)); for (const inbox of inboxes) {
// skip instances as indicated
if (instancesToSkip.includes(new URL(inbox).host)) continue;
if (deletingUserId) { deliver(this.actor, this.activity, inbox);
await Users.update(deletingUserId, {
// set deletion job count for reference counting before queueing jobs
isDeleted: inboxes.length,
});
} }
inboxes.forEach(inbox => deliver(this.actor, this.activity, inbox, deletingUserId));
} }
} }

View file

@ -16,7 +16,7 @@ export async function deleteActor(actor: CacheableRemoteUser, uri: string): Prom
// anyway, the user is gone now so dont care // anyway, the user is gone now so dont care
return 'ok: gone'; return 'ok: gone';
} }
if (user.isDeleted != null) { if (user.isDeleted) {
// the actual deletion already happened by an admin, just delete the record // the actual deletion already happened by an admin, just delete the record
await Users.delete(actor.id); await Users.delete(actor.id);
} else { } else {

View file

@ -1,4 +1,3 @@
import { IsNull } from 'typeorm';
import { Users } from '@/models/index.js'; import { Users } from '@/models/index.js';
import { ApiError } from '@/server/api/error.js'; import { ApiError } from '@/server/api/error.js';
import { deleteAccount } from '@/services/delete-account.js'; import { deleteAccount } from '@/services/delete-account.js';
@ -25,7 +24,7 @@ export const paramDef = {
export default define(meta, paramDef, async (ps) => { export default define(meta, paramDef, async (ps) => {
const user = await Users.findOneBy({ const user = await Users.findOneBy({
id: ps.userId, id: ps.userId,
isDeleted: IsNull(), isDeleted: false,
}); });
if (user == null) { if (user == null) {

View file

@ -27,7 +27,7 @@ export default define(meta, paramDef, async (ps, user) => {
Users.findOneByOrFail({ id: user.id }), Users.findOneByOrFail({ id: user.id }),
]); ]);
if (userDetailed.isDeleted != null) { if (userDetailed.isDeleted) {
return; return;
} }

View file

@ -8,7 +8,7 @@ export async function deleteAccount(user: {
host: string | null; host: string | null;
}): Promise<void> { }): Promise<void> {
await Users.update(user.id, { await Users.update(user.id, {
isDeleted: -1, isDeleted: true,
}); });
if (Users.isLocalUser(user)) { if (Users.isLocalUser(user)) {

View file

@ -6,9 +6,6 @@ import { User } from '@/models/entities/user.js';
import { Users } from '@/models/index.js'; import { Users } from '@/models/index.js';
import { publishInternalEvent } from '@/services/stream.js'; import { publishInternalEvent } from '@/services/stream.js';
/**
* Sends an internal event and for local users queues the delete activites.
*/
export async function doPostSuspend(user: { id: User['id']; host: User['host'] }): Promise<void> { export async function doPostSuspend(user: { id: User['id']; host: User['host'] }): Promise<void> {
publishInternalEvent('userChangeSuspendedState', { id: user.id, isSuspended: true }); publishInternalEvent('userChangeSuspendedState', { id: user.id, isSuspended: true });
@ -18,6 +15,6 @@ export async function doPostSuspend(user: { id: User['id']; host: User['host'] }
// deliver to all of known network // deliver to all of known network
const dm = new DeliverManager(user, content); const dm = new DeliverManager(user, content);
dm.addEveryone(); dm.addEveryone();
await dm.execute(user.id); await dm.execute();
} }
} }