refactor deliver
to extract actor from activity #377
26 changed files with 110 additions and 132 deletions
|
@ -8,6 +8,7 @@ import { DriveFile } from '@/models/entities/drive-file.js';
|
||||||
import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js';
|
import { Webhook, webhookEventTypes } from '@/models/entities/webhook.js';
|
||||||
import { IActivity } from '@/remote/activitypub/type.js';
|
import { IActivity } from '@/remote/activitypub/type.js';
|
||||||
import { MINUTE } from '@/const.js';
|
import { MINUTE } from '@/const.js';
|
||||||
|
import { DbResolver } from '@/remote/activitypub/db-resolver.js';
|
||||||
|
|
||||||
import processDeliver from './processors/deliver.js';
|
import processDeliver from './processors/deliver.js';
|
||||||
import processInbox from './processors/inbox.js';
|
import processInbox from './processors/inbox.js';
|
||||||
|
@ -96,28 +97,48 @@ webhookDeliverQueue
|
||||||
.on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`))
|
.on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`))
|
||||||
.on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
|
.on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
|
||||||
|
|
||||||
export function deliver(user: ThinUser, content: unknown, to: string | null, deletingUserId?: string) {
|
export async function deliver(content: IActivity|IActivity[], to: string | null, deletingUserId?: string) {
|
||||||
if (content == null) return null;
|
if (content == null) return null;
|
||||||
if (to == null) return null;
|
if (to == null) return null;
|
||||||
|
|
||||||
const data = {
|
// group activities by actor
|
||||||
user: {
|
const contentArray = Array.isArray(content) ? content : [content];
|
||||||
id: user.id,
|
let byActor = contentArray.reduce((acc, activity) => {
|
||||||
},
|
if (activity.actor == null) throw new Error("Cannot deliver activity without actor.");
|
||||||
content,
|
if (!(activity.actor in acc)) {
|
||||||
to,
|
acc[activity.actor] = [];
|
||||||
deletingUserId,
|
}
|
||||||
};
|
acc[activity.actor].push(activity);
|
||||||
|
return acc;
|
||||||
|
}, []);
|
||||||
|
|
||||||
return deliverQueue.add(data, {
|
// add groups to deliver queue
|
||||||
attempts: config.deliverJobMaxAttempts,
|
const dbResolver = new DbResolver();
|
||||||
timeout: MINUTE,
|
for (const actor in byActor) {
|
||||||
backoff: {
|
// extract user from the Activity
|
||||||
type: 'apBackoff',
|
const user = await dbResolver.getUserFromApId(actor);
|
||||||
},
|
if (!user) throw new Error("Actor not found, cannot deliver.");
|
||||||
removeOnComplete: true,
|
if (user.host != null) throw new Error("Cannot deliver for remote actor.");
|
||||||
removeOnFail: true,
|
|
||||||
});
|
// add item to deliver queue
|
||||||
|
const data = {
|
||||||
|
user: {
|
||||||
|
id: user.id,
|
||||||
|
},
|
||||||
|
content: byActor[actor],
|
||||||
|
to,
|
||||||
|
deletingUserId,
|
||||||
|
};
|
||||||
|
deliverQueue.add(data, {
|
||||||
|
attempts: config.deliverJobMaxAttempts,
|
||||||
|
timeout: MINUTE,
|
||||||
|
backoff: {
|
||||||
|
type: 'apBackoff',
|
||||||
|
},
|
||||||
|
removeOnComplete: true,
|
||||||
|
removeOnFail: true,
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
|
export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
|
||||||
|
|
|
@ -9,7 +9,7 @@ export type DeliverJobData = {
|
||||||
/** Actor */
|
/** Actor */
|
||||||
user: ThinUser;
|
user: ThinUser;
|
||||||
/** Activity */
|
/** Activity */
|
||||||
content: unknown;
|
content: IActivity;
|
||||||
/** inbox URL to deliver */
|
/** inbox URL to deliver */
|
||||||
to: string;
|
to: string;
|
||||||
/** set if this job is part of a user deletion, on completion or failure the isDeleted field needs to be decremented */
|
/** set if this job is part of a user deletion, on completion or failure the isDeleted field needs to be decremented */
|
||||||
|
|
|
@ -14,6 +14,7 @@ interface IEveryoneRecipe extends IRecipe {
|
||||||
|
|
||||||
interface IFollowersRecipe extends IRecipe {
|
interface IFollowersRecipe extends IRecipe {
|
||||||
type: 'Followers';
|
type: 'Followers';
|
||||||
|
followee: ILocalUser;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface IDirectRecipe extends IRecipe {
|
interface IDirectRecipe extends IRecipe {
|
||||||
|
@ -32,26 +33,24 @@ const isDirect = (recipe: any): recipe is IDirectRecipe =>
|
||||||
//#endregion
|
//#endregion
|
||||||
|
|
||||||
export class DeliverManager {
|
export class DeliverManager {
|
||||||
private actor: { id: User['id']; host: null; };
|
|
||||||
private activity: any;
|
private activity: any;
|
||||||
private recipes: IRecipe[] = [];
|
private recipes: IRecipe[] = [];
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor
|
* Constructor
|
||||||
* @param actor Actor
|
|
||||||
* @param activity Activity to deliver
|
* @param activity Activity to deliver
|
||||||
*/
|
*/
|
||||||
constructor(actor: { id: User['id']; host: null; }, activity: any) {
|
constructor(activity: any) {
|
||||||
this.actor = actor;
|
|
||||||
this.activity = activity;
|
this.activity = activity;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add recipe for followers deliver
|
* Add recipe for followers deliver
|
||||||
*/
|
*/
|
||||||
public addFollowersRecipe() {
|
public addFollowersRecipe(followee: ILocalUser) {
|
||||||
const deliver = {
|
const deliver = {
|
||||||
type: 'Followers',
|
type: 'Followers',
|
||||||
|
followee,
|
||||||
} as IFollowersRecipe;
|
} as IFollowersRecipe;
|
||||||
|
|
||||||
this.addRecipe(deliver);
|
this.addRecipe(deliver);
|
||||||
|
@ -89,9 +88,7 @@ export class DeliverManager {
|
||||||
* Execute delivers
|
* Execute delivers
|
||||||
*/
|
*/
|
||||||
public async execute(deletingUserId?: string) {
|
public async execute(deletingUserId?: string) {
|
||||||
if (!Users.isLocalUser(this.actor)) return;
|
const inboxes = new Set<string>();
|
||||||
|
|
||||||
let inboxes = new Set<string>();
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
build inbox list
|
build inbox list
|
||||||
|
@ -116,21 +113,25 @@ export class DeliverManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.recipes.some(r => isFollowers(r))) {
|
await Promise.all(
|
||||||
// followers deliver
|
this.recipes.filter(isFollowers)
|
||||||
const followers = await Followings.createQueryBuilder('followings')
|
.map(recipe => {
|
||||||
// return either the shared inbox (if available) or the individual inbox
|
// followers deliver
|
||||||
.select('COALESCE(followings.followerSharedInbox, followings.followerInbox)', 'inbox')
|
return Followings.createQueryBuilder('followings')
|
||||||
// so we don't have to make our inboxes Set work as hard
|
// return either the shared inbox (if available) or the individual inbox
|
||||||
.distinct(true)
|
.select('COALESCE(followings.followerSharedInbox, followings.followerInbox)', 'inbox')
|
||||||
// ...for the specific actors followers
|
// so we don't have to make our inboxes Set work as hard
|
||||||
.where('followings.followeeId = :actorId', { actorId: this.actor.id })
|
.distinct(true)
|
||||||
// don't deliver to ourselves
|
// ...for the specific actors followers
|
||||||
.andWhere('followings.followerHost IS NOT NULL')
|
.where('followings.followeeId = :actorId', { actorId: recipe.followee.id })
|
||||||
.getRawMany();
|
// don't deliver to ourselves
|
||||||
|
.andWhere('followings.followerHost IS NOT NULL')
|
||||||
followers.forEach(({ inbox }) => inboxes.add(inbox));
|
.getRawMany()
|
||||||
}
|
.then(followers =>
|
||||||
|
followers.forEach(({ inbox }) => inboxes.add(inbox))
|
||||||
|
);
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
this.recipes.filter((recipe): recipe is IDirectRecipe =>
|
this.recipes.filter((recipe): recipe is IDirectRecipe =>
|
||||||
// followers recipes have already been processed
|
// followers recipes have already been processed
|
||||||
|
@ -160,7 +161,7 @@ export class DeliverManager {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
filteredInboxes.forEach(inbox => deliver(this.actor, this.activity, inbox, deletingUserId));
|
filteredInboxes.forEach(inbox => deliver(this.activity, inbox, deletingUserId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,9 +171,9 @@ export class DeliverManager {
|
||||||
* @param activity Activity
|
* @param activity Activity
|
||||||
* @param from Followee
|
* @param from Followee
|
||||||
*/
|
*/
|
||||||
export async function deliverToFollowers(actor: { id: ILocalUser['id']; host: null; }, activity: any) {
|
export async function deliverToFollowers(actor: ILocalUser, activity: any) {
|
||||||
const manager = new DeliverManager(actor, activity);
|
const manager = new DeliverManager(activity);
|
||||||
manager.addFollowersRecipe();
|
manager.addFollowersRecipe(actor);
|
||||||
await manager.execute();
|
await manager.execute();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -181,8 +182,8 @@ export async function deliverToFollowers(actor: { id: ILocalUser['id']; host: nu
|
||||||
* @param activity Activity
|
* @param activity Activity
|
||||||
* @param to Target user
|
* @param to Target user
|
||||||
*/
|
*/
|
||||||
export async function deliverToUser(actor: { id: ILocalUser['id']; host: null; }, activity: any, to: IRemoteUser) {
|
export async function deliverToUser(activity: any, to: IRemoteUser) {
|
||||||
const manager = new DeliverManager(actor, activity);
|
const manager = new DeliverManager(activity);
|
||||||
manager.addDirectRecipe(to);
|
manager.addDirectRecipe(to);
|
||||||
await manager.execute();
|
await manager.execute();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +0,0 @@
|
||||||
import config from '@/config/index.js';
|
|
||||||
import { User } from '@/models/entities/user.js';
|
|
||||||
import { MessagingMessage } from '@/models/entities/messaging-message.js';
|
|
||||||
|
|
||||||
export const renderReadActivity = (user: { id: User['id'] }, message: MessagingMessage) => ({
|
|
||||||
type: 'Read',
|
|
||||||
actor: `${config.url}/users/${user.id}`,
|
|
||||||
object: message.uri,
|
|
||||||
});
|
|
|
@ -7,10 +7,6 @@ import { MessagingMessages, UserGroupJoinings, Users } from '@/models/index.js';
|
||||||
import { IdentifiableError } from '@/misc/identifiable-error.js';
|
import { IdentifiableError } from '@/misc/identifiable-error.js';
|
||||||
import { UserGroup } from '@/models/entities/user-group.js';
|
import { UserGroup } from '@/models/entities/user-group.js';
|
||||||
import { toArray } from '@/prelude/array.js';
|
import { toArray } from '@/prelude/array.js';
|
||||||
import { renderReadActivity } from '@/remote/activitypub/renderer/read.js';
|
|
||||||
import { renderActivity } from '@/remote/activitypub/renderer/index.js';
|
|
||||||
import { deliver } from '@/queue/index.js';
|
|
||||||
import orderedCollection from '@/remote/activitypub/renderer/ordered-collection.js';
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Mark messages as read
|
* Mark messages as read
|
||||||
|
@ -133,18 +129,3 @@ export async function readGroupMessagingMessage(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function deliverReadActivity(user: { id: User['id']; host: null; }, recipient: IRemoteUser, messages: MessagingMessage | MessagingMessage[]) {
|
|
||||||
const contents = toArray(messages)
|
|
||||||
.filter(x => x.uri)
|
|
||||||
.map(x => renderReadActivity(user, x));
|
|
||||||
|
|
||||||
if (contents.length > 1) {
|
|
||||||
const collection = orderedCollection(null, contents.length, undefined, undefined, contents);
|
|
||||||
deliver(user, renderActivity(collection), recipient.inbox);
|
|
||||||
} else {
|
|
||||||
for (const content of contents) {
|
|
||||||
deliver(user, renderActivity(content), recipient.inbox);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ export default define(meta, paramDef, async (ps, me) => {
|
||||||
const actor = await getInstanceActor();
|
const actor = await getInstanceActor();
|
||||||
const targetUser = await Users.findOneByOrFail({ id: report.targetUserId });
|
const targetUser = await Users.findOneByOrFail({ id: report.targetUserId });
|
||||||
|
|
||||||
deliver(actor, renderActivity(renderFlag(actor, report)), targetUser.inbox);
|
await deliver(renderActivity(renderFlag(actor, report)), targetUser.inbox);
|
||||||
}
|
}
|
||||||
|
|
||||||
await AbuseUserReports.update(report.id, {
|
await AbuseUserReports.update(report.id, {
|
||||||
|
|
|
@ -4,7 +4,7 @@ import define from '@/server/api/define.js';
|
||||||
import { ApiError } from '@/server/api/error.js';
|
import { ApiError } from '@/server/api/error.js';
|
||||||
import { getUser } from '@/server/api/common/getters.js';
|
import { getUser } from '@/server/api/common/getters.js';
|
||||||
import { makePaginationQuery } from '@/server/api/common/make-pagination-query.js';
|
import { makePaginationQuery } from '@/server/api/common/make-pagination-query.js';
|
||||||
import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '@/server/api/common/read-messaging-message.js';
|
import { readUserMessagingMessage, readGroupMessagingMessage } from '@/server/api/common/read-messaging-message.js';
|
||||||
|
|
||||||
export const meta = {
|
export const meta = {
|
||||||
tags: ['messaging'],
|
tags: ['messaging'],
|
||||||
|
@ -75,11 +75,6 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
// Mark all as read
|
// Mark all as read
|
||||||
if (ps.markAsRead) {
|
if (ps.markAsRead) {
|
||||||
readUserMessagingMessage(user.id, recipient.id, messages.filter(m => m.recipientId === user.id).map(x => x.id));
|
readUserMessagingMessage(user.id, recipient.id, messages.filter(m => m.recipientId === user.id).map(x => x.id));
|
||||||
|
|
||||||
// リモートユーザーとのメッセージだったら既読配信
|
|
||||||
if (Users.isLocalUser(user) && Users.isRemoteUser(recipient)) {
|
|
||||||
deliverReadActivity(user, recipient, messages);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return await Promise.all(messages.map(message => MessagingMessages.pack(message, user, {
|
return await Promise.all(messages.map(message => MessagingMessages.pack(message, user, {
|
||||||
|
|
|
@ -133,7 +133,7 @@ export default define(meta, paramDef, async (ps, user) => {
|
||||||
if (note.userHost != null) {
|
if (note.userHost != null) {
|
||||||
const pollOwner = await Users.findOneByOrFail({ id: note.userId }) as IRemoteUser;
|
const pollOwner = await Users.findOneByOrFail({ id: note.userId }) as IRemoteUser;
|
||||||
|
|
||||||
deliver(user, renderActivity(await renderVote(user, vote, note, poll, pollOwner)), pollOwner.inbox);
|
await deliver(renderActivity(await renderVote(user, vote, note, poll, pollOwner)), pollOwner.inbox);
|
||||||
}
|
}
|
||||||
|
|
||||||
// リモートフォロワーにUpdate配信
|
// リモートフォロワーにUpdate配信
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import { UserGroupJoinings, Users, MessagingMessages } from '@/models/index.js';
|
import { UserGroupJoinings, Users, MessagingMessages } from '@/models/index.js';
|
||||||
import { User, ILocalUser, IRemoteUser } from '@/models/entities/user.js';
|
import { User, ILocalUser, IRemoteUser } from '@/models/entities/user.js';
|
||||||
import { UserGroup } from '@/models/entities/user-group.js';
|
import { UserGroup } from '@/models/entities/user-group.js';
|
||||||
import { readUserMessagingMessage, readGroupMessagingMessage, deliverReadActivity } from '@/server/api/common/read-messaging-message.js';
|
import { readUserMessagingMessage, readGroupMessagingMessage } from '@/server/api/common/read-messaging-message.js';
|
||||||
import Channel from '@/server/api/stream/channel.js';
|
import Channel from '@/server/api/stream/channel.js';
|
||||||
import { StreamMessages } from '@/server/api/stream/types.js';
|
import { StreamMessages } from '@/server/api/stream/types.js';
|
||||||
|
|
||||||
|
@ -69,13 +69,6 @@ export default class extends Channel {
|
||||||
case 'read':
|
case 'read':
|
||||||
if (this.otherpartyId) {
|
if (this.otherpartyId) {
|
||||||
readUserMessagingMessage(this.user!.id, this.otherpartyId, [body.id]);
|
readUserMessagingMessage(this.user!.id, this.otherpartyId, [body.id]);
|
||||||
|
|
||||||
// リモートユーザーからのメッセージだったら既読配信
|
|
||||||
if (Users.isLocalUser(this.user!) && Users.isRemoteUser(this.otherparty!)) {
|
|
||||||
MessagingMessages.findOneBy({ id: body.id }).then(message => {
|
|
||||||
if (message) deliverReadActivity(this.user as ILocalUser, this.otherparty as IRemoteUser, message);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
} else if (this.groupId) {
|
} else if (this.groupId) {
|
||||||
readGroupMessagingMessage(this.user!.id, this.groupId, [body.id]);
|
readGroupMessagingMessage(this.user!.id, this.groupId, [body.id]);
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ export default async function(blocker: User, blockee: User): Promise<void> {
|
||||||
|
|
||||||
if (Users.isLocalUser(blocker) && Users.isRemoteUser(blockee) && blocker.federateBlocks) {
|
if (Users.isLocalUser(blocker) && Users.isRemoteUser(blockee) && blocker.federateBlocks) {
|
||||||
const content = renderActivity(renderBlock(blocking));
|
const content = renderActivity(renderBlock(blocking));
|
||||||
deliver(blocker, content, blockee.inbox);
|
deliver(content, blockee.inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,13 +78,13 @@ async function cancelRequest(follower: User, followee: User): Promise<void> {
|
||||||
// Send Undo Follow if followee is remote
|
// Send Undo Follow if followee is remote
|
||||||
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
|
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
|
||||||
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
|
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
|
||||||
deliver(follower, content, followee.inbox);
|
deliver(content, followee.inbox);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send Reject if follower is remote
|
// Send Reject if follower is remote
|
||||||
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) {
|
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) {
|
||||||
const content = renderActivity(renderReject(renderFollow(follower, followee, request.requestId!), followee));
|
const content = renderActivity(renderReject(renderFollow(follower, followee, request.requestId!), followee));
|
||||||
deliver(followee, content, follower.inbox);
|
deliver(content, follower.inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -125,7 +125,7 @@ async function unFollow(follower: User, followee: User): Promise<void> {
|
||||||
// Send Undo Follow if follower is remote
|
// Send Undo Follow if follower is remote
|
||||||
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
|
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
|
||||||
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
|
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
|
||||||
deliver(follower, content, followee.inbox);
|
deliver(content, followee.inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,6 @@ export default async function(blocker: User, blockee: User) {
|
||||||
// deliver if remote bloking
|
// deliver if remote bloking
|
||||||
if (Users.isLocalUser(blocker) && Users.isRemoteUser(blockee)) {
|
if (Users.isLocalUser(blocker) && Users.isRemoteUser(blockee)) {
|
||||||
const content = renderActivity(renderUndo(renderBlock(blocking), blocker));
|
const content = renderActivity(renderUndo(renderBlock(blocking), blocker));
|
||||||
deliver(blocker, content, blockee.inbox);
|
await deliver(content, blockee.inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -146,7 +146,7 @@ export default async function(_follower: { id: User['id'] }, _followee: { id: Us
|
||||||
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee) && blocked) {
|
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee) && blocked) {
|
||||||
// リモートフォローを受けてブロックしていた場合は、エラーにするのではなくRejectを送り返しておしまい。
|
// リモートフォローを受けてブロックしていた場合は、エラーにするのではなくRejectを送り返しておしまい。
|
||||||
const content = renderActivity(renderReject(renderFollow(follower, followee, requestId), followee));
|
const content = renderActivity(renderReject(renderFollow(follower, followee, requestId), followee));
|
||||||
deliver(followee , content, follower.inbox);
|
deliver(content, follower.inbox);
|
||||||
return;
|
return;
|
||||||
} else if (Users.isRemoteUser(follower) && Users.isLocalUser(followee) && blocking) {
|
} else if (Users.isRemoteUser(follower) && Users.isLocalUser(followee) && blocking) {
|
||||||
// リモートフォローを受けてブロックされているはずの場合だったら、ブロック解除しておく。
|
// リモートフォローを受けてブロックされているはずの場合だったら、ブロック解除しておく。
|
||||||
|
@ -195,6 +195,6 @@ export default async function(_follower: { id: User['id'] }, _followee: { id: Us
|
||||||
|
|
||||||
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) {
|
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) {
|
||||||
const content = renderActivity(renderAccept(renderFollow(follower, followee, requestId), followee));
|
const content = renderActivity(renderAccept(renderFollow(follower, followee, requestId), followee));
|
||||||
deliver(followee, content, follower.inbox);
|
deliver(content, follower.inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,13 +52,13 @@ export default async function(follower: { id: User['id']; host: User['host']; ur
|
||||||
|
|
||||||
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
|
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
|
||||||
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
|
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
|
||||||
deliver(follower, content, followee.inbox);
|
deliver(content, followee.inbox);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (Users.isLocalUser(followee) && Users.isRemoteUser(follower)) {
|
if (Users.isLocalUser(followee) && Users.isRemoteUser(follower)) {
|
||||||
// local user has null host
|
// local user has null host
|
||||||
const content = renderActivity(renderReject(renderFollow(follower, followee), followee));
|
const content = renderActivity(renderReject(renderFollow(follower, followee), followee));
|
||||||
deliver(followee, content, follower.inbox);
|
deliver(content, follower.inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -99,7 +99,7 @@ async function deliverReject(followee: Local, follower: Remote): Promise<void> {
|
||||||
});
|
});
|
||||||
|
|
||||||
const content = renderActivity(renderReject(renderFollow(follower, followee, request?.requestId || undefined), followee));
|
const content = renderActivity(renderReject(renderFollow(follower, followee, request?.requestId || undefined), followee));
|
||||||
deliver(followee, content, follower.inbox);
|
deliver(content, follower.inbox);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -27,7 +27,7 @@ export async function acceptFollowRequest(followee: User, follower: User): Promi
|
||||||
|
|
||||||
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) {
|
if (Users.isRemoteUser(follower) && Users.isLocalUser(followee)) {
|
||||||
const content = renderActivity(renderAccept(renderFollow(follower, followee, request.requestId!), followee));
|
const content = renderActivity(renderAccept(renderFollow(follower, followee, request.requestId!), followee));
|
||||||
deliver(followee, content, follower.inbox);
|
await deliver(content, follower.inbox);
|
||||||
}
|
}
|
||||||
|
|
||||||
Users.pack(followee.id, followee, {
|
Users.pack(followee.id, followee, {
|
||||||
|
|
|
@ -17,7 +17,7 @@ export async function cancelFollowRequest(followee: User, follower: User): Promi
|
||||||
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
|
const content = renderActivity(renderUndo(renderFollow(follower, followee), follower));
|
||||||
|
|
||||||
if (Users.isLocalUser(follower)) {
|
if (Users.isLocalUser(follower)) {
|
||||||
deliver(follower, content, followee.inbox);
|
await deliver(content, followee.inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -64,6 +64,6 @@ export async function createFollowRequest(follower: User, followee: User, reques
|
||||||
|
|
||||||
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
|
if (Users.isLocalUser(follower) && Users.isRemoteUser(followee)) {
|
||||||
const content = renderActivity(renderFollow(follower, followee));
|
const content = renderActivity(renderFollow(follower, followee));
|
||||||
deliver(follower, content, followee.inbox);
|
await deliver(content, followee.inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,7 +97,7 @@ export async function createMessage(user: { id: User['id']; host: User['host'];
|
||||||
|
|
||||||
const activity = renderActivity(renderCreate(await renderNote(note, false, true), note));
|
const activity = renderActivity(renderCreate(await renderNote(note, false, true), note));
|
||||||
|
|
||||||
deliver(user, activity, recipientUser.inbox);
|
await deliver(activity, recipientUser.inbox);
|
||||||
}
|
}
|
||||||
return messageObj;
|
return messageObj;
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ async function postDeleteMessage(message: MessagingMessage): Promise<void> {
|
||||||
|
|
||||||
if (Users.isLocalUser(user) && Users.isRemoteUser(recipient)) {
|
if (Users.isLocalUser(user) && Users.isRemoteUser(recipient)) {
|
||||||
const activity = renderActivity(renderDelete(renderTombstone(`${config.url}/notes/${message.id}`), user));
|
const activity = renderActivity(renderDelete(renderTombstone(`${config.url}/notes/${message.id}`), user));
|
||||||
deliver(user, activity, recipient.inbox);
|
await deliver(activity, recipient.inbox);
|
||||||
}
|
}
|
||||||
} else if (message.groupId) {
|
} else if (message.groupId) {
|
||||||
publishGroupMessagingStream(message.groupId, 'deleted', message.id);
|
publishGroupMessagingStream(message.groupId, 'deleted', message.id);
|
||||||
|
|
|
@ -60,8 +60,8 @@ export async function deleteNotes(notes: Note[], user?: User): Promise<void> {
|
||||||
|
|
||||||
// Compute addressing information.
|
// Compute addressing information.
|
||||||
// Since we do not send any actual content, we send all note deletions to everyone.
|
// Since we do not send any actual content, we send all note deletions to everyone.
|
||||||
const manager = new DeliverManager(fetchedUser, content);
|
const manager = new DeliverManager(content);
|
||||||
manager.addFollowersRecipe();
|
manager.addFollowersRecipe(fetchedUser);
|
||||||
manager.addEveryone();
|
manager.addEveryone();
|
||||||
// Check mentioned users, since not all may have a shared inbox.
|
// Check mentioned users, since not all may have a shared inbox.
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
|
|
|
@ -130,14 +130,14 @@ export async function createReaction(user: { id: User['id']; host: User['host'];
|
||||||
//#region 配信
|
//#region 配信
|
||||||
if (Users.isLocalUser(user) && !note.localOnly) {
|
if (Users.isLocalUser(user) && !note.localOnly) {
|
||||||
const content = renderActivity(await renderLike(record, note));
|
const content = renderActivity(await renderLike(record, note));
|
||||||
const dm = new DeliverManager(user, content);
|
const dm = new DeliverManager(content);
|
||||||
if (note.userHost !== null) {
|
if (note.userHost !== null) {
|
||||||
const reactee = await Users.findOneBy({ id: note.userId });
|
const reactee = await Users.findOneBy({ id: note.userId });
|
||||||
dm.addDirectRecipe(reactee as IRemoteUser);
|
dm.addDirectRecipe(reactee as IRemoteUser);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (['public', 'home', 'followers'].includes(note.visibility)) {
|
if (['public', 'home', 'followers'].includes(note.visibility)) {
|
||||||
dm.addFollowersRecipe();
|
dm.addFollowersRecipe(user);
|
||||||
} else if (note.visibility === 'specified') {
|
} else if (note.visibility === 'specified') {
|
||||||
const visibleUsers = await Promise.all(note.visibleUserIds.map(id => Users.findOneBy({ id })));
|
const visibleUsers = await Promise.all(note.visibleUserIds.map(id => Users.findOneBy({ id })));
|
||||||
for (const u of visibleUsers.filter(u => u && Users.isRemoteUser(u))) {
|
for (const u of visibleUsers.filter(u => u && Users.isRemoteUser(u))) {
|
||||||
|
|
|
@ -46,12 +46,12 @@ export async function deleteReaction(user: { id: User['id']; host: User['host'];
|
||||||
//#region 配信
|
//#region 配信
|
||||||
if (Users.isLocalUser(user) && !note.localOnly) {
|
if (Users.isLocalUser(user) && !note.localOnly) {
|
||||||
const content = renderActivity(renderUndo(await renderLike(exist, note), user));
|
const content = renderActivity(renderUndo(await renderLike(exist, note), user));
|
||||||
const dm = new DeliverManager(user, content);
|
const dm = new DeliverManager(content);
|
||||||
if (note.userHost !== null) {
|
if (note.userHost !== null) {
|
||||||
const reactee = await Users.findOneBy({ id: note.userId });
|
const reactee = await Users.findOneBy({ id: note.userId });
|
||||||
dm.addDirectRecipe(reactee as IRemoteUser);
|
dm.addDirectRecipe(reactee as IRemoteUser);
|
||||||
}
|
}
|
||||||
dm.addFollowersRecipe();
|
dm.addFollowersRecipe(user);
|
||||||
dm.execute();
|
dm.execute();
|
||||||
}
|
}
|
||||||
//#endregion
|
//#endregion
|
||||||
|
|
|
@ -335,7 +335,7 @@ export async function sideEffects(user: User, note: Note, silent = false, create
|
||||||
if (Users.isLocalUser(user) && !note.localOnly && created) {
|
if (Users.isLocalUser(user) && !note.localOnly && created) {
|
||||||
(async () => {
|
(async () => {
|
||||||
const noteActivity = renderActivity(await renderNoteOrRenoteActivity(note));
|
const noteActivity = renderActivity(await renderNoteOrRenoteActivity(note));
|
||||||
const dm = new DeliverManager(user, noteActivity);
|
const dm = new DeliverManager(noteActivity);
|
||||||
|
|
||||||
// Delivered to remote users who have been mentioned
|
// Delivered to remote users who have been mentioned
|
||||||
for (const u of mentionedUsers.filter(u => Users.isRemoteUser(u))) {
|
for (const u of mentionedUsers.filter(u => Users.isRemoteUser(u))) {
|
||||||
|
@ -368,7 +368,7 @@ export async function sideEffects(user: User, note: Note, silent = false, create
|
||||||
|
|
||||||
// Deliver to followers
|
// Deliver to followers
|
||||||
if (['public', 'home', 'followers'].includes(note.visibility)) {
|
if (['public', 'home', 'followers'].includes(note.visibility)) {
|
||||||
dm.addFollowersRecipe();
|
dm.addFollowersRecipe(user);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (['public'].includes(note.visibility)) {
|
if (['public'].includes(note.visibility)) {
|
||||||
|
|
|
@ -36,9 +36,8 @@ export async function addRelay(inbox: string): Promise<Relay> {
|
||||||
}).then(x => Relays.findOneByOrFail(x.identifiers[0]));
|
}).then(x => Relays.findOneByOrFail(x.identifiers[0]));
|
||||||
|
|
||||||
const relayActor = await getRelayActor();
|
const relayActor = await getRelayActor();
|
||||||
const follow = renderFollowRelay(relay, relayActor);
|
const activity = renderActivity(renderFollowRelay(relay, relayActor));
|
||||||
const activity = renderActivity(follow);
|
await deliver(activity, relay.inbox);
|
||||||
deliver(relayActor, activity, relay.inbox);
|
|
||||||
|
|
||||||
return relay;
|
return relay;
|
||||||
}
|
}
|
||||||
|
@ -53,17 +52,14 @@ export async function removeRelay(inbox: string): Promise<void> {
|
||||||
}
|
}
|
||||||
|
|
||||||
const relayActor = await getRelayActor();
|
const relayActor = await getRelayActor();
|
||||||
const follow = renderFollowRelay(relay, relayActor);
|
const activity = renderActivity(renderUndo(renderFollowRelay(relay, relayActor), relayActor));
|
||||||
const undo = renderUndo(follow, relayActor);
|
await deliver(activity, relay.inbox);
|
||||||
const activity = renderActivity(undo);
|
|
||||||
deliver(relayActor, activity, relay.inbox);
|
|
||||||
|
|
||||||
await Relays.delete(relay.id);
|
await Relays.delete(relay.id);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function listRelay(): Promise<Relay[]> {
|
export async function listRelay(): Promise<Relay[]> {
|
||||||
const relays = await Relays.find();
|
return await Relays.find();
|
||||||
return relays;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function relayAccepted(id: string): Promise<string> {
|
export async function relayAccepted(id: string): Promise<string> {
|
||||||
|
@ -93,9 +89,9 @@ export async function deliverToRelays(user: { id: User['id']; host: null; }, act
|
||||||
|
|
||||||
const signed = await attachLdSignature(copy, user);
|
const signed = await attachLdSignature(copy, user);
|
||||||
|
|
||||||
for (const relay of relays) {
|
await Promise.all(relays.map(relay =>
|
||||||
deliver(user, signed, relay.inbox);
|
deliver(signed, relay.inbox)
|
||||||
}
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function deliverMultipleToRelays(user: User, activities: any[]): Promise<void> {
|
export async function deliverMultipleToRelays(user: User, activities: any[]): Promise<void> {
|
||||||
|
@ -111,7 +107,7 @@ export async function deliverMultipleToRelays(user: User, activities: any[]): Pr
|
||||||
return attachLdSignature(copy, user);
|
return attachLdSignature(copy, user);
|
||||||
}));
|
}));
|
||||||
|
|
||||||
for (const relay of relays) {
|
await Promise.all(relays.map(relay =>
|
||||||
deliver(user, content, relay.inbox);
|
deliver(content, relay.inbox)
|
||||||
}
|
));
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ export async function doPostSuspend(user: { id: User['id']; host: User['host'] }
|
||||||
const content = renderActivity(renderDelete(`${config.url}/users/${user.id}`, user));
|
const content = renderActivity(renderDelete(`${config.url}/users/${user.id}`, user));
|
||||||
|
|
||||||
// deliver to all of known network
|
// deliver to all of known network
|
||||||
const dm = new DeliverManager(user, content);
|
const dm = new DeliverManager(content);
|
||||||
dm.addEveryone();
|
dm.addEveryone();
|
||||||
await dm.execute(user.id);
|
await dm.execute(user.id);
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,7 +32,7 @@ export async function doPostUnsuspend(user: User): Promise<void> {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const inbox of queue) {
|
for (const inbox of queue) {
|
||||||
deliver(user as any, content, inbox);
|
await deliver(content, inbox);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue