From a786fd99fcac660e5c6e209fb218c5c3019870c9 Mon Sep 17 00:00:00 2001 From: Johann150 Date: Tue, 18 Apr 2023 21:27:05 +0200 Subject: [PATCH 1/8] make delivery manager author agnostic --- .../src/remote/activitypub/deliver-manager.ts | 57 ++++++++++--------- packages/backend/src/services/note/create.ts | 4 +- packages/backend/src/services/note/delete.ts | 4 +- .../src/services/note/reaction/create.ts | 4 +- .../src/services/note/reaction/delete.ts | 4 +- packages/backend/src/services/suspend-user.ts | 2 +- 6 files changed, 38 insertions(+), 37 deletions(-) diff --git a/packages/backend/src/remote/activitypub/deliver-manager.ts b/packages/backend/src/remote/activitypub/deliver-manager.ts index 4bc651c98..0262f94fe 100644 --- a/packages/backend/src/remote/activitypub/deliver-manager.ts +++ b/packages/backend/src/remote/activitypub/deliver-manager.ts @@ -14,6 +14,7 @@ interface IEveryoneRecipe extends IRecipe { interface IFollowersRecipe extends IRecipe { type: 'Followers'; + followee: ILocalUser; } interface IDirectRecipe extends IRecipe { @@ -32,26 +33,24 @@ const isDirect = (recipe: any): recipe is IDirectRecipe => //#endregion export class DeliverManager { - private actor: { id: User['id']; host: null; }; private activity: any; private recipes: IRecipe[] = []; /** * Constructor - * @param actor Actor * @param activity Activity to deliver */ - constructor(actor: { id: User['id']; host: null; }, activity: any) { - this.actor = actor; + constructor(activity: any) { this.activity = activity; } /** * Add recipe for followers deliver */ - public addFollowersRecipe() { + public addFollowersRecipe(followee: ILocalUser) { const deliver = { type: 'Followers', + followee, } as IFollowersRecipe; this.addRecipe(deliver); @@ -89,8 +88,6 @@ export class DeliverManager { * Execute delivers */ public async execute() { - if (!Users.isLocalUser(this.actor)) return; - const inboxes = new Set(); /* @@ -116,21 +113,25 @@ export class DeliverManager { } } - if (this.recipes.some(r => isFollowers(r))) { - // followers deliver - const followers = await Followings.createQueryBuilder('followings') - // return either the shared inbox (if available) or the individual inbox - .select('COALESCE(followings.followerSharedInbox, followings.followerInbox)', 'inbox') - // so we don't have to make our inboxes Set work as hard - .distinct(true) - // ...for the specific actors followers - .where('followings.followeeId = :actorId', { actorId: this.actor.id }) - // don't deliver to ourselves - .andWhere('followings.followerHost IS NOT NULL') - .getRawMany(); - - followers.forEach(({ inbox }) => inboxes.add(inbox)); - } + await Promise.all( + this.recipes.filter(isFollowers) + .map(recipe => { + // followers deliver + return Followings.createQueryBuilder('followings') + // return either the shared inbox (if available) or the individual inbox + .select('COALESCE(followings.followerSharedInbox, followings.followerInbox)', 'inbox') + // so we don't have to make our inboxes Set work as hard + .distinct(true) + // ...for the specific actors followers + .where('followings.followeeId = :actorId', { actorId: recipe.followee.id }) + // don't deliver to ourselves + .andWhere('followings.followerHost IS NOT NULL') + .getRawMany() + .then(followers => + followers.forEach(({ inbox }) => inboxes.add(inbox)) + ); + }) + ); this.recipes.filter((recipe): recipe is IDirectRecipe => // followers recipes have already been processed @@ -155,7 +156,7 @@ export class DeliverManager { // skip instances as indicated if (instancesToSkip.includes(new URL(inbox).host)) continue; - deliver(this.actor, this.activity, inbox); + deliver(null, this.activity, inbox); } } } @@ -166,9 +167,9 @@ export class DeliverManager { * @param activity Activity * @param from Followee */ -export async function deliverToFollowers(actor: { id: ILocalUser['id']; host: null; }, activity: any) { - const manager = new DeliverManager(actor, activity); - manager.addFollowersRecipe(); +export async function deliverToFollowers(actor: ILocalUser, activity: any) { + const manager = new DeliverManager(activity); + manager.addFollowersRecipe(actor); await manager.execute(); } @@ -177,8 +178,8 @@ export async function deliverToFollowers(actor: { id: ILocalUser['id']; host: nu * @param activity Activity * @param to Target user */ -export async function deliverToUser(actor: { id: ILocalUser['id']; host: null; }, activity: any, to: IRemoteUser) { - const manager = new DeliverManager(actor, activity); +export async function deliverToUser(activity: any, to: IRemoteUser) { + const manager = new DeliverManager(activity); manager.addDirectRecipe(to); await manager.execute(); } diff --git a/packages/backend/src/services/note/create.ts b/packages/backend/src/services/note/create.ts index 8f48bc12a..5fddfdcd3 100644 --- a/packages/backend/src/services/note/create.ts +++ b/packages/backend/src/services/note/create.ts @@ -432,7 +432,7 @@ export default async (user: { id: User['id']; username: User['username']; host: if (Users.isLocalUser(user) && !data.localOnly) { (async () => { const noteActivity = renderActivity(await renderNoteOrRenoteActivity(note)); - const dm = new DeliverManager(user, noteActivity); + const dm = new DeliverManager(noteActivity); // Delivered to remote users who have been mentioned for (const u of mentionedUsers.filter(u => Users.isRemoteUser(u))) { @@ -453,7 +453,7 @@ export default async (user: { id: User['id']; username: User['username']; host: // Deliver to followers if (['public', 'home', 'followers'].includes(note.visibility)) { - dm.addFollowersRecipe(); + dm.addFollowersRecipe(user); } if (['public'].includes(note.visibility)) { diff --git a/packages/backend/src/services/note/delete.ts b/packages/backend/src/services/note/delete.ts index f22ce80db..f8e7cd10b 100644 --- a/packages/backend/src/services/note/delete.ts +++ b/packages/backend/src/services/note/delete.ts @@ -60,8 +60,8 @@ export async function deleteNotes(notes: Note[], user?: User): Promise { // Compute addressing information. // Since we do not send any actual content, we send all note deletions to everyone. - const manager = new DeliverManager(fetchedUser, content); - manager.addFollowersRecipe(); + const manager = new DeliverManager(content); + manager.addFollowersRecipe(fetchedUser); manager.addEveryone(); // Check mentioned users, since not all may have a shared inbox. await Promise.all( diff --git a/packages/backend/src/services/note/reaction/create.ts b/packages/backend/src/services/note/reaction/create.ts index d50761a29..e35ceb991 100644 --- a/packages/backend/src/services/note/reaction/create.ts +++ b/packages/backend/src/services/note/reaction/create.ts @@ -130,14 +130,14 @@ export async function createReaction(user: { id: User['id']; host: User['host']; //#region 配信 if (Users.isLocalUser(user) && !note.localOnly) { const content = renderActivity(await renderLike(record, note)); - const dm = new DeliverManager(user, content); + const dm = new DeliverManager(content); if (note.userHost !== null) { const reactee = await Users.findOneBy({ id: note.userId }); dm.addDirectRecipe(reactee as IRemoteUser); } if (['public', 'home', 'followers'].includes(note.visibility)) { - dm.addFollowersRecipe(); + dm.addFollowersRecipe(user); } else if (note.visibility === 'specified') { const visibleUsers = await Promise.all(note.visibleUserIds.map(id => Users.findOneBy({ id }))); for (const u of visibleUsers.filter(u => u && Users.isRemoteUser(u))) { diff --git a/packages/backend/src/services/note/reaction/delete.ts b/packages/backend/src/services/note/reaction/delete.ts index 616a30015..060fe43de 100644 --- a/packages/backend/src/services/note/reaction/delete.ts +++ b/packages/backend/src/services/note/reaction/delete.ts @@ -46,12 +46,12 @@ export async function deleteReaction(user: { id: User['id']; host: User['host']; //#region 配信 if (Users.isLocalUser(user) && !note.localOnly) { const content = renderActivity(renderUndo(await renderLike(exist, note), user)); - const dm = new DeliverManager(user, content); + const dm = new DeliverManager(content); if (note.userHost !== null) { const reactee = await Users.findOneBy({ id: note.userId }); dm.addDirectRecipe(reactee as IRemoteUser); } - dm.addFollowersRecipe(); + dm.addFollowersRecipe(user); dm.execute(); } //#endregion diff --git a/packages/backend/src/services/suspend-user.ts b/packages/backend/src/services/suspend-user.ts index 11e6266a0..d08683008 100644 --- a/packages/backend/src/services/suspend-user.ts +++ b/packages/backend/src/services/suspend-user.ts @@ -13,7 +13,7 @@ export async function doPostSuspend(user: { id: User['id']; host: User['host'] } const content = renderActivity(renderDelete(`${config.url}/users/${user.id}`, user)); // deliver to all of known network - const dm = new DeliverManager(user, content); + const dm = new DeliverManager(content); dm.addEveryone(); await dm.execute(); } From ff4b6d932ea4436ad40f8685d8a9d96c6b179f9e Mon Sep 17 00:00:00 2001 From: Johann150 Date: Tue, 18 Apr 2023 21:56:09 +0200 Subject: [PATCH 2/8] refactor deliver to extract actor from activity --- packages/backend/src/queue/index.ts | 10 +++++++++- packages/backend/src/queue/types.ts | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index cc125a3de..4b458739e 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -7,6 +7,7 @@ import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js'; import { IActivity } from '@/remote/activitypub/type.js'; import { envOption } from '@/env.js'; import { MINUTE } from '@/const.js'; +import { DbResolver } from '@/remote/activitypub/db-resolver.js'; import processDeliver from './processors/deliver.js'; import processInbox from './processors/inbox.js'; @@ -83,10 +84,17 @@ webhookDeliverQueue .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) })) .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); -export function deliver(user: ThinUser, content: unknown, to: string | null) { +export async function deliver(_user: ThinUser, content: unknown, to: string | null) { if (content == null) return null; if (to == null) return null; + // extract user from the Activity + const userUri = content.actor; + if (!userUri) throw new Error("Cannot deliver activity without actor."); + const user = await new DbResolver().getUserFromApId(userUri); + if (!user) throw new Error("Actor not found, cannot deliver."); + if (user.host != null) throw new Error("Cannot deliver for remote actor."); + const data = { user: { id: user.id, diff --git a/packages/backend/src/queue/types.ts b/packages/backend/src/queue/types.ts index 82bd28703..f4fcd97be 100644 --- a/packages/backend/src/queue/types.ts +++ b/packages/backend/src/queue/types.ts @@ -9,7 +9,7 @@ export type DeliverJobData = { /** Actor */ user: ThinUser; /** Activity */ - content: unknown; + content: IActivity; /** inbox URL to deliver */ to: string; }; From 4dc8822d057b8df920d0b884347a3552abc99a48 Mon Sep 17 00:00:00 2001 From: Johann150 Date: Tue, 18 Apr 2023 22:00:52 +0200 Subject: [PATCH 3/8] remove unnecesary intermediate variables --- packages/backend/src/services/relay.ts | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/packages/backend/src/services/relay.ts b/packages/backend/src/services/relay.ts index de7155ea0..143ac5ca8 100644 --- a/packages/backend/src/services/relay.ts +++ b/packages/backend/src/services/relay.ts @@ -36,8 +36,7 @@ export async function addRelay(inbox: string): Promise { }).then(x => Relays.findOneByOrFail(x.identifiers[0])); const relayActor = await getRelayActor(); - const follow = renderFollowRelay(relay, relayActor); - const activity = renderActivity(follow); + const activity = renderActivity(renderFollowRelay(relay, relayActor)); deliver(relayActor, activity, relay.inbox); return relay; @@ -53,17 +52,14 @@ export async function removeRelay(inbox: string): Promise { } const relayActor = await getRelayActor(); - const follow = renderFollowRelay(relay, relayActor); - const undo = renderUndo(follow, relayActor); - const activity = renderActivity(undo); + const activity = renderActivity(renderUndo(renderFollowRelay(relay, relayActor), relayActor)); deliver(relayActor, activity, relay.inbox); await Relays.delete(relay.id); } export async function listRelay(): Promise { - const relays = await Relays.find(); - return relays; + return await Relays.find(); } export async function relayAccepted(id: string): Promise { From c978a687e9698c81fcb00ab82414c139f18639d6 Mon Sep 17 00:00:00 2001 From: Johann150 Date: Tue, 18 Apr 2023 22:01:11 +0200 Subject: [PATCH 4/8] remove unused user parameter from deliver --- packages/backend/src/queue/index.ts | 2 +- .../backend/src/remote/activitypub/deliver-manager.ts | 2 +- .../src/server/api/common/read-messaging-message.ts | 4 ++-- .../src/server/api/endpoints/admin/reports/resolve.ts | 2 +- .../backend/src/server/api/endpoints/notes/polls/vote.ts | 2 +- packages/backend/src/services/blocking/create.ts | 8 ++++---- packages/backend/src/services/blocking/delete.ts | 2 +- packages/backend/src/services/following/create.ts | 4 ++-- packages/backend/src/services/following/delete.ts | 4 ++-- packages/backend/src/services/following/reject.ts | 2 +- .../backend/src/services/following/requests/accept.ts | 2 +- .../backend/src/services/following/requests/cancel.ts | 2 +- .../backend/src/services/following/requests/create.ts | 2 +- packages/backend/src/services/messages/create.ts | 2 +- packages/backend/src/services/messages/delete.ts | 2 +- packages/backend/src/services/relay.ts | 8 ++++---- packages/backend/src/services/unsuspend-user.ts | 2 +- 17 files changed, 26 insertions(+), 26 deletions(-) diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index 4b458739e..55b5676c3 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -84,7 +84,7 @@ webhookDeliverQueue .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) })) .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); -export async function deliver(_user: ThinUser, content: unknown, to: string | null) { +export function deliver(content: unknown, to: string | null) { if (content == null) return null; if (to == null) return null; diff --git a/packages/backend/src/remote/activitypub/deliver-manager.ts b/packages/backend/src/remote/activitypub/deliver-manager.ts index 0262f94fe..0a653ed5d 100644 --- a/packages/backend/src/remote/activitypub/deliver-manager.ts +++ b/packages/backend/src/remote/activitypub/deliver-manager.ts @@ -156,7 +156,7 @@ export class DeliverManager { // skip instances as indicated if (instancesToSkip.includes(new URL(inbox).host)) continue; - deliver(null, this.activity, inbox); + deliver(this.activity, inbox); } } } diff --git a/packages/backend/src/server/api/common/read-messaging-message.ts b/packages/backend/src/server/api/common/read-messaging-message.ts index e8e16f319..5f3ea8e27 100644 --- a/packages/backend/src/server/api/common/read-messaging-message.ts +++ b/packages/backend/src/server/api/common/read-messaging-message.ts @@ -141,10 +141,10 @@ export async function deliverReadActivity(user: { id: User['id']; host: null; }, if (contents.length > 1) { const collection = orderedCollection(null, contents.length, undefined, undefined, contents); - deliver(user, renderActivity(collection), recipient.inbox); + deliver(renderActivity(collection), recipient.inbox); } else { for (const content of contents) { - deliver(user, renderActivity(content), recipient.inbox); + deliver(renderActivity(content), recipient.inbox); } } } diff --git a/packages/backend/src/server/api/endpoints/admin/reports/resolve.ts b/packages/backend/src/server/api/endpoints/admin/reports/resolve.ts index f9ff25c88..705dd5a37 100644 --- a/packages/backend/src/server/api/endpoints/admin/reports/resolve.ts +++ b/packages/backend/src/server/api/endpoints/admin/reports/resolve.ts @@ -29,7 +29,7 @@ export default define(meta, paramDef, async (ps, me) => { const actor = await getInstanceActor(); const targetUser = await Users.findOneByOrFail({ id: report.targetUserId }); - deliver(actor, renderActivity(renderFlag(actor, report)), targetUser.inbox); + deliver(renderActivity(renderFlag(actor, report)), targetUser.inbox); } await AbuseUserReports.update(report.id, { diff --git a/packages/backend/src/server/api/endpoints/notes/polls/vote.ts b/packages/backend/src/server/api/endpoints/notes/polls/vote.ts index 56819b142..c5ad95af5 100644 --- a/packages/backend/src/server/api/endpoints/notes/polls/vote.ts +++ b/packages/backend/src/server/api/endpoints/notes/polls/vote.ts @@ -133,7 +133,7 @@ export default define(meta, paramDef, async (ps, user) => { if (note.userHost != null) { const pollOwner = await Users.findOneByOrFail({ id: note.userId }) as IRemoteUser; - deliver(user, renderActivity(await renderVote(user, vote, note, poll, pollOwner)), pollOwner.inbox); + deliver(renderActivity(await renderVote(user, vote, note, poll, pollOwner)), pollOwner.inbox); } // リモートフォロワーにUpdate配信 diff --git a/packages/backend/src/services/blocking/create.ts b/packages/backend/src/services/blocking/create.ts index 1550e3022..796c98dab 100644 --- a/packages/backend/src/services/blocking/create.ts +++ b/packages/backend/src/services/blocking/create.ts @@ -34,7 +34,7 @@ export default async function(blocker: User, blockee: User): Promise { if (Users.isLocalUser(blocker) && Users.isRemoteUser(blockee) && blocker.federateBlocks) { const content = renderActivity(renderBlock(blocking)); - deliver(blocker, content, blockee.inbox); + deliver(content, blockee.inbox); } } @@ -78,13 +78,13 @@ async function cancelRequest(follower: User, followee: User): Promise { // Send Undo Follow if followee is remote if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) { const content = renderActivity(renderUndo(renderFollow(follower, followee), follower)); - deliver(follower, content, followee.inbox); + deliver(content, followee.inbox); } // Send Reject if follower is remote if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) { const content = renderActivity(renderReject(renderFollow(follower, followee, request.requestId!), followee)); - deliver(followee, content, follower.inbox); + deliver(content, follower.inbox); } } @@ -125,7 +125,7 @@ async function unFollow(follower: User, followee: User): Promise { // Send Undo Follow if follower is remote if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) { const content = renderActivity(renderUndo(renderFollow(follower, followee), follower)); - deliver(follower, content, followee.inbox); + deliver(content, followee.inbox); } } diff --git a/packages/backend/src/services/blocking/delete.ts b/packages/backend/src/services/blocking/delete.ts index 82f92f05a..3dfa8fc44 100644 --- a/packages/backend/src/services/blocking/delete.ts +++ b/packages/backend/src/services/blocking/delete.ts @@ -29,6 +29,6 @@ export default async function(blocker: CacheableUser, blockee: CacheableUser) { // deliver if remote bloking if (Users.isLocalUser(blocker) && Users.isRemoteUser(blockee)) { const content = renderActivity(renderUndo(renderBlock(blocking), blocker)); - deliver(blocker, content, blockee.inbox); + deliver(content, blockee.inbox); } } diff --git a/packages/backend/src/services/following/create.ts b/packages/backend/src/services/following/create.ts index 7ad85dd99..00161f195 100644 --- a/packages/backend/src/services/following/create.ts +++ b/packages/backend/src/services/following/create.ts @@ -146,7 +146,7 @@ export default async function(_follower: { id: User['id'] }, _followee: { id: Us if (Users.isRemoteUser(follower) && Users.isLocalUser(followee) && blocked) { // リモートフォローを受けてブロックしていた場合は、エラーにするのではなくRejectを送り返しておしまい。 const content = renderActivity(renderReject(renderFollow(follower, followee, requestId), followee)); - deliver(followee , content, follower.inbox); + deliver(content, follower.inbox); return; } else if (Users.isRemoteUser(follower) && Users.isLocalUser(followee) && blocking) { // リモートフォローを受けてブロックされているはずの場合だったら、ブロック解除しておく。 @@ -195,6 +195,6 @@ export default async function(_follower: { id: User['id'] }, _followee: { id: Us if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) { const content = renderActivity(renderAccept(renderFollow(follower, followee, requestId), followee)); - deliver(followee, content, follower.inbox); + deliver(content, follower.inbox); } } diff --git a/packages/backend/src/services/following/delete.ts b/packages/backend/src/services/following/delete.ts index 9a29d1921..60f230683 100644 --- a/packages/backend/src/services/following/delete.ts +++ b/packages/backend/src/services/following/delete.ts @@ -52,13 +52,13 @@ export default async function(follower: { id: User['id']; host: User['host']; ur if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) { const content = renderActivity(renderUndo(renderFollow(follower, followee), follower)); - deliver(follower, content, followee.inbox); + deliver(content, followee.inbox); } if (Users.isLocalUser(followee) && Users.isRemoteUser(follower)) { // local user has null host const content = renderActivity(renderReject(renderFollow(follower, followee), followee)); - deliver(followee, content, follower.inbox); + deliver(content, follower.inbox); } } diff --git a/packages/backend/src/services/following/reject.ts b/packages/backend/src/services/following/reject.ts index ec98da635..0bbe6fa24 100644 --- a/packages/backend/src/services/following/reject.ts +++ b/packages/backend/src/services/following/reject.ts @@ -99,7 +99,7 @@ async function deliverReject(followee: Local, follower: Remote): Promise { }); const content = renderActivity(renderReject(renderFollow(follower, followee, request?.requestId || undefined), followee)); - deliver(followee, content, follower.inbox); + deliver(content, follower.inbox); } /** diff --git a/packages/backend/src/services/following/requests/accept.ts b/packages/backend/src/services/following/requests/accept.ts index 231075100..6e5c02364 100644 --- a/packages/backend/src/services/following/requests/accept.ts +++ b/packages/backend/src/services/following/requests/accept.ts @@ -27,7 +27,7 @@ export async function acceptFollowRequest(followee: User, follower: User): Promi if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) { const content = renderActivity(renderAccept(renderFollow(follower, followee, request.requestId!), followee)); - deliver(followee, content, follower.inbox); + deliver(content, follower.inbox); } Users.pack(followee.id, followee, { diff --git a/packages/backend/src/services/following/requests/cancel.ts b/packages/backend/src/services/following/requests/cancel.ts index 4ceac19cb..ae1fd90bc 100644 --- a/packages/backend/src/services/following/requests/cancel.ts +++ b/packages/backend/src/services/following/requests/cancel.ts @@ -17,7 +17,7 @@ export async function cancelFollowRequest(followee: User, follower: User): Promi const content = renderActivity(renderUndo(renderFollow(follower, followee), follower)); if (Users.isLocalUser(follower)) { - deliver(follower, content, followee.inbox); + deliver(content, followee.inbox); } } diff --git a/packages/backend/src/services/following/requests/create.ts b/packages/backend/src/services/following/requests/create.ts index 446f2de40..396089b1a 100644 --- a/packages/backend/src/services/following/requests/create.ts +++ b/packages/backend/src/services/following/requests/create.ts @@ -64,6 +64,6 @@ export async function createFollowRequest(follower: User, followee: User, reques if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) { const content = renderActivity(renderFollow(follower, followee)); - deliver(follower, content, followee.inbox); + deliver(content, followee.inbox); } } diff --git a/packages/backend/src/services/messages/create.ts b/packages/backend/src/services/messages/create.ts index 4a0ea53a8..c6d58b59a 100644 --- a/packages/backend/src/services/messages/create.ts +++ b/packages/backend/src/services/messages/create.ts @@ -97,7 +97,7 @@ export async function createMessage(user: { id: User['id']; host: User['host']; const activity = renderActivity(renderCreate(await renderNote(note, false, true), note)); - deliver(user, activity, recipientUser.inbox); + deliver(activity, recipientUser.inbox); } return messageObj; } diff --git a/packages/backend/src/services/messages/delete.ts b/packages/backend/src/services/messages/delete.ts index 25f49ae50..1082ba8a7 100644 --- a/packages/backend/src/services/messages/delete.ts +++ b/packages/backend/src/services/messages/delete.ts @@ -22,7 +22,7 @@ async function postDeleteMessage(message: MessagingMessage): Promise { if (Users.isLocalUser(user) && Users.isRemoteUser(recipient)) { const activity = renderActivity(renderDelete(renderTombstone(`${config.url}/notes/${message.id}`), user)); - deliver(user, activity, recipient.inbox); + deliver(activity, recipient.inbox); } } else if (message.groupId) { publishGroupMessagingStream(message.groupId, 'deleted', message.id); diff --git a/packages/backend/src/services/relay.ts b/packages/backend/src/services/relay.ts index 143ac5ca8..76adb991c 100644 --- a/packages/backend/src/services/relay.ts +++ b/packages/backend/src/services/relay.ts @@ -37,7 +37,7 @@ export async function addRelay(inbox: string): Promise { const relayActor = await getRelayActor(); const activity = renderActivity(renderFollowRelay(relay, relayActor)); - deliver(relayActor, activity, relay.inbox); + deliver(activity, relay.inbox); return relay; } @@ -53,7 +53,7 @@ export async function removeRelay(inbox: string): Promise { const relayActor = await getRelayActor(); const activity = renderActivity(renderUndo(renderFollowRelay(relay, relayActor), relayActor)); - deliver(relayActor, activity, relay.inbox); + deliver(activity, relay.inbox); await Relays.delete(relay.id); } @@ -90,7 +90,7 @@ export async function deliverToRelays(user: { id: User['id']; host: null; }, act const signed = await attachLdSignature(copy, user); for (const relay of relays) { - deliver(user, signed, relay.inbox); + deliver(signed, relay.inbox); } } @@ -108,6 +108,6 @@ export async function deliverMultipleToRelays(user: User, activities: any[]): Pr })); for (const relay of relays) { - deliver(user, content, relay.inbox); + deliver(content, relay.inbox); } } diff --git a/packages/backend/src/services/unsuspend-user.ts b/packages/backend/src/services/unsuspend-user.ts index 766b10f21..85a886be1 100644 --- a/packages/backend/src/services/unsuspend-user.ts +++ b/packages/backend/src/services/unsuspend-user.ts @@ -32,7 +32,7 @@ export async function doPostUnsuspend(user: User): Promise { } for (const inbox of queue) { - deliver(user as any, content, inbox); + deliver(content, inbox); } } } From b374a79eb1added121bc8d4bcde93de285c733a5 Mon Sep 17 00:00:00 2001 From: Johann150 Date: Tue, 18 Apr 2023 22:48:40 +0200 Subject: [PATCH 5/8] activitypub: no longer deliver chat read Prospectively, the messaging messages will be removed as objects so it does not make sense to refactor this to match the remainder of refactoring away the user from deliver. It is just easier to remove this use of delivering something that is not an activity. Changelog: Removed --- .../src/remote/activitypub/renderer/read.ts | 9 --------- .../api/common/read-messaging-message.ts | 19 ------------------- .../api/endpoints/messaging/messages.ts | 7 +------ .../server/api/stream/channels/messaging.ts | 9 +-------- 4 files changed, 2 insertions(+), 42 deletions(-) delete mode 100644 packages/backend/src/remote/activitypub/renderer/read.ts diff --git a/packages/backend/src/remote/activitypub/renderer/read.ts b/packages/backend/src/remote/activitypub/renderer/read.ts deleted file mode 100644 index a30e649f6..000000000 --- a/packages/backend/src/remote/activitypub/renderer/read.ts +++ /dev/null @@ -1,9 +0,0 @@ -import config from '@/config/index.js'; -import { User } from '@/models/entities/user.js'; -import { MessagingMessage } from '@/models/entities/messaging-message.js'; - -export const renderReadActivity = (user: { id: User['id'] }, message: MessagingMessage) => ({ - type: 'Read', - actor: `${config.url}/users/${user.id}`, - object: message.uri, -}); diff --git a/packages/backend/src/server/api/common/read-messaging-message.ts b/packages/backend/src/server/api/common/read-messaging-message.ts index 5f3ea8e27..9f713a9d6 100644 --- a/packages/backend/src/server/api/common/read-messaging-message.ts +++ b/packages/backend/src/server/api/common/read-messaging-message.ts @@ -7,10 +7,6 @@ import { MessagingMessages, UserGroupJoinings, Users } from '@/models/index.js'; import { IdentifiableError } from '@/misc/identifiable-error.js'; import { UserGroup } from '@/models/entities/user-group.js'; import { toArray } from '@/prelude/array.js'; -import { renderReadActivity } from '@/remote/activitypub/renderer/read.js'; -import { renderActivity } from '@/remote/activitypub/renderer/index.js'; -import { deliver } from '@/queue/index.js'; -import orderedCollection from '@/remote/activitypub/renderer/ordered-collection.js'; /** * Mark messages as read @@ -133,18 +129,3 @@ export async function readGroupMessagingMessage( } } } - -export async function deliverReadActivity(user: { id: User['id']; host: null; }, recipient: IRemoteUser, messages: MessagingMessage | MessagingMessage[]) { - const contents = toArray(messages) - .filter(x => x.uri) - .map(x => renderReadActivity(user, x)); - - if (contents.length > 1) { - const collection = orderedCollection(null, contents.length, undefined, undefined, contents); - deliver(renderActivity(collection), recipient.inbox); - } else { - for (const content of contents) { - deliver(renderActivity(content), recipient.inbox); - } - } -} diff --git a/packages/backend/src/server/api/endpoints/messaging/messages.ts b/packages/backend/src/server/api/endpoints/messaging/messages.ts index f00b84e49..db7184a0b 100644 --- a/packages/backend/src/server/api/endpoints/messaging/messages.ts +++ b/packages/backend/src/server/api/endpoints/messaging/messages.ts @@ -4,7 +4,7 @@ import define from '@/server/api/define.js'; import { ApiError } from '@/server/api/error.js'; import { getUser } from '@/server/api/common/getters.js'; import { makePaginationQuery } from '@/server/api/common/make-pagination-query.js'; -import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '@/server/api/common/read-messaging-message.js'; +import { readUserMessagingMessage, readGroupMessagingMessage } from '@/server/api/common/read-messaging-message.js'; export const meta = { tags: ['messaging'], @@ -75,11 +75,6 @@ export default define(meta, paramDef, async (ps, user) => { // Mark all as read if (ps.markAsRead) { readUserMessagingMessage(user.id, recipient.id, messages.filter(m => m.recipientId === user.id).map(x => x.id)); - - // リモートユーザーとのメッセージだったら既読配信 - if (Users.isLocalUser(user) && Users.isRemoteUser(recipient)) { - deliverReadActivity(user, recipient, messages); - } } return await Promise.all(messages.map(message => MessagingMessages.pack(message, user, { diff --git a/packages/backend/src/server/api/stream/channels/messaging.ts b/packages/backend/src/server/api/stream/channels/messaging.ts index e10ede776..009acf440 100644 --- a/packages/backend/src/server/api/stream/channels/messaging.ts +++ b/packages/backend/src/server/api/stream/channels/messaging.ts @@ -1,7 +1,7 @@ import { UserGroupJoinings, Users, MessagingMessages } from '@/models/index.js'; import { User, ILocalUser, IRemoteUser } from '@/models/entities/user.js'; import { UserGroup } from '@/models/entities/user-group.js'; -import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '@/server/api/common/read-messaging-message.js'; +import { readUserMessagingMessage, readGroupMessagingMessage } from '@/server/api/common/read-messaging-message.js'; import Channel from '@/server/api/stream/channel.js'; import { StreamMessages } from '@/server/api/stream/types.js'; @@ -69,13 +69,6 @@ export default class extends Channel { case 'read': if (this.otherpartyId) { readUserMessagingMessage(this.user!.id, this.otherpartyId, [body.id]); - - // リモートユーザーからのメッセージだったら既読配信 - if (Users.isLocalUser(this.user!) && Users.isRemoteUser(this.otherparty!)) { - MessagingMessages.findOneBy({ id: body.id }).then(message => { - if (message) deliverReadActivity(this.user as ILocalUser, this.otherparty as IRemoteUser, message); - }); - } } else if (this.groupId) { readGroupMessagingMessage(this.user!.id, this.groupId, [body.id]); } From 965ee4b0411595c002c367e56d7cc8f3e7dee88c Mon Sep 17 00:00:00 2001 From: Johann150 Date: Mon, 1 May 2023 12:41:39 +0200 Subject: [PATCH 6/8] fix for delivering multiple activities at once --- packages/backend/src/queue/index.ts | 59 ++++++++++++++++++----------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index 55b5676c3..f52660448 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -84,34 +84,47 @@ webhookDeliverQueue .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) })) .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); -export function deliver(content: unknown, to: string | null) { +export function deliver(content: IActivity|IActivity[], to: string | null) { if (content == null) return null; if (to == null) return null; - // extract user from the Activity - const userUri = content.actor; - if (!userUri) throw new Error("Cannot deliver activity without actor."); - const user = await new DbResolver().getUserFromApId(userUri); - if (!user) throw new Error("Actor not found, cannot deliver."); - if (user.host != null) throw new Error("Cannot deliver for remote actor."); + // group activities by actor + const contentArray = Array.isArray(content) ? content : [content]; + let byActor = contentArray.reduce((acc, activity) => { + if (activity.actor == null) throw new Error("Cannot deliver activity without actor."); + if (!(activity.actor in acc)) { + acc[activity.actor] = []; + } + acc[activity.actor].push(activity); + return acc; + }, []); - const data = { - user: { - id: user.id, - }, - content, - to, - }; + // add groups to deliver queue + const dbResolver = new DbResolver(); + for (const actor in byActor) { + // extract user from the Activity + const user = await dbResolver.getUserFromApId(actor); + if (!user) throw new Error("Actor not found, cannot deliver."); + if (user.host != null) throw new Error("Cannot deliver for remote actor."); - return deliverQueue.add(data, { - attempts: config.deliverJobMaxAttempts, - timeout: MINUTE, - backoff: { - type: 'apBackoff', - }, - removeOnComplete: true, - removeOnFail: true, - }); + // add item to deliver queue + const data = { + user: { + id: user.id, + }, + byActor[actor], + to, + }; + deliverQueue.add(data, { + attempts: config.deliverJobMaxAttempts, + timeout: MINUTE, + backoff: { + type: 'apBackoff', + }, + removeOnComplete: true, + removeOnFail: true, + }); + } } export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) { From b04ef21b6ef2687e9df42200d8d6512f741f2d16 Mon Sep 17 00:00:00 2001 From: Johann150 Date: Mon, 1 May 2023 20:45:47 +0200 Subject: [PATCH 7/8] fix missing parameter name --- packages/backend/src/queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index f52660448..8fdc14894 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -112,7 +112,7 @@ export function deliver(content: IActivity|IActivity[], to: string | null) { user: { id: user.id, }, - byActor[actor], + content: byActor[actor], to, }; deliverQueue.add(data, { From ba77a81cc047dc7bfb0418657e3dccd3f357248a Mon Sep 17 00:00:00 2001 From: Johann150 Date: Thu, 4 May 2023 21:43:09 +0200 Subject: [PATCH 8/8] fix: require async for deliver queue --- packages/backend/src/queue/index.ts | 2 +- .../src/remote/activitypub/deliver-manager.ts | 2 +- .../api/endpoints/admin/reports/resolve.ts | 2 +- .../src/server/api/endpoints/notes/polls/vote.ts | 2 +- packages/backend/src/services/blocking/delete.ts | 2 +- .../src/services/following/requests/accept.ts | 2 +- .../src/services/following/requests/cancel.ts | 2 +- .../src/services/following/requests/create.ts | 2 +- packages/backend/src/services/messages/create.ts | 2 +- packages/backend/src/services/messages/delete.ts | 2 +- packages/backend/src/services/relay.ts | 16 ++++++++-------- packages/backend/src/services/unsuspend-user.ts | 2 +- 12 files changed, 19 insertions(+), 19 deletions(-) diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index 8fdc14894..6a3a02d8e 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -84,7 +84,7 @@ webhookDeliverQueue .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) })) .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); -export function deliver(content: IActivity|IActivity[], to: string | null) { +export async function deliver(content: IActivity|IActivity[], to: string | null) { if (content == null) return null; if (to == null) return null; diff --git a/packages/backend/src/remote/activitypub/deliver-manager.ts b/packages/backend/src/remote/activitypub/deliver-manager.ts index 0a653ed5d..7d3407684 100644 --- a/packages/backend/src/remote/activitypub/deliver-manager.ts +++ b/packages/backend/src/remote/activitypub/deliver-manager.ts @@ -156,7 +156,7 @@ export class DeliverManager { // skip instances as indicated if (instancesToSkip.includes(new URL(inbox).host)) continue; - deliver(this.activity, inbox); + await deliver(this.activity, inbox); } } } diff --git a/packages/backend/src/server/api/endpoints/admin/reports/resolve.ts b/packages/backend/src/server/api/endpoints/admin/reports/resolve.ts index 705dd5a37..96dd3d612 100644 --- a/packages/backend/src/server/api/endpoints/admin/reports/resolve.ts +++ b/packages/backend/src/server/api/endpoints/admin/reports/resolve.ts @@ -29,7 +29,7 @@ export default define(meta, paramDef, async (ps, me) => { const actor = await getInstanceActor(); const targetUser = await Users.findOneByOrFail({ id: report.targetUserId }); - deliver(renderActivity(renderFlag(actor, report)), targetUser.inbox); + await deliver(renderActivity(renderFlag(actor, report)), targetUser.inbox); } await AbuseUserReports.update(report.id, { diff --git a/packages/backend/src/server/api/endpoints/notes/polls/vote.ts b/packages/backend/src/server/api/endpoints/notes/polls/vote.ts index c5ad95af5..eadf97225 100644 --- a/packages/backend/src/server/api/endpoints/notes/polls/vote.ts +++ b/packages/backend/src/server/api/endpoints/notes/polls/vote.ts @@ -133,7 +133,7 @@ export default define(meta, paramDef, async (ps, user) => { if (note.userHost != null) { const pollOwner = await Users.findOneByOrFail({ id: note.userId }) as IRemoteUser; - deliver(renderActivity(await renderVote(user, vote, note, poll, pollOwner)), pollOwner.inbox); + await deliver(renderActivity(await renderVote(user, vote, note, poll, pollOwner)), pollOwner.inbox); } // リモートフォロワーにUpdate配信 diff --git a/packages/backend/src/services/blocking/delete.ts b/packages/backend/src/services/blocking/delete.ts index 3dfa8fc44..8b146951d 100644 --- a/packages/backend/src/services/blocking/delete.ts +++ b/packages/backend/src/services/blocking/delete.ts @@ -29,6 +29,6 @@ export default async function(blocker: CacheableUser, blockee: CacheableUser) { // deliver if remote bloking if (Users.isLocalUser(blocker) && Users.isRemoteUser(blockee)) { const content = renderActivity(renderUndo(renderBlock(blocking), blocker)); - deliver(content, blockee.inbox); + await deliver(content, blockee.inbox); } } diff --git a/packages/backend/src/services/following/requests/accept.ts b/packages/backend/src/services/following/requests/accept.ts index 6e5c02364..beb10a56f 100644 --- a/packages/backend/src/services/following/requests/accept.ts +++ b/packages/backend/src/services/following/requests/accept.ts @@ -27,7 +27,7 @@ export async function acceptFollowRequest(followee: User, follower: User): Promi if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) { const content = renderActivity(renderAccept(renderFollow(follower, followee, request.requestId!), followee)); - deliver(content, follower.inbox); + await deliver(content, follower.inbox); } Users.pack(followee.id, followee, { diff --git a/packages/backend/src/services/following/requests/cancel.ts b/packages/backend/src/services/following/requests/cancel.ts index ae1fd90bc..31193a911 100644 --- a/packages/backend/src/services/following/requests/cancel.ts +++ b/packages/backend/src/services/following/requests/cancel.ts @@ -17,7 +17,7 @@ export async function cancelFollowRequest(followee: User, follower: User): Promi const content = renderActivity(renderUndo(renderFollow(follower, followee), follower)); if (Users.isLocalUser(follower)) { - deliver(content, followee.inbox); + await deliver(content, followee.inbox); } } diff --git a/packages/backend/src/services/following/requests/create.ts b/packages/backend/src/services/following/requests/create.ts index 396089b1a..0f92db2b0 100644 --- a/packages/backend/src/services/following/requests/create.ts +++ b/packages/backend/src/services/following/requests/create.ts @@ -64,6 +64,6 @@ export async function createFollowRequest(follower: User, followee: User, reques if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) { const content = renderActivity(renderFollow(follower, followee)); - deliver(content, followee.inbox); + await deliver(content, followee.inbox); } } diff --git a/packages/backend/src/services/messages/create.ts b/packages/backend/src/services/messages/create.ts index c6d58b59a..631f23863 100644 --- a/packages/backend/src/services/messages/create.ts +++ b/packages/backend/src/services/messages/create.ts @@ -97,7 +97,7 @@ export async function createMessage(user: { id: User['id']; host: User['host']; const activity = renderActivity(renderCreate(await renderNote(note, false, true), note)); - deliver(activity, recipientUser.inbox); + await deliver(activity, recipientUser.inbox); } return messageObj; } diff --git a/packages/backend/src/services/messages/delete.ts b/packages/backend/src/services/messages/delete.ts index 1082ba8a7..acbc5e71a 100644 --- a/packages/backend/src/services/messages/delete.ts +++ b/packages/backend/src/services/messages/delete.ts @@ -22,7 +22,7 @@ async function postDeleteMessage(message: MessagingMessage): Promise { if (Users.isLocalUser(user) && Users.isRemoteUser(recipient)) { const activity = renderActivity(renderDelete(renderTombstone(`${config.url}/notes/${message.id}`), user)); - deliver(activity, recipient.inbox); + await deliver(activity, recipient.inbox); } } else if (message.groupId) { publishGroupMessagingStream(message.groupId, 'deleted', message.id); diff --git a/packages/backend/src/services/relay.ts b/packages/backend/src/services/relay.ts index 76adb991c..8c580ea7d 100644 --- a/packages/backend/src/services/relay.ts +++ b/packages/backend/src/services/relay.ts @@ -37,7 +37,7 @@ export async function addRelay(inbox: string): Promise { const relayActor = await getRelayActor(); const activity = renderActivity(renderFollowRelay(relay, relayActor)); - deliver(activity, relay.inbox); + await deliver(activity, relay.inbox); return relay; } @@ -53,7 +53,7 @@ export async function removeRelay(inbox: string): Promise { const relayActor = await getRelayActor(); const activity = renderActivity(renderUndo(renderFollowRelay(relay, relayActor), relayActor)); - deliver(activity, relay.inbox); + await deliver(activity, relay.inbox); await Relays.delete(relay.id); } @@ -89,9 +89,9 @@ export async function deliverToRelays(user: { id: User['id']; host: null; }, act const signed = await attachLdSignature(copy, user); - for (const relay of relays) { - deliver(signed, relay.inbox); - } + await Promise.all(relays.map(relay => + deliver(signed, relay.inbox) + )); } export async function deliverMultipleToRelays(user: User, activities: any[]): Promise { @@ -107,7 +107,7 @@ export async function deliverMultipleToRelays(user: User, activities: any[]): Pr return attachLdSignature(copy, user); })); - for (const relay of relays) { - deliver(content, relay.inbox); - } + await Promise.all(relays.map(relay => + deliver(content, relay.inbox) + )); } diff --git a/packages/backend/src/services/unsuspend-user.ts b/packages/backend/src/services/unsuspend-user.ts index 85a886be1..472ed7484 100644 --- a/packages/backend/src/services/unsuspend-user.ts +++ b/packages/backend/src/services/unsuspend-user.ts @@ -32,7 +32,7 @@ export async function doPostUnsuspend(user: User): Promise { } for (const inbox of queue) { - deliver(content, inbox); + await deliver(content, inbox); } } }