refactor deliver
to extract actor from activity #377
1 changed files with 37 additions and 24 deletions
|
@ -97,35 +97,48 @@ webhookDeliverQueue
|
|||
.on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`))
|
||||
.on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
|
||||
|
||||
export async function deliver(content: unknown, to: string | null, deletingUserId?: string) {
|
||||
export async function deliver(content: IActivity|IActivity[], to: string | null, deletingUserId?: string) {
|
||||
if (content == null) return null;
|
||||
if (to == null) return null;
|
||||
|
||||
// extract user from the Activity
|
||||
const userUri = content.actor;
|
||||
if (!userUri) throw new Error("Cannot deliver activity without actor.");
|
||||
const user = await new DbResolver().getUserFromApId(userUri);
|
||||
if (!user) throw new Error("Actor not found, cannot deliver.");
|
||||
if (user.host != null) throw new Error("Cannot deliver for remote actor.");
|
||||
// group activities by actor
|
||||
const contentArray = Array.isArray(content) ? content : [content];
|
||||
let byActor = contentArray.reduce((acc, activity) => {
|
||||
if (activity.actor == null) throw new Error("Cannot deliver activity without actor.");
|
||||
if (!(activity.actor in acc)) {
|
||||
acc[activity.actor] = [];
|
||||
}
|
||||
acc[activity.actor].push(activity);
|
||||
return acc;
|
||||
}, []);
|
||||
|
||||
const data = {
|
||||
user: {
|
||||
id: user.id,
|
||||
},
|
||||
content,
|
||||
to,
|
||||
deletingUserId,
|
||||
};
|
||||
// add groups to deliver queue
|
||||
const dbResolver = new DbResolver();
|
||||
for (const actor in byActor) {
|
||||
// extract user from the Activity
|
||||
const user = await dbResolver.getUserFromApId(actor);
|
||||
if (!user) throw new Error("Actor not found, cannot deliver.");
|
||||
if (user.host != null) throw new Error("Cannot deliver for remote actor.");
|
||||
|
||||
return deliverQueue.add(data, {
|
||||
attempts: config.deliverJobMaxAttempts,
|
||||
timeout: MINUTE,
|
||||
backoff: {
|
||||
type: 'apBackoff',
|
||||
},
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
});
|
||||
// add item to deliver queue
|
||||
const data = {
|
||||
user: {
|
||||
id: user.id,
|
||||
},
|
||||
byActor[actor],
|
||||
to,
|
||||
deletingUserId,
|
||||
};
|
||||
deliverQueue.add(data, {
|
||||
attempts: config.deliverJobMaxAttempts,
|
||||
timeout: MINUTE,
|
||||
backoff: {
|
||||
type: 'apBackoff',
|
||||
},
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {
|
||||
|
|
Loading…
Reference in a new issue