forked from FoundKeyGang/FoundKey
refactor: remove note re-packing in streaming API
Instead of packing the note for public user before passing it to streams, the note is now either packed for the user the respective stream belongs to (`mainStream`) or not packed at all and then packed later (`notesStream`). Because this is a new common task between different channels, a shared implementation of packing a note from notesStream is created. This implementation will simply skip a note if it is not visible to the user that the channel belongs to.
This commit is contained in:
parent
c6192ac95a
commit
cfa371b52b
13 changed files with 72 additions and 169 deletions
|
@ -1,4 +1,8 @@
|
|||
import Connection from '.';
|
||||
import { Note } from '@/models/entities/note.js';
|
||||
import { Notes } from '@/models/index.js';
|
||||
import { Packed } from '@/misc/schema.js';
|
||||
import { IdentifiableError } from '@/misc/identifiable-error.js';
|
||||
|
||||
/**
|
||||
* Stream channel
|
||||
|
@ -54,6 +58,32 @@ export default abstract class Channel {
|
|||
});
|
||||
}
|
||||
|
||||
protected withPackedNote(callback: (note: Packed<'Note'>) => void): (Note) => void {
|
||||
return async (note: Note) => {
|
||||
try {
|
||||
// because `note` was previously JSON.stringify'ed, the fields that
|
||||
// were objects before are now strings and have to be restored or
|
||||
// removed from the object
|
||||
note.createdAt = new Date(note.createdAt);
|
||||
delete note.reply;
|
||||
delete note.renote;
|
||||
delete note.user;
|
||||
delete note.channel;
|
||||
|
||||
const packed = await Notes.pack(note, this.user, { detail: true });
|
||||
|
||||
callback(packed);
|
||||
} catch (err) {
|
||||
if (err instanceof IdentifiableError && err.id === '9725d0ce-ba28-4dde-95a7-2cbb2c15de24') {
|
||||
// skip: note not visible to user
|
||||
return;
|
||||
} else {
|
||||
throw err;
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public abstract init(params: any): void;
|
||||
public dispose?(): void;
|
||||
public onMessage?(type: string, body: any): void;
|
||||
|
|
|
@ -2,6 +2,7 @@ import Channel from '../channel.js';
|
|||
import { Notes } from '@/models/index.js';
|
||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||
import { StreamMessages } from '../types.js';
|
||||
import { IdentifiableError } from '@/misc/identifiable-error.js';
|
||||
|
||||
export default class extends Channel {
|
||||
public readonly chName = 'antenna';
|
||||
|
@ -23,16 +24,25 @@ export default class extends Channel {
|
|||
|
||||
private async onEvent(data: StreamMessages['antenna']['payload']) {
|
||||
if (data.type === 'note') {
|
||||
const note = await Notes.pack(data.body.id, this.user, { detail: true });
|
||||
try {
|
||||
const note = await Notes.pack(data.body.id, this.user, { detail: true });
|
||||
|
||||
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
|
||||
if (isUserRelated(note, this.muting)) return;
|
||||
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
|
||||
if (isUserRelated(note, this.blocking)) return;
|
||||
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
|
||||
if (isUserRelated(note, this.muting)) return;
|
||||
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
|
||||
if (isUserRelated(note, this.blocking)) return;
|
||||
|
||||
this.connection.cacheNote(note);
|
||||
this.connection.cacheNote(note);
|
||||
|
||||
this.send('note', note);
|
||||
this.send('note', note);
|
||||
} catch (e) {
|
||||
if (e instanceof IdentifiableError && e.id === '9725d0ce-ba28-4dde-95a7-2cbb2c15de24') {
|
||||
// skip: note not visible to user
|
||||
return;
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
this.send(data.type, data.body);
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import Channel from '../channel.js';
|
||||
import { Notes, Users } from '@/models/index.js';
|
||||
import { Users } from '@/models/index.js';
|
||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||
import { User } from '@/models/entities/user.js';
|
||||
import { StreamMessages } from '../types.js';
|
||||
|
@ -15,7 +15,7 @@ export default class extends Channel {
|
|||
|
||||
constructor(id: string, connection: Channel['connection']) {
|
||||
super(id, connection);
|
||||
this.onNote = this.onNote.bind(this);
|
||||
this.onNote = this.withPackedNote(this.onNote.bind(this));
|
||||
}
|
||||
|
||||
public async init(params: any) {
|
||||
|
@ -30,19 +30,6 @@ export default class extends Channel {
|
|||
private async onNote(note: Packed<'Note'>) {
|
||||
if (note.channelId !== this.channelId) return;
|
||||
|
||||
// リプライなら再pack
|
||||
if (note.replyId != null) {
|
||||
note.reply = await Notes.pack(note.replyId, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
// Renoteなら再pack
|
||||
if (note.renoteId != null) {
|
||||
note.renote = await Notes.pack(note.renoteId, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
|
||||
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
|
||||
if (isUserRelated(note, this.muting)) return;
|
||||
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import Channel from '../channel.js';
|
||||
import { fetchMeta } from '@/misc/fetch-meta.js';
|
||||
import { Notes } from '@/models/index.js';
|
||||
import { checkWordMute } from '@/misc/check-word-mute.js';
|
||||
import { isInstanceMuted } from '@/misc/is-instance-muted.js';
|
||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||
|
@ -13,7 +12,7 @@ export default class extends Channel {
|
|||
|
||||
constructor(id: string, connection: Channel['connection']) {
|
||||
super(id, connection);
|
||||
this.onNote = this.onNote.bind(this);
|
||||
this.onNote = this.withPackedNote(this.onNote.bind(this));
|
||||
}
|
||||
|
||||
public async init(params: any) {
|
||||
|
@ -30,19 +29,6 @@ export default class extends Channel {
|
|||
if (note.visibility !== 'public') return;
|
||||
if (note.channelId != null) return;
|
||||
|
||||
// リプライなら再pack
|
||||
if (note.replyId != null) {
|
||||
note.reply = await Notes.pack(note.replyId, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
// Renoteなら再pack
|
||||
if (note.renoteId != null) {
|
||||
note.renote = await Notes.pack(note.renoteId, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
|
||||
// 関係ない返信は除外
|
||||
if (note.reply && !this.user!.showTimelineReplies) {
|
||||
const reply = note.reply;
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import Channel from '../channel.js';
|
||||
import { Notes } from '@/models/index.js';
|
||||
import { normalizeForSearch } from '@/misc/normalize-for-search.js';
|
||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||
import { Packed } from '@/misc/schema.js';
|
||||
|
@ -12,7 +11,7 @@ export default class extends Channel {
|
|||
|
||||
constructor(id: string, connection: Channel['connection']) {
|
||||
super(id, connection);
|
||||
this.onNote = this.onNote.bind(this);
|
||||
this.onNote = this.withPackedNote(this.onNote.bind(this));
|
||||
}
|
||||
|
||||
public async init(params: any) {
|
||||
|
@ -29,13 +28,6 @@ export default class extends Channel {
|
|||
const matched = this.q.some(tags => tags.every(tag => noteTags.includes(normalizeForSearch(tag))));
|
||||
if (!matched) return;
|
||||
|
||||
// Renoteなら再pack
|
||||
if (note.renoteId != null) {
|
||||
note.renote = await Notes.pack(note.renoteId, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
|
||||
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
|
||||
if (isUserRelated(note, this.muting)) return;
|
||||
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import Channel from '../channel.js';
|
||||
import { Notes } from '@/models/index.js';
|
||||
import { checkWordMute } from '@/misc/check-word-mute.js';
|
||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||
import { isInstanceMuted } from '@/misc/is-instance-muted.js';
|
||||
|
@ -12,7 +11,7 @@ export default class extends Channel {
|
|||
|
||||
constructor(id: string, connection: Channel['connection']) {
|
||||
super(id, connection);
|
||||
this.onNote = this.onNote.bind(this);
|
||||
this.onNote = this.withPackedNote(this.onNote.bind(this));
|
||||
}
|
||||
|
||||
public async init(params: any) {
|
||||
|
@ -31,29 +30,6 @@ export default class extends Channel {
|
|||
// Ignore notes from instances the user has muted
|
||||
if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
|
||||
|
||||
if (['followers', 'specified'].includes(note.visibility)) {
|
||||
note = await Notes.pack(note.id, this.user!, {
|
||||
detail: true,
|
||||
});
|
||||
|
||||
if (note.isHidden) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// リプライなら再pack
|
||||
if (note.replyId != null) {
|
||||
note.reply = await Notes.pack(note.replyId, this.user!, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
// Renoteなら再pack
|
||||
if (note.renoteId != null) {
|
||||
note.renote = await Notes.pack(note.renoteId, this.user!, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// 関係ない返信は除外
|
||||
if (note.reply && !this.user!.showTimelineReplies) {
|
||||
const reply = note.reply;
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import Channel from '../channel.js';
|
||||
import { fetchMeta } from '@/misc/fetch-meta.js';
|
||||
import { Notes } from '@/models/index.js';
|
||||
import { checkWordMute } from '@/misc/check-word-mute.js';
|
||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||
import { isInstanceMuted } from '@/misc/is-instance-muted.js';
|
||||
|
@ -13,7 +12,7 @@ export default class extends Channel {
|
|||
|
||||
constructor(id: string, connection: Channel['connection']) {
|
||||
super(id, connection);
|
||||
this.onNote = this.onNote.bind(this);
|
||||
this.onNote = this.withPackedNote(this.onNote.bind(this));
|
||||
}
|
||||
|
||||
public async init(params: any) {
|
||||
|
@ -36,29 +35,6 @@ export default class extends Channel {
|
|||
(note.channelId != null && this.followingChannels.has(note.channelId))
|
||||
)) return;
|
||||
|
||||
if (['followers', 'specified'].includes(note.visibility)) {
|
||||
note = await Notes.pack(note.id, this.user!, {
|
||||
detail: true,
|
||||
});
|
||||
|
||||
if (note.isHidden) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// リプライなら再pack
|
||||
if (note.replyId != null) {
|
||||
note.reply = await Notes.pack(note.replyId, this.user!, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
// Renoteなら再pack
|
||||
if (note.renoteId != null) {
|
||||
note.renote = await Notes.pack(note.renoteId, this.user!, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Ignore notes from instances the user has muted
|
||||
if (isInstanceMuted(note, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
|
||||
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
import Channel from '../channel.js';
|
||||
import { fetchMeta } from '@/misc/fetch-meta.js';
|
||||
import { Notes } from '@/models/index.js';
|
||||
import { checkWordMute } from '@/misc/check-word-mute.js';
|
||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||
import { Packed } from '@/misc/schema.js';
|
||||
|
@ -12,7 +11,7 @@ export default class extends Channel {
|
|||
|
||||
constructor(id: string, connection: Channel['connection']) {
|
||||
super(id, connection);
|
||||
this.onNote = this.onNote.bind(this);
|
||||
this.onNote = this.withPackedNote(this.onNote.bind(this));
|
||||
}
|
||||
|
||||
public async init(params: any) {
|
||||
|
@ -30,19 +29,6 @@ export default class extends Channel {
|
|||
if (note.visibility !== 'public') return;
|
||||
if (note.channelId != null && !this.followingChannels.has(note.channelId)) return;
|
||||
|
||||
// リプライなら再pack
|
||||
if (note.replyId != null) {
|
||||
note.reply = await Notes.pack(note.replyId, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
// Renoteなら再pack
|
||||
if (note.renoteId != null) {
|
||||
note.renote = await Notes.pack(note.renoteId, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
|
||||
// 関係ない返信は除外
|
||||
if (note.reply && !this.user!.showTimelineReplies) {
|
||||
const reply = note.reply;
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import Channel from '../channel.js';
|
||||
import { Notes } from '@/models/index.js';
|
||||
import { isInstanceMuted, isUserFromMutedInstance } from '@/misc/is-instance-muted.js';
|
||||
|
||||
export default class extends Channel {
|
||||
|
@ -16,26 +15,12 @@ export default class extends Channel {
|
|||
if (isUserFromMutedInstance(data.body, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
|
||||
if (data.body.userId && this.muting.has(data.body.userId)) return;
|
||||
|
||||
if (data.body.note && data.body.note.isHidden) {
|
||||
const note = await Notes.pack(data.body.note.id, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
this.connection.cacheNote(note);
|
||||
data.body.note = note;
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'mention': {
|
||||
if (isInstanceMuted(data.body, new Set<string>(this.userProfile?.mutedInstances ?? []))) return;
|
||||
|
||||
if (this.muting.has(data.body.userId)) return;
|
||||
if (data.body.isHidden) {
|
||||
const note = await Notes.pack(data.body.id, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
this.connection.cacheNote(note);
|
||||
data.body = note;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import Channel from '../channel.js';
|
||||
import { Notes, UserListJoinings, UserLists } from '@/models/index.js';
|
||||
import { UserListJoinings, UserLists } from '@/models/index.js';
|
||||
import { User } from '@/models/entities/user.js';
|
||||
import { isUserRelated } from '@/misc/is-user-related.js';
|
||||
import { Packed } from '@/misc/schema.js';
|
||||
|
@ -15,7 +15,7 @@ export default class extends Channel {
|
|||
constructor(id: string, connection: Channel['connection']) {
|
||||
super(id, connection);
|
||||
this.updateListUsers = this.updateListUsers.bind(this);
|
||||
this.onNote = this.onNote.bind(this);
|
||||
this.onNote = this.withPackedNote(this.onNote.bind(this));
|
||||
}
|
||||
|
||||
public async init(params: any) {
|
||||
|
@ -51,29 +51,6 @@ export default class extends Channel {
|
|||
private async onNote(note: Packed<'Note'>) {
|
||||
if (!this.listUsers.includes(note.userId)) return;
|
||||
|
||||
if (['followers', 'specified'].includes(note.visibility)) {
|
||||
note = await Notes.pack(note.id, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
|
||||
if (note.isHidden) {
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
// リプライなら再pack
|
||||
if (note.replyId != null) {
|
||||
note.reply = await Notes.pack(note.replyId, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
// Renoteなら再pack
|
||||
if (note.renoteId != null) {
|
||||
note.renote = await Notes.pack(note.renoteId, this.user, {
|
||||
detail: true,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// 流れてきたNoteがミュートしているユーザーが関わるものだったら無視する
|
||||
if (isUserRelated(note, this.muting)) return;
|
||||
// 流れてきたNoteがブロックされているユーザーが関わるものだったら無視する
|
||||
|
|
|
@ -243,7 +243,7 @@ export type StreamMessages = {
|
|||
};
|
||||
notes: {
|
||||
name: 'notesStream';
|
||||
payload: Packed<'Note'>;
|
||||
payload: Note;
|
||||
};
|
||||
};
|
||||
|
||||
|
|
|
@ -345,19 +345,15 @@ export default async (user: { id: User['id']; username: User['username']; host:
|
|||
}
|
||||
}
|
||||
|
||||
// Pack the note
|
||||
const noteObj = await Notes.pack(note);
|
||||
publishNotesStream(note);
|
||||
|
||||
publishNotesStream(noteObj);
|
||||
const webhooks = await getActiveWebhooks().then(webhooks => webhooks.filter(x => x.userId === user.id && x.on.includes('note')));
|
||||
|
||||
getActiveWebhooks().then(webhooks => {
|
||||
webhooks = webhooks.filter(x => x.userId === user.id && x.on.includes('note'));
|
||||
for (const webhook of webhooks) {
|
||||
webhookDeliver(webhook, 'note', {
|
||||
note: noteObj,
|
||||
});
|
||||
}
|
||||
});
|
||||
for (const webhook of webhooks) {
|
||||
webhookDeliver(webhook, 'note', {
|
||||
note: await Notes.pack(note, user),
|
||||
});
|
||||
}
|
||||
|
||||
const nm = new NotificationManager(user, note);
|
||||
const nmRelatedPromises = [];
|
||||
|
@ -378,12 +374,14 @@ export default async (user: { id: User['id']; username: User['username']; host:
|
|||
|
||||
if (!threadMuted) {
|
||||
nm.push(data.reply.userId, 'reply');
|
||||
publishMainStream(data.reply.userId, 'reply', noteObj);
|
||||
|
||||
const packedReply = await Notes.pack(note, { id: data.reply.userId });
|
||||
publishMainStream(data.reply.userId, 'reply', packedReply);
|
||||
|
||||
const webhooks = (await getActiveWebhooks()).filter(x => x.userId === data.reply!.userId && x.on.includes('reply'));
|
||||
for (const webhook of webhooks) {
|
||||
webhookDeliver(webhook, 'reply', {
|
||||
note: noteObj,
|
||||
note: packedReply,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -404,12 +402,13 @@ export default async (user: { id: User['id']; username: User['username']; host:
|
|||
|
||||
// Publish event
|
||||
if ((user.id !== data.renote.userId) && data.renote.userHost === null) {
|
||||
publishMainStream(data.renote.userId, 'renote', noteObj);
|
||||
const packedRenote = await Notes.pack(note, { id: data.renote.userId });
|
||||
publishMainStream(data.renote.userId, 'renote', packedRenote);
|
||||
|
||||
const webhooks = (await getActiveWebhooks()).filter(x => x.userId === data.renote!.userId && x.on.includes('renote'));
|
||||
for (const webhook of webhooks) {
|
||||
webhookDeliver(webhook, 'renote', {
|
||||
note: noteObj,
|
||||
note: packedRenote,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ import {
|
|||
UserListStreamTypes,
|
||||
UserStreamTypes,
|
||||
} from '@/server/api/stream/types.js';
|
||||
import { Packed } from '@/misc/schema.js';
|
||||
|
||||
class Publisher {
|
||||
private publish = (channel: StreamChannels, type: string | null, value?: any): void => {
|
||||
|
@ -87,7 +86,7 @@ class Publisher {
|
|||
this.publish(`messagingIndexStream:${userId}`, type, typeof value === 'undefined' ? null : value);
|
||||
};
|
||||
|
||||
public publishNotesStream = (note: Packed<'Note'>): void => {
|
||||
public publishNotesStream = (note: Note): void => {
|
||||
this.publish('notesStream', null, note);
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in a new issue