forked from FoundKeyGang/FoundKey
server: refactor Cache to hold fetcher as attribute
Instead of having to pass the fetcher every time you want to fetch something, the fetcher is stored in an attribute of the Cache.
This commit is contained in:
parent
131c12a30b
commit
d1ec058d5c
11 changed files with 137 additions and 132 deletions
|
@ -1,10 +1,12 @@
|
|||
export class Cache<T> {
|
||||
public cache: Map<string | null, { date: number; value: T; }>;
|
||||
private lifetime: number;
|
||||
public fetcher: (key: string | null) => Promise<T | undefined>;
|
||||
|
||||
constructor(lifetime: Cache<never>['lifetime']) {
|
||||
constructor(lifetime: number, fetcher: Cache<T>['fetcher']) {
|
||||
this.cache = new Map();
|
||||
this.lifetime = lifetime;
|
||||
this.fetcher = fetcher;
|
||||
}
|
||||
|
||||
public set(key: string | null, value: T): void {
|
||||
|
@ -17,10 +19,13 @@ export class Cache<T> {
|
|||
public get(key: string | null): T | undefined {
|
||||
const cached = this.cache.get(key);
|
||||
if (cached == null) return undefined;
|
||||
|
||||
// discard if past the cache lifetime
|
||||
if ((Date.now() - cached.date) > this.lifetime) {
|
||||
this.cache.delete(key);
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return cached.value;
|
||||
}
|
||||
|
||||
|
@ -29,52 +34,22 @@ export class Cache<T> {
|
|||
}
|
||||
|
||||
/**
|
||||
* キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します
|
||||
* optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします
|
||||
* If the value is cached, it is returned. Otherwise the fetcher is
|
||||
* run to get the value. If the fetcher returns undefined, it is
|
||||
* returned but not cached.
|
||||
*/
|
||||
public async fetch(key: string | null, fetcher: () => Promise<T>, validator?: (cachedValue: T) => boolean): Promise<T> {
|
||||
const cachedValue = this.get(key);
|
||||
if (cachedValue !== undefined) {
|
||||
if (validator) {
|
||||
if (validator(cachedValue)) {
|
||||
// Cache HIT
|
||||
return cachedValue;
|
||||
}
|
||||
} else {
|
||||
// Cache HIT
|
||||
return cachedValue;
|
||||
}
|
||||
}
|
||||
public async fetch(key: string | null): Promise<T | undefined> {
|
||||
const cached = this.get(key);
|
||||
if (cached !== undefined) {
|
||||
return cached;
|
||||
} else {
|
||||
const value = await this.fetcher();
|
||||
|
||||
// Cache MISS
|
||||
const value = await fetcher();
|
||||
this.set(key, value);
|
||||
return value;
|
||||
}
|
||||
// don't cache undefined
|
||||
if (value !== undefined)
|
||||
this.set(key, value);
|
||||
|
||||
/**
|
||||
* キャッシュがあればそれを返し、無ければfetcherを呼び出して結果をキャッシュ&返します
|
||||
* optional: キャッシュが存在してもvalidatorでfalseを返すとキャッシュ無効扱いにします
|
||||
*/
|
||||
public async fetchMaybe(key: string | null, fetcher: () => Promise<T | undefined>, validator?: (cachedValue: T) => boolean): Promise<T | undefined> {
|
||||
const cachedValue = this.get(key);
|
||||
if (cachedValue !== undefined) {
|
||||
if (validator) {
|
||||
if (validator(cachedValue)) {
|
||||
// Cache HIT
|
||||
return cachedValue;
|
||||
}
|
||||
} else {
|
||||
// Cache HIT
|
||||
return cachedValue;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
// Cache MISS
|
||||
const value = await fetcher();
|
||||
if (value !== undefined) {
|
||||
this.set(key, value);
|
||||
}
|
||||
return value;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,22 +3,26 @@ import { Note } from '@/models/entities/note.js';
|
|||
import { User } from '@/models/entities/user.js';
|
||||
import { UserListJoinings, UserGroupJoinings, Blockings } from '@/models/index.js';
|
||||
import * as Acct from '@/misc/acct.js';
|
||||
import { MINUTE } from '@/const.js';
|
||||
import { getFullApAccount } from './convert-host.js';
|
||||
import { Packed } from './schema.js';
|
||||
import { Cache } from './cache.js';
|
||||
|
||||
const blockingCache = new Cache<User['id'][]>(1000 * 60 * 5);
|
||||
const blockingCache = new Cache<User['id'][]>(
|
||||
5 * MINUTE,
|
||||
(blockerId) => Blockings.findBy({ blockerId }).then(res => res.map(x => x.blockeeId)),
|
||||
);
|
||||
|
||||
// NOTE: フォローしているユーザーのノート、リストのユーザーのノート、グループのユーザーのノート指定はパフォーマンス上の理由で無効になっている
|
||||
// designation for users you follow, list users and groups is disabled for performance reasons
|
||||
|
||||
/**
|
||||
* noteUserFollowers / antennaUserFollowing はどちらか一方が指定されていればよい
|
||||
* either noteUserFollowers or antennaUserFollowing must be specified
|
||||
*/
|
||||
export async function checkHitAntenna(antenna: Antenna, note: (Note | Packed<'Note'>), noteUser: { id: User['id']; username: string; host: string | null; }, noteUserFollowers?: User['id'][], antennaUserFollowing?: User['id'][]): Promise<boolean> {
|
||||
if (note.visibility === 'specified') return false;
|
||||
|
||||
// アンテナ作成者がノート作成者にブロックされていたらスキップ
|
||||
const blockings = await blockingCache.fetch(noteUser.id, () => Blockings.findBy({ blockerId: noteUser.id }).then(res => res.map(x => x.blockeeId)));
|
||||
// skip if the antenna creator is blocked by the note author
|
||||
const blockings = await blockingCache.fetch(noteUser.id);
|
||||
if (blockings.some(blocking => blocking === antenna.userId)) return false;
|
||||
|
||||
if (note.visibility === 'followers') {
|
||||
|
|
|
@ -3,8 +3,11 @@ import { User } from '@/models/entities/user.js';
|
|||
import { UserKeypair } from '@/models/entities/user-keypair.js';
|
||||
import { Cache } from './cache.js';
|
||||
|
||||
const cache = new Cache<UserKeypair>(Infinity);
|
||||
const cache = new Cache<UserKeypair>(
|
||||
Infinity,
|
||||
(userId) => UserKeypairs.findOneByOrFail({ userId }),
|
||||
);
|
||||
|
||||
export async function getUserKeypair(userId: User['id']): Promise<UserKeypair> {
|
||||
return await cache.fetch(userId, () => UserKeypairs.findOneByOrFail({ userId }));
|
||||
return await cache.fetch(userId);
|
||||
}
|
||||
|
|
|
@ -4,11 +4,24 @@ import { Emojis } from '@/models/index.js';
|
|||
import { Emoji } from '@/models/entities/emoji.js';
|
||||
import { Note } from '@/models/entities/note.js';
|
||||
import { query } from '@/prelude/url.js';
|
||||
import { HOUR } from '@/const.js';
|
||||
import { Cache } from './cache.js';
|
||||
import { isSelfHost, toPunyNullable } from './convert-host.js';
|
||||
import { decodeReaction } from './reaction-lib.js';
|
||||
|
||||
const cache = new Cache<Emoji | null>(1000 * 60 * 60 * 12);
|
||||
/**
|
||||
* composite cache key: `${host ?? ''}:${name}`
|
||||
*/
|
||||
const cache = new Cache<Emoji | null>(
|
||||
12 * HOUR,
|
||||
async (key) => {
|
||||
const [host, name] = key.split(':');
|
||||
return (await Emojis.findOneBy({
|
||||
name,
|
||||
host: host || IsNull(),
|
||||
})) || null;
|
||||
},
|
||||
);
|
||||
|
||||
/**
|
||||
* Information needed to attach in ActivityPub
|
||||
|
@ -51,12 +64,7 @@ export async function populateEmoji(emojiName: string, noteUserHost: string | nu
|
|||
const { name, host } = parseEmojiStr(emojiName, noteUserHost);
|
||||
if (name == null) return null;
|
||||
|
||||
const queryOrNull = async () => (await Emojis.findOneBy({
|
||||
name,
|
||||
host: host ?? IsNull(),
|
||||
})) || null;
|
||||
|
||||
const emoji = await cache.fetch(`${name} ${host}`, queryOrNull);
|
||||
const emoji = await cache.fetch(`${host ?? ''}:${name}`);
|
||||
|
||||
if (emoji == null) return null;
|
||||
|
||||
|
@ -105,7 +113,10 @@ export function aggregateNoteEmojis(notes: Note[]) {
|
|||
* Query list of emojis in bulk and add them to the cache.
|
||||
*/
|
||||
export async function prefetchEmojis(emojis: { name: string; host: string | null; }[]): Promise<void> {
|
||||
const notCachedEmojis = emojis.filter(emoji => cache.get(`${emoji.name} ${emoji.host}`) == null);
|
||||
const notCachedEmojis = emojis.filter(emoji => {
|
||||
// check if the cache has this emoji
|
||||
return cache.get(`${emoji.host ?? ''}:${emoji.name}`) == null;
|
||||
});
|
||||
|
||||
// check if there even are any uncached emoji to handle
|
||||
if (notCachedEmojis.length === 0) return;
|
||||
|
@ -127,7 +138,7 @@ export async function prefetchEmojis(emojis: { name: string; host: string | null
|
|||
}).then(emojis => {
|
||||
// store all emojis into the cache
|
||||
emojis.forEach(emoji => {
|
||||
cache.set(`${emoji.name} ${emoji.host}`, emoji);
|
||||
cache.set(`${emoji.host ?? ''}:${emoji.name}`, emoji);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
|
|
@ -6,13 +6,16 @@ import { Packed } from '@/misc/schema.js';
|
|||
import { awaitAll, Promiseable } from '@/prelude/await-all.js';
|
||||
import { populateEmojis } from '@/misc/populate-emojis.js';
|
||||
import { getAntennas } from '@/misc/antenna-cache.js';
|
||||
import { USER_ACTIVE_THRESHOLD, USER_ONLINE_THRESHOLD } from '@/const.js';
|
||||
import { USER_ACTIVE_THRESHOLD, USER_ONLINE_THRESHOLD, HOUR } from '@/const.js';
|
||||
import { Cache } from '@/misc/cache.js';
|
||||
import { db } from '@/db/postgre.js';
|
||||
import { Instance } from '../entities/instance.js';
|
||||
import { Notes, NoteUnreads, FollowRequests, Notifications, MessagingMessages, UserNotePinings, Followings, Blockings, Mutings, RenoteMutings, UserProfiles, UserSecurityKeys, UserGroupJoinings, Pages, Announcements, AnnouncementReads, AntennaNotes, ChannelFollowings, Instances, DriveFiles } from '../index.js';
|
||||
|
||||
const userInstanceCache = new Cache<Instance | null>(1000 * 60 * 60 * 3);
|
||||
const userInstanceCache = new Cache<Instance | null>(
|
||||
3 * HOUR,
|
||||
(host) => Instances.findOneBy({ host }).then(x => x ?? undefined),
|
||||
);
|
||||
|
||||
type IsUserDetailed<Detailed extends boolean> = Detailed extends true ? Packed<'UserDetailed'> : Packed<'UserLite'>;
|
||||
type IsMeAndIsUserDetailed<ExpectsMe extends boolean | null, Detailed extends boolean> =
|
||||
|
@ -309,17 +312,15 @@ export const UserRepository = db.getRepository(User).extend({
|
|||
isModerator: user.isModerator || falsy,
|
||||
isBot: user.isBot || falsy,
|
||||
isCat: user.isCat || falsy,
|
||||
instance: user.host ? userInstanceCache.fetch(user.host,
|
||||
() => Instances.findOneBy({ host: user.host! }),
|
||||
v => v != null,
|
||||
).then(instance => instance ? {
|
||||
name: instance.name,
|
||||
softwareName: instance.softwareName,
|
||||
softwareVersion: instance.softwareVersion,
|
||||
iconUrl: instance.iconUrl,
|
||||
faviconUrl: instance.faviconUrl,
|
||||
themeColor: instance.themeColor,
|
||||
} : undefined) : undefined,
|
||||
instance: !user.host ? undefined : userInstanceCache.fetch(user.host)
|
||||
.then(instance => !instance ? undefined : {
|
||||
name: instance.name,
|
||||
softwareName: instance.softwareName,
|
||||
softwareVersion: instance.softwareVersion,
|
||||
iconUrl: instance.iconUrl,
|
||||
faviconUrl: instance.faviconUrl,
|
||||
themeColor: instance.themeColor,
|
||||
}),
|
||||
emojis: populateEmojis(user.emojis, user.host),
|
||||
onlineStatus: this.getOnlineStatus(user),
|
||||
|
||||
|
|
|
@ -10,8 +10,14 @@ import { uriPersonCache, userByIdCache } from '@/services/user-cache.js';
|
|||
import { IObject, getApId } from './type.js';
|
||||
import { resolvePerson } from './models/person.js';
|
||||
|
||||
const publicKeyCache = new Cache<UserPublickey | null>(Infinity);
|
||||
const publicKeyByUserIdCache = new Cache<UserPublickey | null>(Infinity);
|
||||
const publicKeyCache = new Cache<UserPublickey>(
|
||||
Infinity,
|
||||
(keyId) => UserPublickeys.findOneBy({ keyId }).then(x => x ?? undefined),
|
||||
);
|
||||
const publicKeyByUserIdCache = new Cache<UserPublickey>(
|
||||
Infinity,
|
||||
(userId) => UserPublickeys.findOneBy({ userId }).then(x => x ?? undefined),
|
||||
);
|
||||
|
||||
export type UriParseResult = {
|
||||
/** wether the URI was generated by us */
|
||||
|
@ -99,13 +105,9 @@ export default class DbResolver {
|
|||
if (parsed.local) {
|
||||
if (parsed.type !== 'users') return null;
|
||||
|
||||
return await userByIdCache.fetchMaybe(parsed.id, () => Users.findOneBy({
|
||||
id: parsed.id,
|
||||
}).then(x => x ?? undefined)) ?? null;
|
||||
return await userByIdCache.fetch(parsed.id) ?? null;
|
||||
} else {
|
||||
return await uriPersonCache.fetch(parsed.uri, () => Users.findOneBy({
|
||||
uri: parsed.uri,
|
||||
}));
|
||||
return await uriPersonCache.fetch(parsed.uri) ?? null;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -116,20 +118,12 @@ export default class DbResolver {
|
|||
user: CacheableRemoteUser;
|
||||
key: UserPublickey;
|
||||
} | null> {
|
||||
const key = await publicKeyCache.fetch(keyId, async () => {
|
||||
const key = await UserPublickeys.findOneBy({
|
||||
keyId,
|
||||
});
|
||||
|
||||
if (key == null) return null;
|
||||
|
||||
return key;
|
||||
}, key => key != null);
|
||||
const key = await publicKeyCache.fetch(keyId);
|
||||
|
||||
if (key == null) return null;
|
||||
|
||||
return {
|
||||
user: await userByIdCache.fetch(key.userId, () => Users.findOneByOrFail({ id: key.userId })) as CacheableRemoteUser,
|
||||
user: await userByIdCache.fetch(key.userId) as CacheableRemoteUser,
|
||||
key,
|
||||
};
|
||||
}
|
||||
|
@ -145,7 +139,7 @@ export default class DbResolver {
|
|||
|
||||
if (user == null) return null;
|
||||
|
||||
const key = await publicKeyByUserIdCache.fetch(user.id, () => UserPublickeys.findOneBy({ userId: user.id }), v => v != null);
|
||||
const key = await publicKeyByUserIdCache.fetch(user.id);
|
||||
|
||||
return {
|
||||
user,
|
||||
|
|
|
@ -6,7 +6,10 @@ import { App } from '@/models/entities/app.js';
|
|||
import { userByIdCache, localUserByNativeTokenCache } from '@/services/user-cache.js';
|
||||
import isNativeToken from './common/is-native-token.js';
|
||||
|
||||
const appCache = new Cache<App>(Infinity);
|
||||
const appCache = new Cache<App>(
|
||||
Infinity,
|
||||
(id) => Apps.findOneByOrFail({ id }),
|
||||
);
|
||||
|
||||
export class AuthenticationError extends Error {
|
||||
constructor(message: string) {
|
||||
|
@ -39,8 +42,7 @@ export default async (authorization: string | null | undefined, bodyToken: strin
|
|||
const token: string = maybeToken;
|
||||
|
||||
if (isNativeToken(token)) {
|
||||
const user = await localUserByNativeTokenCache.fetch(token,
|
||||
() => Users.findOneBy({ token }) as Promise<ILocalUser | null>);
|
||||
const user = await localUserByNativeTokenCache.fetch(token);
|
||||
|
||||
if (user == null) {
|
||||
throw new AuthenticationError('unknown token');
|
||||
|
@ -64,17 +66,13 @@ export default async (authorization: string | null | undefined, bodyToken: strin
|
|||
lastUsedAt: new Date(),
|
||||
});
|
||||
|
||||
const user = await userByIdCache.fetch(accessToken.userId,
|
||||
() => Users.findOneBy({
|
||||
id: accessToken.userId,
|
||||
}) as Promise<ILocalUser>);
|
||||
const user = await userByIdCache.fetch(accessToken.userId);
|
||||
|
||||
// can't authorize remote users
|
||||
if (!Users.isLocalUser(user)) return [null, null];
|
||||
|
||||
if (accessToken.appId) {
|
||||
const app = await appCache.fetch(accessToken.appId,
|
||||
() => Apps.findOneByOrFail({ id: accessToken.appId! }));
|
||||
const app = await appCache.fetch(accessToken.appId);
|
||||
|
||||
return [user, {
|
||||
id: accessToken.id,
|
||||
|
|
|
@ -36,13 +36,22 @@ import { Cache } from '@/misc/cache.js';
|
|||
import { UserProfile } from '@/models/entities/user-profile.js';
|
||||
import { getActiveWebhooks } from '@/misc/webhook-cache.js';
|
||||
import { IActivity } from '@/remote/activitypub/type.js';
|
||||
import { MINUTE } from '@/const.js';
|
||||
import { updateHashtags } from '../update-hashtag.js';
|
||||
import { registerOrFetchInstanceDoc } from '../register-or-fetch-instance-doc.js';
|
||||
import { createNotification } from '../create-notification.js';
|
||||
import { addNoteToAntenna } from '../add-note-to-antenna.js';
|
||||
import { deliverToRelays } from '../relay.js';
|
||||
|
||||
const mutedWordsCache = new Cache<{ userId: UserProfile['userId']; mutedWords: UserProfile['mutedWords']; }[]>(1000 * 60 * 5);
|
||||
const mutedWordsCache = new Cache<{ userId: UserProfile['userId']; mutedWords: UserProfile['mutedWords']; }[]>(
|
||||
5 * MINUTE,
|
||||
() => UserProfiles.find({
|
||||
where: {
|
||||
enableWordMute: true,
|
||||
},
|
||||
select: ['userId', 'mutedWords'],
|
||||
}),
|
||||
);
|
||||
|
||||
type NotificationType = 'reply' | 'renote' | 'quote' | 'mention';
|
||||
|
||||
|
@ -257,12 +266,7 @@ export default async (user: { id: User['id']; username: User['username']; host:
|
|||
incNotesCountOfUser(user);
|
||||
|
||||
// Word mute
|
||||
mutedWordsCache.fetch(null, () => UserProfiles.find({
|
||||
where: {
|
||||
enableWordMute: true,
|
||||
},
|
||||
select: ['userId', 'mutedWords'],
|
||||
})).then(us => {
|
||||
mutedWordsCache.fetch(null).then(us => {
|
||||
for (const u of us) {
|
||||
checkWordMute(note, { id: u.userId }, u.mutedWords).then(shouldMute => {
|
||||
if (shouldMute) {
|
||||
|
|
|
@ -3,29 +3,27 @@ import { Instances } from '@/models/index.js';
|
|||
import { genId } from '@/misc/gen-id.js';
|
||||
import { toPuny } from '@/misc/convert-host.js';
|
||||
import { Cache } from '@/misc/cache.js';
|
||||
import { HOUR } from '@/const.js';
|
||||
|
||||
const cache = new Cache<Instance>(1000 * 60 * 60);
|
||||
const cache = new Cache<Instance>(
|
||||
HOUR,
|
||||
(host) => Instances.findOneBy({ host }).then(x => x ?? undefined),
|
||||
);
|
||||
|
||||
export async function registerOrFetchInstanceDoc(idnHost: string): Promise<Instance> {
|
||||
const host = toPuny(idnHost);
|
||||
|
||||
const cached = cache.get(host);
|
||||
const cached = cache.fetch(host);
|
||||
if (cached) return cached;
|
||||
|
||||
const index = await Instances.findOneBy({ host });
|
||||
// apparently a new instance
|
||||
const i = await Instances.insert({
|
||||
id: genId(),
|
||||
host,
|
||||
caughtAt: new Date(),
|
||||
lastCommunicatedAt: new Date(),
|
||||
}).then(x => Instances.findOneByOrFail(x.identifiers[0]));
|
||||
|
||||
if (index == null) {
|
||||
const i = await Instances.insert({
|
||||
id: genId(),
|
||||
host,
|
||||
caughtAt: new Date(),
|
||||
lastCommunicatedAt: new Date(),
|
||||
}).then(x => Instances.findOneByOrFail(x.identifiers[0]));
|
||||
|
||||
cache.set(host, i);
|
||||
return i;
|
||||
} else {
|
||||
cache.set(host, index);
|
||||
return index;
|
||||
}
|
||||
cache.set(host, i);
|
||||
return i;
|
||||
}
|
||||
|
|
|
@ -8,11 +8,21 @@ import { Users, Relays } from '@/models/index.js';
|
|||
import { genId } from '@/misc/gen-id.js';
|
||||
import { Cache } from '@/misc/cache.js';
|
||||
import { Relay } from '@/models/entities/relay.js';
|
||||
import { MINUTE } from '@/const.js';
|
||||
import { createSystemUser } from './create-system-user.js';
|
||||
|
||||
const ACTOR_USERNAME = 'relay.actor' as const;
|
||||
|
||||
const relaysCache = new Cache<Relay[]>(1000 * 60 * 10);
|
||||
/**
|
||||
* There is only one cache key: null.
|
||||
* A cache is only used here to have expiring storage.
|
||||
*/
|
||||
const relaysCache = new Cache<Relay[]>(
|
||||
10 * MINUTE,
|
||||
() => Relays.findBy({
|
||||
status: 'accepted',
|
||||
}),
|
||||
);
|
||||
|
||||
export async function getRelayActor(): Promise<ILocalUser> {
|
||||
const user = await Users.findOneBy({
|
||||
|
@ -83,9 +93,7 @@ export async function relayRejected(id: string) {
|
|||
export async function deliverToRelays(user: { id: User['id']; host: null; }, activity: any) {
|
||||
if (activity == null) return;
|
||||
|
||||
const relays = await relaysCache.fetch(null, () => Relays.findBy({
|
||||
status: 'accepted',
|
||||
}));
|
||||
const relays = await relaysCache.fetch(null);
|
||||
if (relays.length === 0) return;
|
||||
|
||||
// TODO
|
||||
|
|
|
@ -3,9 +3,18 @@ import { Users } from '@/models/index.js';
|
|||
import { Cache } from '@/misc/cache.js';
|
||||
import { subscriber } from '@/db/redis.js';
|
||||
|
||||
export const userByIdCache = new Cache<CacheableUser>(Infinity);
|
||||
export const localUserByNativeTokenCache = new Cache<CacheableLocalUser | null>(Infinity);
|
||||
export const uriPersonCache = new Cache<CacheableUser | null>(Infinity);
|
||||
export const userByIdCache = new Cache<CacheableUser>(
|
||||
Infinity,
|
||||
(id) => Users.findOneBy({ id }).then(x => x ?? undefined),
|
||||
);
|
||||
export const localUserByNativeTokenCache = new Cache<CacheableLocalUser>(
|
||||
Infinity,
|
||||
(token) => Users.findOneBy({ token }).then(x => x ?? undefined),
|
||||
);
|
||||
export const uriPersonCache = new Cache<CacheableUser>(
|
||||
Infinity,
|
||||
(uri) => Users.findOneBy({ uri }).then(x => x ?? undefined),
|
||||
);
|
||||
|
||||
subscriber.on('message', async (_, data) => {
|
||||
const obj = JSON.parse(data);
|
||||
|
|
Loading…
Reference in a new issue