From dc3c80e3cef48bbbb1de2c5d8c6fb462b25de6e0 Mon Sep 17 00:00:00 2001 From: syuilo Date: Mon, 11 Jun 2018 06:48:25 +0900 Subject: [PATCH] wip --- src/daemons/hashtags-stats-child.ts | 60 +++++++++++++++++++++++++ src/daemons/hashtags-stats.ts | 20 +++++++++ src/{ => daemons}/notes-stats-child.ts | 10 +++-- src/{ => daemons}/notes-stats.ts | 0 src/{ => daemons}/server-stats.ts | 10 ++++- src/index.ts | 6 ++- src/server/api/stream/hashtags-stats.ts | 35 +++++++++++++++ src/server/api/streaming.ts | 6 +++ 8 files changed, 140 insertions(+), 7 deletions(-) create mode 100644 src/daemons/hashtags-stats-child.ts create mode 100644 src/daemons/hashtags-stats.ts rename src/{ => daemons}/notes-stats-child.ts (75%) rename src/{ => daemons}/notes-stats.ts (100%) rename src/{ => daemons}/server-stats.ts (88%) create mode 100644 src/server/api/stream/hashtags-stats.ts diff --git a/src/daemons/hashtags-stats-child.ts b/src/daemons/hashtags-stats-child.ts new file mode 100644 index 000000000..3f7f4d6e9 --- /dev/null +++ b/src/daemons/hashtags-stats-child.ts @@ -0,0 +1,60 @@ +import Note from '../models/note'; + +// 10分 +const interval = 1000 * 60 * 10; + +async function tick() { + const res = await Note.aggregate([{ + $match: { + createdAt: { + $gt: new Date(Date.now() - interval) + }, + tags: { + $exists: true, + $ne: [] + } + } + }, { + $unwind: '$tags' + }, { + $group: { + _id: '$tags', + count: { + $sum: 1 + } + } + }, { + $group: { + _id: null, + tags: { + $push: { + tag: '$_id', + count: '$count' + } + } + } + }, { + $project: { + _id: false, + tags: true + } + }]) as { + tags: Array<{ + tag: string; + count: number; + }> + }; + + const stats = res.tags + .sort((a, b) => a.count - b.count) + .map(tag => [tag.tag, tag.count]) + .slice(0, 10); + + console.log(stats); + + process.send(stats); +} + +tick(); + +setInterval(tick, interval); diff --git a/src/daemons/hashtags-stats.ts b/src/daemons/hashtags-stats.ts new file mode 100644 index 000000000..5ed028ac3 --- /dev/null +++ b/src/daemons/hashtags-stats.ts @@ -0,0 +1,20 @@ +import * as childProcess from 'child_process'; +import Xev from 'xev'; + +const ev = new Xev(); + +export default function() { + const log = []; + + const p = childProcess.fork(__dirname + '/hashtags-stats-child.js'); + + p.on('message', stats => { + ev.emit('hashtagsStats', stats); + log.push(stats); + if (log.length > 30) log.shift(); + }); + + ev.on('requestHashTagsStatsLog', id => { + ev.emit('hashtagsStatsLog:' + id, log); + }); +} diff --git a/src/notes-stats-child.ts b/src/daemons/notes-stats-child.ts similarity index 75% rename from src/notes-stats-child.ts rename to src/daemons/notes-stats-child.ts index 5f85a2a3c..7f54a36bf 100644 --- a/src/notes-stats-child.ts +++ b/src/daemons/notes-stats-child.ts @@ -1,8 +1,8 @@ -import Note from './models/note'; +import Note from '../models/note'; const interval = 5000; -setInterval(async () => { +async function tick() { const [all, local] = await Promise.all([Note.count({ createdAt: { $gte: new Date(Date.now() - interval) @@ -19,4 +19,8 @@ setInterval(async () => { }; process.send(stats); -}, interval); +} + +tick(); + +setInterval(tick, interval); diff --git a/src/notes-stats.ts b/src/daemons/notes-stats.ts similarity index 100% rename from src/notes-stats.ts rename to src/daemons/notes-stats.ts diff --git a/src/server-stats.ts b/src/daemons/server-stats.ts similarity index 88% rename from src/server-stats.ts rename to src/daemons/server-stats.ts index 7b0d4a857..140340250 100644 --- a/src/server-stats.ts +++ b/src/daemons/server-stats.ts @@ -5,6 +5,8 @@ import Xev from 'xev'; const ev = new Xev(); +const interval = 1000; + /** * Report server stats regularly */ @@ -15,7 +17,7 @@ export default function() { ev.emit('serverStatsLog:' + id, log); }); - setInterval(() => { + async function tick() { osUtils.cpuUsage(cpuUsage => { const disk = diskusage.checkSync(os.platform() == 'win32' ? 'c:' : '/'); const stats = { @@ -32,5 +34,9 @@ export default function() { log.push(stats); if (log.length > 50) log.shift(); }); - }, 1000); + } + + tick(); + + setInterval(tick, interval); } diff --git a/src/index.ts b/src/index.ts index 4a98b7564..27c5dd027 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,8 +17,9 @@ import ProgressBar from './utils/cli/progressbar'; import EnvironmentInfo from './utils/environmentInfo'; import MachineInfo from './utils/machineInfo'; import DependencyInfo from './utils/dependencyInfo'; -import serverStats from './server-stats'; -import notesStats from './notes-stats'; +import serverStats from './daemons/server-stats'; +import notesStats from './daemons/notes-stats'; +import hashtagsStats from './daemons/hashtags-stats'; import loadConfig from './config/load'; import { Config } from './config/types'; @@ -52,6 +53,7 @@ function main() { ev.mount(); serverStats(); notesStats(); + hashtagsStats(); } else { workerMain(opt); } diff --git a/src/server/api/stream/hashtags-stats.ts b/src/server/api/stream/hashtags-stats.ts new file mode 100644 index 000000000..47183467f --- /dev/null +++ b/src/server/api/stream/hashtags-stats.ts @@ -0,0 +1,35 @@ +import * as websocket from 'websocket'; +import Xev from 'xev'; + +const ev = new Xev(); + +export default function(request: websocket.request, connection: websocket.connection): void { + const onStats = stats => { + connection.send(JSON.stringify({ + type: 'stats', + body: stats + })); + }; + + connection.on('message', async data => { + const msg = JSON.parse(data.utf8Data); + + switch (msg.type) { + case 'requestLog': + ev.once('hashtagsStatsLog:' + msg.id, statsLog => { + connection.send(JSON.stringify({ + type: 'statsLog', + body: statsLog + })); + }); + ev.emit('requestHashtagsStatsLog', msg.id); + break; + } + }); + + ev.addListener('hashtagsStats', onStats); + + connection.on('close', () => { + ev.removeListener('hashtagsStats', onStats); + }); +} diff --git a/src/server/api/streaming.ts b/src/server/api/streaming.ts index 2d4cfc108..e4156096e 100644 --- a/src/server/api/streaming.ts +++ b/src/server/api/streaming.ts @@ -14,6 +14,7 @@ import othelloGameStream from './stream/othello-game'; import othelloStream from './stream/othello'; import serverStatsStream from './stream/server-stats'; import notesStatsStream from './stream/notes-stats'; +import hashtagsStatsStream from './stream/hashtags-stats'; import requestsStream from './stream/requests'; import { ParsedUrlQuery } from 'querystring'; import authenticate from './authenticate'; @@ -39,6 +40,11 @@ module.exports = (server: http.Server) => { return; } + if (request.resourceURL.pathname === '/hashtags-stats') { + hashtagsStatsStream(request, connection); + return; + } + if (request.resourceURL.pathname === '/requests') { requestsStream(request, connection); return;