diff --git a/packages/backend/src/queue/index.ts b/packages/backend/src/queue/index.ts index 55b5676c3..f52660448 100644 --- a/packages/backend/src/queue/index.ts +++ b/packages/backend/src/queue/index.ts @@ -84,34 +84,47 @@ webhookDeliverQueue .on('error', (job: any, err: Error) => webhookLogger.error(`error ${err}`, { job, e: renderError(err) })) .on('stalled', (job) => webhookLogger.warn(`stalled ${getJobInfo(job)} to=${job.data.to}`)); -export function deliver(content: unknown, to: string | null) { +export function deliver(content: IActivity|IActivity[], to: string | null) { if (content == null) return null; if (to == null) return null; - // extract user from the Activity - const userUri = content.actor; - if (!userUri) throw new Error("Cannot deliver activity without actor."); - const user = await new DbResolver().getUserFromApId(userUri); - if (!user) throw new Error("Actor not found, cannot deliver."); - if (user.host != null) throw new Error("Cannot deliver for remote actor."); + // group activities by actor + const contentArray = Array.isArray(content) ? content : [content]; + let byActor = contentArray.reduce((acc, activity) => { + if (activity.actor == null) throw new Error("Cannot deliver activity without actor."); + if (!(activity.actor in acc)) { + acc[activity.actor] = []; + } + acc[activity.actor].push(activity); + return acc; + }, []); - const data = { - user: { - id: user.id, - }, - content, - to, - }; + // add groups to deliver queue + const dbResolver = new DbResolver(); + for (const actor in byActor) { + // extract user from the Activity + const user = await dbResolver.getUserFromApId(actor); + if (!user) throw new Error("Actor not found, cannot deliver."); + if (user.host != null) throw new Error("Cannot deliver for remote actor."); - return deliverQueue.add(data, { - attempts: config.deliverJobMaxAttempts, - timeout: MINUTE, - backoff: { - type: 'apBackoff', - }, - removeOnComplete: true, - removeOnFail: true, - }); + // add item to deliver queue + const data = { + user: { + id: user.id, + }, + byActor[actor], + to, + }; + deliverQueue.add(data, { + attempts: config.deliverJobMaxAttempts, + timeout: MINUTE, + backoff: { + type: 'apBackoff', + }, + removeOnComplete: true, + removeOnFail: true, + }); + } } export function inbox(activity: IActivity, signature: httpSignature.IParsedSignature) {