fix for delivering multiple activities at once

This commit is contained in:
Johann150 2023-05-01 12:41:39 +02:00
parent b374a79eb1
commit 965ee4b041
Signed by untrusted user: Johann150
GPG key ID: 9EE6577A2A06F8F1

View file

@ -84,34 +84,47 @@ webhookDeliverQueue
.on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) })) .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) }))
.on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`));
export function deliver(content: unknown, to: string | null) { export function deliver(content: IActivity|IActivity[], to: string | null) {
if (content == null) return null; if (content == null) return null;
if (to == null) return null; if (to == null) return null;
// extract user from the Activity // group activities by actor
const userUri = content.actor; const contentArray = Array.isArray(content) ? content : [content];
if (!userUri) throw new Error("Cannot deliver activity without actor."); let byActor = contentArray.reduce((acc, activity) => {
const user = await new DbResolver().getUserFromApId(userUri); if (activity.actor == null) throw new Error("Cannot deliver activity without actor.");
if (!user) throw new Error("Actor not found, cannot deliver."); if (!(activity.actor in acc)) {
if (user.host != null) throw new Error("Cannot deliver for remote actor."); acc[activity.actor] = [];
}
acc[activity.actor].push(activity);
return acc;
}, []);
const data = { // add groups to deliver queue
user: { const dbResolver = new DbResolver();
id: user.id, for (const actor in byActor) {
}, // extract user from the Activity
content, const user = await dbResolver.getUserFromApId(actor);
to, if (!user) throw new Error("Actor not found, cannot deliver.");
}; if (user.host != null) throw new Error("Cannot deliver for remote actor.");
return deliverQueue.add(data, { // add item to deliver queue
attempts: config.deliverJobMaxAttempts, const data = {
timeout: MINUTE, user: {
backoff: { id: user.id,
type: 'apBackoff', },
}, byActor[actor],
removeOnComplete: true, to,
removeOnFail: true, };
}); deliverQueue.add(data, {
attempts: config.deliverJobMaxAttempts,
timeout: MINUTE,
backoff: {
type: 'apBackoff',
},
removeOnComplete: true,
removeOnFail: true,
});
}
} }
export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) { export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {