forked from FoundKeyGang/FoundKey
Revert 'Revert "server: fix user deletion race condition"'
This reverts commit bb3ec8bafe
.
This commit is contained in:
parent
2fde652b4a
commit
4312393169
12 changed files with 71 additions and 22 deletions
|
@ -0,0 +1,18 @@
|
||||||
|
export class deletionProgress1673201544000 {
|
||||||
|
name = 'deletionProgress1673201544000';
|
||||||
|
|
||||||
|
async up(queryRunner) {
|
||||||
|
await queryRunner.query(`ALTER TABLE "user" RENAME COLUMN "isDeleted" TO "isDeletedOld"`);
|
||||||
|
await queryRunner.query(`ALTER TABLE "user" ADD "isDeleted" integer`);
|
||||||
|
await queryRunner.query(`UPDATE "user" SET "isDeleted" = CASE WHEN "host" IS NULL THEN -1 ELSE 0 END WHERE "isDeletedOld"`);
|
||||||
|
await queryRunner.query(`ALTER TABLE "user" DROP COLUMN "isDeletedOld"`);
|
||||||
|
}
|
||||||
|
|
||||||
|
async down(queryRunner) {
|
||||||
|
await queryRunner.query(`ALTER TABLE "user" RENAME COLUMN "isDeleted" TO "isDeletedOld"`);
|
||||||
|
await queryRunner.query(`ALTER TABLE "user" ADD "isDeleted" boolean NOT NULL DEFAULT false`);
|
||||||
|
await queryRunner.query(`UPDATE "user" SET "isDeleted" = "isDeletedOld" IS NOT NULL`);
|
||||||
|
await queryRunner.query(`ALTER TABLE "user" DROP COLUMN "isDeletedOld"`);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -163,11 +163,11 @@ export class User {
|
||||||
// Indicates the user was deleted by an admin.
|
// Indicates the user was deleted by an admin.
|
||||||
// The users' data is not deleted from the database to keep them from reappearing.
|
// The users' data is not deleted from the database to keep them from reappearing.
|
||||||
// A hard delete of the record may follow if we receive a matching Delete activity.
|
// A hard delete of the record may follow if we receive a matching Delete activity.
|
||||||
@Column('boolean', {
|
@Column('integer', {
|
||||||
default: false,
|
nullable: true,
|
||||||
comment: 'Whether the User is deleted.',
|
comment: 'How many delivery jobs are outstanding before the deletion is completed.',
|
||||||
})
|
})
|
||||||
public isDeleted: boolean;
|
public isDeleted: number | null;
|
||||||
|
|
||||||
@Column('varchar', {
|
@Column('varchar', {
|
||||||
length: 128, array: true, default: '{}',
|
length: 128, array: true, default: '{}',
|
||||||
|
|
|
@ -381,7 +381,7 @@ export const UserRepository = db.getRepository(User).extend({
|
||||||
autoAcceptFollowed: profile!.autoAcceptFollowed,
|
autoAcceptFollowed: profile!.autoAcceptFollowed,
|
||||||
noCrawle: profile!.noCrawle,
|
noCrawle: profile!.noCrawle,
|
||||||
isExplorable: user.isExplorable,
|
isExplorable: user.isExplorable,
|
||||||
isDeleted: user.isDeleted,
|
isDeleted: user.isDeleted != null,
|
||||||
hideOnlineStatus: user.hideOnlineStatus,
|
hideOnlineStatus: user.hideOnlineStatus,
|
||||||
hasUnreadSpecifiedNotes: NoteUnreads.count({
|
hasUnreadSpecifiedNotes: NoteUnreads.count({
|
||||||
where: { userId: user.id, isSpecified: true },
|
where: { userId: user.id, isSpecified: true },
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
import httpSignature from '@peertube/http-signature';
|
import httpSignature from '@peertube/http-signature';
|
||||||
import { v4 as uuid } from 'uuid';
|
import { v4 as uuid } from 'uuid';
|
||||||
|
import Bull from 'bull';
|
||||||
|
|
||||||
import config from '@/config/index.js';
|
import config from '@/config/index.js';
|
||||||
|
import { Users } from '@/models/index.js';
|
||||||
import { DriveFile } from '@/models/entities/drive-file.js';
|
import { DriveFile } from '@/models/entities/drive-file.js';
|
||||||
import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js';
|
import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js';
|
||||||
import { IActivity } from '@/remote/activitypub/type.js';
|
import { IActivity } from '@/remote/activitypub/type.js';
|
||||||
|
@ -17,7 +19,7 @@ import { endedPollNotification } from './processors/ended-poll-notification.js';
|
||||||
import { queueLogger } from './logger.js';
|
import { queueLogger } from './logger.js';
|
||||||
import { getJobInfo } from './get-job-info.js';
|
import { getJobInfo } from './get-job-info.js';
|
||||||
import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue, endedPollNotificationQueue, webhookDeliverQueue } from './queues.js';
|
import { systemQueue, dbQueue, deliverQueue, inboxQueue, objectStorageQueue, endedPollNotificationQueue, webhookDeliverQueue } from './queues.js';
|
||||||
import { ThinUser } from './types.js';
|
import { DeliverJobData, ThinUser } from './types.js';
|
||||||
|
|
||||||
function renderError(e: Error): any {
|
function renderError(e: Error): any {
|
||||||
return {
|
return {
|
||||||
|
@ -34,6 +36,12 @@ const inboxLogger = queueLogger.createSubLogger('inbox');
|
||||||
const dbLogger = queueLogger.createSubLogger('db');
|
const dbLogger = queueLogger.createSubLogger('db');
|
||||||
const objectStorageLogger = queueLogger.createSubLogger('objectStorage');
|
const objectStorageLogger = queueLogger.createSubLogger('objectStorage');
|
||||||
|
|
||||||
|
async function deletionRefCount(job: Bull.Job<DeliverJobData>): Promise<void> {
|
||||||
|
if (job.data.deletingUserId) {
|
||||||
|
await Users.decrement({ id: job.data.deletingUserId }, 'isDeleted', 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
systemQueue
|
systemQueue
|
||||||
.on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
|
.on('waiting', (jobId) => systemLogger.debug(`waiting id=${jobId}`))
|
||||||
.on('active', (job) => systemLogger.debug(`active id=${job.id}`))
|
.on('active', (job) => systemLogger.debug(`active id=${job.id}`))
|
||||||
|
@ -45,8 +53,14 @@ systemQueue
|
||||||
deliverQueue
|
deliverQueue
|
||||||
.on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
|
.on('waiting', (jobId) => deliverLogger.debug(`waiting id=${jobId}`))
|
||||||
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
|
.on('active', (job) => deliverLogger.debug(`active ${getJobInfo(job, true)} to=${job.data.to}`))
|
||||||
.on('completed', (job, result) => deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`))
|
.on('completed', async (job, result) => {
|
||||||
.on('failed', (job, err) => deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`))
|
deliverLogger.debug(`completed(${result}) ${getJobInfo(job, true)} to=${job.data.to}`);
|
||||||
|
await deletionRefCount(job);
|
||||||
|
})
|
||||||
|
.on('failed', async (job, err) => {
|
||||||
|
deliverLogger.warn(`failed(${err}) ${getJobInfo(job)} to=${job.data.to}`);
|
||||||
|
await deletionRefCount(job);
|
||||||
|
})
|
||||||
.on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`))
|
.on('error', (job: any, err: Error) => deliverLogger.error(`error ${err}`))
|
||||||
.on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
|
.on('stalled', (job) => deliverLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
|
||||||
|
|
||||||
|
@ -82,7 +96,7 @@ webhookDeliverQueue
|
||||||
.on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`))
|
.on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`))
|
||||||
.on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
|
.on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
|
||||||
|
|
||||||
export function deliver(user: ThinUser, content: unknown, to: string | null) {
|
export function deliver(user: ThinUser, content: unknown, to: string | null, deletingUserId?: string) {
|
||||||
if (content == null) return null;
|
if (content == null) return null;
|
||||||
if (to == null) return null;
|
if (to == null) return null;
|
||||||
|
|
||||||
|
@ -92,6 +106,7 @@ export function deliver(user: ThinUser, content: unknown, to: string | null) {
|
||||||
},
|
},
|
||||||
content,
|
content,
|
||||||
to,
|
to,
|
||||||
|
deletingUserId,
|
||||||
};
|
};
|
||||||
|
|
||||||
return deliverQueue.add(data, {
|
return deliverQueue.add(data, {
|
||||||
|
@ -323,8 +338,9 @@ export default function() {
|
||||||
}
|
}
|
||||||
|
|
||||||
export function destroy() {
|
export function destroy() {
|
||||||
deliverQueue.once('cleaned', (jobs, status) => {
|
deliverQueue.once('cleaned', async (jobs, status) => {
|
||||||
deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
|
deliverLogger.succ(`Cleaned ${jobs.length} ${status} jobs`);
|
||||||
|
await Promise.all(jobs.map(job => deletionRefCount(job));
|
||||||
});
|
});
|
||||||
deliverQueue.clean(0, 'delayed');
|
deliverQueue.clean(0, 'delayed');
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
import Bull from 'bull';
|
import Bull from 'bull';
|
||||||
import { In, LessThan } from 'typeorm';
|
import { In, LessThan } from 'typeorm';
|
||||||
import { AttestationChallenges, AuthSessions, Mutings, Notifications, PasswordResetRequests, Signins } from '@/models/index.js';
|
import { AttestationChallenges, AuthSessions, Mutings, Notifications, PasswordResetRequests, Signins, Users } from '@/models/index.js';
|
||||||
import { publishUserEvent } from '@/services/stream.js';
|
import { publishUserEvent } from '@/services/stream.js';
|
||||||
import { MINUTE, MONTH } from '@/const.js';
|
import { MINUTE, MONTH } from '@/const.js';
|
||||||
import { queueLogger } from '@/queue/logger.js';
|
import { queueLogger } from '@/queue/logger.js';
|
||||||
|
@ -52,6 +52,11 @@ export async function checkExpired(job: Bull.Job<Record<string, unknown>>, done:
|
||||||
createdAt: OlderThan(3 * MONTH),
|
createdAt: OlderThan(3 * MONTH),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
await Users.delete({
|
||||||
|
// delete users where the deletion status reference count has come down to zero
|
||||||
|
isDeleted: 0,
|
||||||
|
});
|
||||||
|
|
||||||
logger.succ('Deleted expired data.');
|
logger.succ('Deleted expired data.');
|
||||||
|
|
||||||
done();
|
done();
|
||||||
|
|
|
@ -12,6 +12,8 @@ export type DeliverJobData = {
|
||||||
content: unknown;
|
content: unknown;
|
||||||
/** inbox URL to deliver */
|
/** inbox URL to deliver */
|
||||||
to: string;
|
to: string;
|
||||||
|
/** set if this job is part of a user deletion, on completion or failure the isDeleted field needs to be decremented */
|
||||||
|
deletingUserId?: string;
|
||||||
};
|
};
|
||||||
|
|
||||||
export type InboxJobData = {
|
export type InboxJobData = {
|
||||||
|
|
|
@ -88,10 +88,10 @@ export class DeliverManager {
|
||||||
/**
|
/**
|
||||||
* Execute delivers
|
* Execute delivers
|
||||||
*/
|
*/
|
||||||
public async execute() {
|
public async execute(deletingUserId?: string) {
|
||||||
if (!Users.isLocalUser(this.actor)) return;
|
if (!Users.isLocalUser(this.actor)) return;
|
||||||
|
|
||||||
const inboxes = new Set<string>();
|
let inboxes = new Set<string>();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
build inbox list
|
build inbox list
|
||||||
|
@ -150,13 +150,17 @@ export class DeliverManager {
|
||||||
)),
|
)),
|
||||||
);
|
);
|
||||||
|
|
||||||
// deliver
|
inboxes = inboxes.entries()
|
||||||
for (const inbox of inboxes) {
|
.filter(inbox => !instancesToSkip.includes(new URL(inbox).host));
|
||||||
// skip instances as indicated
|
|
||||||
if (instancesToSkip.includes(new URL(inbox).host)) continue;
|
|
||||||
|
|
||||||
deliver(this.actor, this.activity, inbox);
|
if (deletingUserId) {
|
||||||
|
await Users.update(deletingUserId, {
|
||||||
|
// set deletion job count for reference counting before queueing jobs
|
||||||
|
isDeleted: inboxes.length,
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inboxes.forEach(inbox => deliver(this.actor, this.activity, inbox, deletingUserId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@ export async function deleteActor(actor: IRemoteUser, uri: string): Promise<stri
|
||||||
// anyway, the user is gone now so dont care
|
// anyway, the user is gone now so dont care
|
||||||
return 'ok: gone';
|
return 'ok: gone';
|
||||||
}
|
}
|
||||||
if (user.isDeleted) {
|
if (user.isDeleted != null) {
|
||||||
// the actual deletion already happened by an admin, just delete the record
|
// the actual deletion already happened by an admin, just delete the record
|
||||||
await Users.delete(actor.id);
|
await Users.delete(actor.id);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
import { IsNull } from 'typeorm';
|
||||||
import { Users } from '@/models/index.js';
|
import { Users } from '@/models/index.js';
|
||||||
import { ApiError } from '@/server/api/error.js';
|
import { ApiError } from '@/server/api/error.js';
|
||||||
import { deleteAccount } from '@/services/delete-account.js';
|
import { deleteAccount } from '@/services/delete-account.js';
|
||||||
|
|
|
@ -27,7 +27,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
Users.findOneByOrFail({ id: user.id }),
|
Users.findOneByOrFail({ id: user.id }),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if (userDetailed.isDeleted) {
|
if (userDetailed.isDeleted != null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,7 @@ export async function deleteAccount(user: {
|
||||||
}): Promise<void> {
|
}): Promise<void> {
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
Users.update(user.id, {
|
Users.update(user.id, {
|
||||||
isDeleted: true,
|
isDeleted: -1,
|
||||||
}),
|
}),
|
||||||
// revoke all of the users access tokens to block API access
|
// revoke all of the users access tokens to block API access
|
||||||
AccessTokens.delete({
|
AccessTokens.delete({
|
||||||
|
|
|
@ -6,6 +6,9 @@ import { User } from '@/models/entities/user.js';
|
||||||
import { Users } from '@/models/index.js';
|
import { Users } from '@/models/index.js';
|
||||||
import { publishInternalEvent } from '@/services/stream.js';
|
import { publishInternalEvent } from '@/services/stream.js';
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sends an internal event and for local users queues the delete activites.
|
||||||
|
*/
|
||||||
export async function doPostSuspend(user: { id: User['id']; host: User['host'] }): Promise<void> {
|
export async function doPostSuspend(user: { id: User['id']; host: User['host'] }): Promise<void> {
|
||||||
publishInternalEvent('userChangeSuspendedState', { id: user.id, isSuspended: true });
|
publishInternalEvent('userChangeSuspendedState', { id: user.id, isSuspended: true });
|
||||||
|
|
||||||
|
@ -15,6 +18,6 @@ export async function doPostSuspend(user: { id: User['id']; host: User['host'] }
|
||||||
// deliver to all of known network
|
// deliver to all of known network
|
||||||
const dm = new DeliverManager(user, content);
|
const dm = new DeliverManager(user, content);
|
||||||
dm.addEveryone();
|
dm.addEveryone();
|
||||||
await dm.execute();
|
await dm.execute(user.id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue