Improve queue configuration

Resolve #4157
Resolve #4158
This commit is contained in:
syuilo 2019-02-06 13:51:02 +09:00
parent 08c0be11b2
commit e26bec6ab4
No known key found for this signature in database
GPG key ID: BDC4C49D06AB9D69
3 changed files with 71 additions and 30 deletions

View file

@ -5,11 +5,15 @@ program
.version(pkg.version) .version(pkg.version)
.option('--no-daemons', 'Disable daemon processes (for debbuging)') .option('--no-daemons', 'Disable daemon processes (for debbuging)')
.option('--disable-clustering', 'Disable clustering') .option('--disable-clustering', 'Disable clustering')
.option('--disable-queue', 'Disable job queue') .option('--disable-queue', 'Disable job queue processing')
.option('--only-queue', 'Pocessing job queue only')
.option('--quiet', 'Suppress all logs') .option('--quiet', 'Suppress all logs')
.option('--verbose', 'Enable all logs') .option('--verbose', 'Enable all logs')
.option('--slow', 'Delay all requests (for debbuging)') .option('--slow', 'Delay all requests (for debbuging)')
.option('--color', 'This option is a dummy for some external program\'s (e.g. forever) issue.') .option('--color', 'This option is a dummy for some external program\'s (e.g. forever) issue.')
.parse(process.argv); .parse(process.argv);
if (process.env.MK_DISABLE_QUEUE) program.disableQueue = true;
if (process.env.MK_ONLY_QUEUE) program.onlyQueue = true;
export { program }; export { program };

View file

@ -35,6 +35,11 @@ const ev = new Xev();
function main() { function main() {
process.title = `Misskey (${cluster.isMaster ? 'master' : 'worker'})`; process.title = `Misskey (${cluster.isMaster ? 'master' : 'worker'})`;
if (program.onlyQueue) {
queueMain();
return;
}
if (cluster.isMaster || program.disableClustering) { if (cluster.isMaster || program.disableClustering) {
masterMain(); masterMain();
@ -53,12 +58,7 @@ function main() {
} }
} }
/** function greet() {
* Init master process
*/
async function masterMain() {
let config: Config;
if (!program.quiet) { if (!program.quiet) {
//#region Misskey logo //#region Misskey logo
const v = `v${pkg.version}`; const v = `v${pkg.version}`;
@ -75,10 +75,34 @@ async function masterMain() {
bootLogger.info('Welcome to Misskey!'); bootLogger.info('Welcome to Misskey!');
bootLogger.info(`Misskey v${pkg.version}`, true); bootLogger.info(`Misskey v${pkg.version}`, true);
bootLogger.info('Misskey is maintained by @syuilo, @AyaMorisawa, @mei23, and @acid-chicken.'); bootLogger.info('Misskey is maintained by @syuilo, @AyaMorisawa, @mei23, and @acid-chicken.');
}
/**
* Init master process
*/
async function masterMain() {
greet();
let config: Config;
try { try {
// initialize app // initialize app
config = await init(); config = await init();
if (config.port == null) {
bootLogger.error('The port is not configured. Please configure port.', true);
process.exit(1);
}
if (process.platform === 'linux' && isWellKnownPort(config.port) && !isRoot()) {
bootLogger.error('You need root privileges to listen on well-known port on Linux', true);
process.exit(1);
}
if (!await isPortAvailable(config.port)) {
bootLogger.error(`Port ${config.port} is already in use`, true);
process.exit(1);
}
} catch (e) { } catch (e) {
bootLogger.error('Fatal error occurred during initialization', true); bootLogger.error('Fatal error occurred during initialization', true);
process.exit(1); process.exit(1);
@ -90,6 +114,9 @@ async function masterMain() {
await spawnWorkers(config.clusterLimit); await spawnWorkers(config.clusterLimit);
} }
// start queue
require('./queue').default();
bootLogger.succ(`Now listening on port ${config.port} on ${config.url}`, true); bootLogger.succ(`Now listening on port ${config.port} on ${config.url}`, true);
} }
@ -100,15 +127,35 @@ async function workerMain() {
// start server // start server
await require('./server').default(); await require('./server').default();
// start processor
require('./queue').default();
if (cluster.isWorker) { if (cluster.isWorker) {
// Send a 'ready' message to parent process // Send a 'ready' message to parent process
process.send('ready'); process.send('ready');
} }
} }
async function queueMain() {
greet();
try {
// initialize app
await init();
} catch (e) {
bootLogger.error('Fatal error occurred during initialization', true);
process.exit(1);
}
bootLogger.succ('Misskey initialized');
// start processor
const queue = require('./queue').default();
if (queue) {
bootLogger.succ('Queue started', true);
} else {
bootLogger.error('Queue not available');
}
}
const runningNodejsVersion = process.version.slice(1).split('.').map(x => parseInt(x, 10)); const runningNodejsVersion = process.version.slice(1).split('.').map(x => parseInt(x, 10));
const requiredNodejsVersion = [10, 0, 0]; const requiredNodejsVersion = [10, 0, 0];
const satisfyNodejsVersion = !lessThan(runningNodejsVersion, requiredNodejsVersion); const satisfyNodejsVersion = !lessThan(runningNodejsVersion, requiredNodejsVersion);
@ -170,21 +217,6 @@ async function init(): Promise<Config> {
configLogger.succ('Loaded'); configLogger.succ('Loaded');
if (config.port == null) {
bootLogger.error('The port is not configured. Please configure port.', true);
process.exit(1);
}
if (process.platform === 'linux' && isWellKnownPort(config.port) && !isRoot()) {
bootLogger.error('You need root privileges to listen on well-known port on Linux', true);
process.exit(1);
}
if (!await isPortAvailable(config.port)) {
bootLogger.error(`Port ${config.port} is already in use`, true);
process.exit(1);
}
// Try to connect to MongoDB // Try to connect to MongoDB
try { try {
await checkMongoDB(config, bootLogger); await checkMongoDB(config, bootLogger);

View file

@ -4,13 +4,15 @@ import config from '../config';
import { ILocalUser } from '../models/user'; import { ILocalUser } from '../models/user';
import { program } from '../argv'; import { program } from '../argv';
import handler from './processors'; import handler from './processors';
import { queueLogger } from './logger';
const enableQueue = config.redis != null && !program.disableQueue; const enableQueue = !program.disableQueue;
const queueAvailable = config.redis != null;
const queue = initializeQueue(); const queue = initializeQueue();
function initializeQueue() { function initializeQueue() {
if (enableQueue) { if (queueAvailable) {
return new Queue('misskey', { return new Queue('misskey', {
redis: { redis: {
port: config.redis.port, port: config.redis.port,
@ -30,7 +32,7 @@ function initializeQueue() {
} }
export function createHttpJob(data: any) { export function createHttpJob(data: any) {
if (enableQueue) { if (queueAvailable) {
return queue.createJob(data) return queue.createJob(data)
.retries(4) .retries(4)
.backoff('exponential', 16384) // 16s .backoff('exponential', 16384) // 16s
@ -52,7 +54,7 @@ export function deliver(user: ILocalUser, content: any, to: any) {
} }
export function createExportNotesJob(user: ILocalUser) { export function createExportNotesJob(user: ILocalUser) {
if (!enableQueue) throw 'queue disabled'; if (!queueAvailable) throw 'queue unavailable';
return queue.createJob({ return queue.createJob({
type: 'exportNotes', type: 'exportNotes',
@ -62,7 +64,10 @@ export function createExportNotesJob(user: ILocalUser) {
} }
export default function() { export default function() {
if (enableQueue) { if (queueAvailable && enableQueue) {
queue.process(128, handler); queue.process(128, handler);
queueLogger.succ('Processing started');
} }
return queue;
} }