Compare commits

...

8 commits

Author SHA1 Message Date
Johann150 e574d9cef1
fix: require async for deliver queue 2023-07-02 10:13:30 +02:00
Johann150 7f7fbc291e
fix missing parameter name 2023-07-02 10:13:30 +02:00
Johann150 c00272ced8
fix for delivering multiple activities at once 2023-07-02 10:13:29 +02:00
Johann150 d292f035aa
activitypub: no longer deliver chat read
Prospectively, the messaging messages will be removed as objects so
it does not make sense to refactor this to match the remainder of
refactoring away the user from deliver. It is just easier to remove
this use of delivering something that is not an activity.

Changelog: Removed
2023-07-02 10:13:29 +02:00
Johann150 07891db78e
remove unused user parameter from deliver 2023-07-02 10:13:29 +02:00
Johann150 17c872235a
remove unnecesary intermediate variables 2023-07-02 10:13:28 +02:00
Johann150 c8aa46538d
refactor deliver to extract actor from activity 2023-07-02 10:13:28 +02:00
Johann150 0ede4f183f
make delivery manager author agnostic 2023-07-02 10:13:27 +02:00
26 changed files with 110 additions and 132 deletions

View file

@ -8,6 +8,7 @@ import { DriveFile } from '@/models/entities/drive-file.js';
import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js';
import { IActivity } from '@/remote/activitypub/type.js';
import { MINUTE } from '@/const.js';
import { DbResolver } from '@/remote/activitypub/db-resolver.js';
import processDeliver from './processors/deliver.js';
import processInbox from './processors/inbox.js';
@ -96,28 +97,48 @@ webhookDeliverQueue
.on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`))
.on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
export function deliver(user: ThinUser, content: unknown, to: string | null, deletingUserId?: string) {
export async function deliver(content: IActivity|IActivity[], to: string | null, deletingUserId?: string) {
if (content == null) return null;
if (to == null) return null;
const data = {
user: {
id: user.id,
},
content,
to,
deletingUserId,
};
// group activities by actor
const contentArray = Array.isArray(content) ? content : [content];
let byActor = contentArray.reduce((acc, activity) => {
if (activity.actor == null) throw new Error("Cannot deliver activity without actor.");
if (!(activity.actor in acc)) {
acc[activity.actor] = [];
}
acc[activity.actor].push(activity);
return acc;
}, []);
return deliverQueue.add(data, {
attempts: config.deliverJobMaxAttempts,
timeout: MINUTE,
backoff: {
type: 'apBackoff',
},
removeOnComplete: true,
removeOnFail: true,
});
// add groups to deliver queue
const dbResolver = new DbResolver();
for (const actor in byActor) {
// extract user from the Activity
const user = await dbResolver.getUserFromApId(actor);
if (!user) throw new Error("Actor not found, cannot deliver.");
if (user.host != null) throw new Error("Cannot deliver for remote actor.");
// add item to deliver queue
const data = {
user: {
id: user.id,
},
content: byActor[actor],
to,
deletingUserId,
};
deliverQueue.add(data, {
attempts: config.deliverJobMaxAttempts,
timeout: MINUTE,
backoff: {
type: 'apBackoff',
},
removeOnComplete: true,
removeOnFail: true,
});
}
}
export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {

View file

@ -9,7 +9,7 @@ export type DeliverJobData = {
/** Actor */
user: ThinUser;
/** Activity */
content: unknown;
content: IActivity;
/** inbox URL to deliver */
to: string;
/** set if this job is part of a user deletion, on completion or failure the isDeleted field needs to be decremented */

View file

@ -14,6 +14,7 @@ interface IEveryoneRecipe extends IRecipe {
interface IFollowersRecipe extends IRecipe {
type: 'Followers';
followee: ILocalUser;
}
interface IDirectRecipe extends IRecipe {
@ -32,26 +33,24 @@ const isDirect = (recipe: any): recipe is IDirectRecipe =>
//#endregion
export class DeliverManager {
private actor: { id: User['id']; host: null; };
private activity: any;
private recipes: IRecipe[] = [];
/**
* Constructor
* @param actor Actor
* @param activity Activity to deliver
*/
constructor(actor: { id: User['id']; host: null; }, activity: any) {
this.actor = actor;
constructor(activity: any) {
this.activity = activity;
}
/**
* Add recipe for followers deliver
*/
public addFollowersRecipe() {
public addFollowersRecipe(followee: ILocalUser) {
const deliver = {
type: 'Followers',
followee,
} as IFollowersRecipe;
this.addRecipe(deliver);
@ -89,9 +88,7 @@ export class DeliverManager {
* Execute delivers
*/
public async execute(deletingUserId?: string) {
if (!Users.isLocalUser(this.actor)) return;
let inboxes = new Set<string>();
const inboxes = new Set<string>();
/*
build inbox list
@ -116,21 +113,25 @@ export class DeliverManager {
}
}
if (this.recipes.some(r => isFollowers(r))) {
// followers deliver
const followers = await Followings.createQueryBuilder('followings')
// return either the shared inbox (if available) or the individual inbox
.select('COALESCE(followings.followerSharedInbox, followings.followerInbox)', 'inbox')
// so we don't have to make our inboxes Set work as hard
.distinct(true)
// ...for the specific actors followers
.where('followings.followeeId = :actorId', { actorId: this.actor.id })
// don't deliver to ourselves
.andWhere('followings.followerHost IS NOT NULL')
.getRawMany();
followers.forEach(({ inbox }) => inboxes.add(inbox));
}
await Promise.all(
this.recipes.filter(isFollowers)
.map(recipe => {
// followers deliver
return Followings.createQueryBuilder('followings')
// return either the shared inbox (if available) or the individual inbox
.select('COALESCE(followings.followerSharedInbox, followings.followerInbox)', 'inbox')
// so we don't have to make our inboxes Set work as hard
.distinct(true)
// ...for the specific actors followers
.where('followings.followeeId = :actorId', { actorId: recipe.followee.id })
// don't deliver to ourselves
.andWhere('followings.followerHost IS NOT NULL')
.getRawMany()
.then(followers =>
followers.forEach(({ inbox }) => inboxes.add(inbox))
);
})
);
this.recipes.filter((recipe): recipe is IDirectRecipe =>
// followers recipes have already been processed
@ -160,7 +161,7 @@ export class DeliverManager {
});
}
filteredInboxes.forEach(inbox => deliver(this.actor, this.activity, inbox, deletingUserId));
filteredInboxes.forEach(inbox => deliver(this.activity, inbox, deletingUserId));
}
}
@ -170,9 +171,9 @@ export class DeliverManager {
* @param activity Activity
* @param from Followee
*/
export async function deliverToFollowers(actor: { id: ILocalUser['id']; host: null; }, activity: any) {
const manager = new DeliverManager(actor, activity);
manager.addFollowersRecipe();
export async function deliverToFollowers(actor: ILocalUser, activity: any) {
const manager = new DeliverManager(activity);
manager.addFollowersRecipe(actor);
await manager.execute();
}
@ -181,8 +182,8 @@ export async function deliverToFollowers(actor: { id: ILocalUser['id']; host: nu
* @param activity Activity
* @param to Target user
*/
export async function deliverToUser(actor: { id: ILocalUser['id']; host: null; }, activity: any, to: IRemoteUser) {
const manager = new DeliverManager(actor, activity);
export async function deliverToUser(activity: any, to: IRemoteUser) {
const manager = new DeliverManager(activity);
manager.addDirectRecipe(to);
await manager.execute();
}

View file

@ -1,9 +0,0 @@
import config from '@/config/index.js';
import { User } from '@/models/entities/user.js';
import { MessagingMessage } from '@/models/entities/messaging-message.js';
export const renderReadActivity = (user: { id: User['id'] }, message: MessagingMessage) => ({
type: 'Read',
actor: `${config.url}/users/${user.id}`,
object: message.uri,
});

View file

@ -7,10 +7,6 @@ import { MessagingMessages, UserGroupJoinings, Users } from '@/models/index.js';
import { IdentifiableError } from '@/misc/identifiable-error.js';
import { UserGroup } from '@/models/entities/user-group.js';
import { toArray } from '@/prelude/array.js';
import { renderReadActivity } from '@/remote/activitypub/renderer/read.js';
import { renderActivity } from '@/remote/activitypub/renderer/index.js';
import { deliver } from '@/queue/index.js';
import orderedCollection from '@/remote/activitypub/renderer/ordered-collection.js';
/**
* Mark messages as read
@ -133,18 +129,3 @@ export async function readGroupMessagingMessage(
}
}
}
export async function deliverReadActivity(user: { id: User['id']; host: null; }, recipient: IRemoteUser, messages: MessagingMessage | MessagingMessage[]) {
const contents = toArray(messages)
.filter(x => x.uri)
.map(x => renderReadActivity(user, x));
if (contents.length > 1) {
const collection = orderedCollection(null, contents.length, undefined, undefined, contents);
deliver(user, renderActivity(collection), recipient.inbox);
} else {
for (const content of contents) {
deliver(user, renderActivity(content), recipient.inbox);
}
}
}

View file

@ -29,7 +29,7 @@ export default define(meta, paramDef, async (ps, me) => {
const actor = await getInstanceActor();
const targetUser = await Users.findOneByOrFail({ id: report.targetUserId });
deliver(actor, renderActivity(renderFlag(actor, report)), targetUser.inbox);
await deliver(renderActivity(renderFlag(actor, report)), targetUser.inbox);
}
await AbuseUserReports.update(report.id, {

View file

@ -4,7 +4,7 @@ import define from '@/server/api/define.js';
import { ApiError } from '@/server/api/error.js';
import { getUser } from '@/server/api/common/getters.js';
import { makePaginationQuery } from '@/server/api/common/make-pagination-query.js';
import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '@/server/api/common/read-messaging-message.js';
import { readUserMessagingMessage, readGroupMessagingMessage } from '@/server/api/common/read-messaging-message.js';
export const meta = {
tags: ['messaging'],
@ -75,11 +75,6 @@ export default define(meta, paramDef, async (ps, user) => {
// Mark all as read
if (ps.markAsRead) {
readUserMessagingMessage(user.id, recipient.id, messages.filter(m => m.recipientId === user.id).map(x => x.id));
// リモートユーザーとのメッセージだったら既読配信
if (Users.isLocalUser(user) && Users.isRemoteUser(recipient)) {
deliverReadActivity(user, recipient, messages);
}
}
return await Promise.all(messages.map(message => MessagingMessages.pack(message, user, {

View file

@ -133,7 +133,7 @@ export default define(meta, paramDef, async (ps, user) => {
if (note.userHost != null) {
const pollOwner = await Users.findOneByOrFail({ id: note.userId }) as IRemoteUser;
deliver(user, renderActivity(await renderVote(user, vote, note, poll, pollOwner)), pollOwner.inbox);
await deliver(renderActivity(await renderVote(user, vote, note, poll, pollOwner)), pollOwner.inbox);
}
// リモートフォロワーにUpdate配信

View file

@ -1,7 +1,7 @@
import { UserGroupJoinings, Users, MessagingMessages } from '@/models/index.js';
import { User, ILocalUser, IRemoteUser } from '@/models/entities/user.js';
import { UserGroup } from '@/models/entities/user-group.js';
import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '@/server/api/common/read-messaging-message.js';
import { readUserMessagingMessage, readGroupMessagingMessage } from '@/server/api/common/read-messaging-message.js';
import Channel from '@/server/api/stream/channel.js';
import { StreamMessages } from '@/server/api/stream/types.js';
@ -69,13 +69,6 @@ export default class extends Channel {
case 'read':
if (this.otherpartyId) {
readUserMessagingMessage(this.user!.id, this.otherpartyId, [body.id]);
// リモートユーザーからのメッセージだったら既読配信
if (Users.isLocalUser(this.user!) && Users.isRemoteUser(this.otherparty!)) {
MessagingMessages.findOneBy({ id: body.id }).then(message => {
if (message) deliverReadActivity(this.user as ILocalUser, this.otherparty as IRemoteUser, message);
});
}
} else if (this.groupId) {
readGroupMessagingMessage(this.user!.id, this.groupId, [body.id]);
}

View file

@ -34,7 +34,7 @@ export default async function(blocker: User, blockee: User): Promise<void> {
if (Users.isLocalUser(blocker) && Users.isRemoteUser(blockee) && blocker.federateBlocks) {
const content = renderActivity(renderBlock(blocking));
deliver(blocker, content, blockee.inbox);
deliver(content, blockee.inbox);
}
}
@ -78,13 +78,13 @@ async function cancelRequest(follower: User, followee: User): Promise<void> {
// Send Undo Follow if followee is remote
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
deliver(follower, content, followee.inbox);
deliver(content, followee.inbox);
}
// Send Reject if follower is remote
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) {
const content = renderActivity(renderReject(renderFollow(follower, followee, request.requestId!), followee));
deliver(followee, content, follower.inbox);
deliver(content, follower.inbox);
}
}
@ -125,7 +125,7 @@ async function unFollow(follower: User, followee: User): Promise<void> {
// Send Undo Follow if follower is remote
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
deliver(follower, content, followee.inbox);
deliver(content, followee.inbox);
}
}

View file

@ -29,6 +29,6 @@ export default async function(blocker: User, blockee: User) {
// deliver if remote bloking
if (Users.isLocalUser(blocker) && Users.isRemoteUser(blockee)) {
const content = renderActivity(renderUndo(renderBlock(blocking), blocker));
deliver(blocker, content, blockee.inbox);
await deliver(content, blockee.inbox);
}
}

View file

@ -146,7 +146,7 @@ export default async function(_follower: { id: User['id'] }, _followee: { id: Us
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee) && blocked) {
// リモートフォローを受けてブロックしていた場合は、エラーにするのではなくRejectを送り返しておしまい。
const content = renderActivity(renderReject(renderFollow(follower, followee, requestId), followee));
deliver(followee , content, follower.inbox);
deliver(content, follower.inbox);
return;
} else if (Users.isRemoteUser(follower) && Users.isLocalUser(followee) && blocking) {
// リモートフォローを受けてブロックされているはずの場合だったら、ブロック解除しておく。
@ -195,6 +195,6 @@ export default async function(_follower: { id: User['id'] }, _followee: { id: Us
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) {
const content = renderActivity(renderAccept(renderFollow(follower, followee, requestId), followee));
deliver(followee, content, follower.inbox);
deliver(content, follower.inbox);
}
}

View file

@ -52,13 +52,13 @@ export default async function(follower: { id: User['id']; host: User['host']; ur
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
deliver(follower, content, followee.inbox);
deliver(content, followee.inbox);
}
if (Users.isLocalUser(followee) && Users.isRemoteUser(follower)) {
// local user has null host
const content = renderActivity(renderReject(renderFollow(follower, followee), followee));
deliver(followee, content, follower.inbox);
deliver(content, follower.inbox);
}
}

View file

@ -99,7 +99,7 @@ async function deliverReject(followee: Local, follower: Remote): Promise<void> {
});
const content = renderActivity(renderReject(renderFollow(follower, followee, request?.requestId || undefined), followee));
deliver(followee, content, follower.inbox);
deliver(content, follower.inbox);
}
/**

View file

@ -27,7 +27,7 @@ export async function acceptFollowRequest(followee: User, follower: User): Promi
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) {
const content = renderActivity(renderAccept(renderFollow(follower, followee, request.requestId!), followee));
deliver(followee, content, follower.inbox);
await deliver(content, follower.inbox);
}
Users.pack(followee.id, followee, {

View file

@ -17,7 +17,7 @@ export async function cancelFollowRequest(followee: User, follower: User): Promi
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
if (Users.isLocalUser(follower)) {
deliver(follower, content, followee.inbox);
await deliver(content, followee.inbox);
}
}

View file

@ -64,6 +64,6 @@ export async function createFollowRequest(follower: User, followee: User, reques
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
const content = renderActivity(renderFollow(follower, followee));
deliver(follower, content, followee.inbox);
await deliver(content, followee.inbox);
}
}

View file

@ -97,7 +97,7 @@ export async function createMessage(user: { id: User['id']; host: User['host'];
const activity = renderActivity(renderCreate(await renderNote(note, false, true), note));
deliver(user, activity, recipientUser.inbox);
await deliver(activity, recipientUser.inbox);
}
return messageObj;
}

View file

@ -22,7 +22,7 @@ async function postDeleteMessage(message: MessagingMessage): Promise<void> {
if (Users.isLocalUser(user) && Users.isRemoteUser(recipient)) {
const activity = renderActivity(renderDelete(renderTombstone(`${config.url}/notes/${message.id}`), user));
deliver(user, activity, recipient.inbox);
await deliver(activity, recipient.inbox);
}
} else if (message.groupId) {
publishGroupMessagingStream(message.groupId, 'deleted', message.id);

View file

@ -60,8 +60,8 @@ export async function deleteNotes(notes: Note[], user?: User): Promise<void> {
// Compute addressing information.
// Since we do not send any actual content, we send all note deletions to everyone.
const manager = new DeliverManager(fetchedUser, content);
manager.addFollowersRecipe();
const manager = new DeliverManager(content);
manager.addFollowersRecipe(fetchedUser);
manager.addEveryone();
// Check mentioned users, since not all may have a shared inbox.
await Promise.all(

View file

@ -130,14 +130,14 @@ export async function createReaction(user: { id: User['id']; host: User['host'];
//#region 配信
if (Users.isLocalUser(user) && !note.localOnly) {
const content = renderActivity(await renderLike(record, note));
const dm = new DeliverManager(user, content);
const dm = new DeliverManager(content);
if (note.userHost !== null) {
const reactee = await Users.findOneBy({ id: note.userId });
dm.addDirectRecipe(reactee as IRemoteUser);
}
if (['public', 'home', 'followers'].includes(note.visibility)) {
dm.addFollowersRecipe();
dm.addFollowersRecipe(user);
} else if (note.visibility === 'specified') {
const visibleUsers = await Promise.all(note.visibleUserIds.map(id => Users.findOneBy({ id })));
for (const u of visibleUsers.filter(u => u && Users.isRemoteUser(u))) {

View file

@ -46,12 +46,12 @@ export async function deleteReaction(user: { id: User['id']; host: User['host'];
//#region 配信
if (Users.isLocalUser(user) && !note.localOnly) {
const content = renderActivity(renderUndo(await renderLike(exist, note), user));
const dm = new DeliverManager(user, content);
const dm = new DeliverManager(content);
if (note.userHost !== null) {
const reactee = await Users.findOneBy({ id: note.userId });
dm.addDirectRecipe(reactee as IRemoteUser);
}
dm.addFollowersRecipe();
dm.addFollowersRecipe(user);
dm.execute();
}
//#endregion

View file

@ -335,7 +335,7 @@ export async function sideEffects(user: User, note: Note, silent = false, create
if (Users.isLocalUser(user) && !note.localOnly && created) {
(async () => {
const noteActivity = renderActivity(await renderNoteOrRenoteActivity(note));
const dm = new DeliverManager(user, noteActivity);
const dm = new DeliverManager(noteActivity);
// Delivered to remote users who have been mentioned
for (const u of mentionedUsers.filter(u => Users.isRemoteUser(u))) {
@ -368,7 +368,7 @@ export async function sideEffects(user: User, note: Note, silent = false, create
// Deliver to followers
if (['public', 'home', 'followers'].includes(note.visibility)) {
dm.addFollowersRecipe();
dm.addFollowersRecipe(user);
}
if (['public'].includes(note.visibility)) {

View file

@ -36,9 +36,8 @@ export async function addRelay(inbox: string): Promise<Relay> {
}).then(x => Relays.findOneByOrFail(x.identifiers[0]));
const relayActor = await getRelayActor();
const follow = renderFollowRelay(relay, relayActor);
const activity = renderActivity(follow);
deliver(relayActor, activity, relay.inbox);
const activity = renderActivity(renderFollowRelay(relay, relayActor));
await deliver(activity, relay.inbox);
return relay;
}
@ -53,17 +52,14 @@ export async function removeRelay(inbox: string): Promise<void> {
}
const relayActor = await getRelayActor();
const follow = renderFollowRelay(relay, relayActor);
const undo = renderUndo(follow, relayActor);
const activity = renderActivity(undo);
deliver(relayActor, activity, relay.inbox);
const activity = renderActivity(renderUndo(renderFollowRelay(relay, relayActor), relayActor));
await deliver(activity, relay.inbox);
await Relays.delete(relay.id);
}
export async function listRelay(): Promise<Relay[]> {
const relays = await Relays.find();
return relays;
return await Relays.find();
}
export async function relayAccepted(id: string): Promise<string> {
@ -93,9 +89,9 @@ export async function deliverToRelays(user: { id: User['id']; host: null; }, act
const signed = await attachLdSignature(copy, user);
for (const relay of relays) {
deliver(user, signed, relay.inbox);
}
await Promise.all(relays.map(relay =>
deliver(signed, relay.inbox)
));
}
export async function deliverMultipleToRelays(user: User, activities: any[]): Promise<void> {
@ -111,7 +107,7 @@ export async function deliverMultipleToRelays(user: User, activities: any[]): Pr
return attachLdSignature(copy, user);
}));
for (const relay of relays) {
deliver(user, content, relay.inbox);
}
await Promise.all(relays.map(relay =>
deliver(content, relay.inbox)
));
}

View file

@ -16,7 +16,7 @@ export async function doPostSuspend(user: { id: User['id']; host: User['host'] }
const content = renderActivity(renderDelete(`${config.url}/users/${user.id}`, user));
// deliver to all of known network
const dm = new DeliverManager(user, content);
const dm = new DeliverManager(content);
dm.addEveryone();
await dm.execute(user.id);
}

View file

@ -32,7 +32,7 @@ export async function doPostUnsuspend(user: User): Promise<void> {
}
for (const inbox of queue) {
deliver(user as any, content, inbox);
await deliver(content, inbox);
}
}
}