Improve streaming API cluster logging (#3370)
* Improve streaming API cluster logging * Less verbose error middleware logging (stack trace useless there) * Fix error logging * Prevent potential issue * Add missing "done()" in catch of Promise.all, websocket heartbeat re-implemented like in example * I actually forgot a done(), the absolute madman
This commit is contained in:
parent
2f10e05e24
commit
1245d4fdbc
3 changed files with 67 additions and 47 deletions
|
@ -102,6 +102,7 @@
|
|||
"sass-loader": "^6.0.3",
|
||||
"stringz": "^0.1.2",
|
||||
"style-loader": "^0.16.1",
|
||||
"throng": "^4.0.0",
|
||||
"uuid": "^3.0.1",
|
||||
"uws": "^0.14.5",
|
||||
"webpack": "^2.4.1",
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import os from 'os';
|
||||
import cluster from 'cluster';
|
||||
import throng from 'throng';
|
||||
import dotenv from 'dotenv';
|
||||
import express from 'express';
|
||||
import http from 'http';
|
||||
|
@ -16,6 +16,8 @@ dotenv.config({
|
|||
path: env === 'production' ? '.env.production' : '.env',
|
||||
});
|
||||
|
||||
log.level = process.env.LOG_LEVEL || 'verbose';
|
||||
|
||||
const dbUrlToConfig = (dbUrl) => {
|
||||
if (!dbUrl) {
|
||||
return {};
|
||||
|
@ -65,24 +67,15 @@ const redisUrlToClient = (defaultConfig, redisUrl) => {
|
|||
}));
|
||||
};
|
||||
|
||||
if (cluster.isMaster) {
|
||||
// Cluster master
|
||||
const core = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
|
||||
const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
|
||||
|
||||
const fork = () => {
|
||||
const worker = cluster.fork();
|
||||
|
||||
worker.on('exit', (code, signal) => {
|
||||
log.error(`Worker died with exit code ${code}, signal ${signal} received.`);
|
||||
setTimeout(() => fork(), 0);
|
||||
});
|
||||
const startMaster = () => {
|
||||
log.info(`Starting streaming API server master with ${numWorkers} workers`);
|
||||
};
|
||||
|
||||
for (let i = 0; i < core; i++) fork();
|
||||
const startWorker = (workerId) => {
|
||||
log.info(`Starting worker ${workerId}`);
|
||||
|
||||
log.info(`Starting streaming API server master with ${core} workers`);
|
||||
} else {
|
||||
// Cluster worker
|
||||
const pgConfigs = {
|
||||
development: {
|
||||
database: 'mastodon_development',
|
||||
|
@ -130,6 +123,7 @@ if (cluster.isMaster) {
|
|||
if (!callbacks) {
|
||||
return;
|
||||
}
|
||||
|
||||
callbacks.forEach(callback => callback(message));
|
||||
});
|
||||
|
||||
|
@ -215,9 +209,9 @@ if (cluster.isMaster) {
|
|||
};
|
||||
|
||||
const errorMiddleware = (err, req, res, next) => {
|
||||
log.error(req.requestId, err);
|
||||
log.error(req.requestId, err.toString());
|
||||
res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' });
|
||||
res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' }));
|
||||
res.end(JSON.stringify({ error: err.statusCode ? err.toString() : 'An unexpected error occurred' }));
|
||||
};
|
||||
|
||||
const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
|
||||
|
@ -249,8 +243,9 @@ if (cluster.isMaster) {
|
|||
const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)).concat(unpackedPayload.reblog ? [unpackedPayload.reblog.account.id] : []);
|
||||
const accountDomain = unpackedPayload.account.acct.split('@')[1];
|
||||
|
||||
if (req.filteredLanguages.indexOf(unpackedPayload.language) !== -1) {
|
||||
if (Array.isArray(req.filteredLanguages) && req.filteredLanguages.includes(unpackedPayload.language)) {
|
||||
log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
|
||||
done();
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -271,6 +266,7 @@ if (cluster.isMaster) {
|
|||
|
||||
transmit();
|
||||
}).catch(err => {
|
||||
done();
|
||||
log.error(err);
|
||||
});
|
||||
});
|
||||
|
@ -309,19 +305,7 @@ if (cluster.isMaster) {
|
|||
};
|
||||
|
||||
// Setup stream output to WebSockets
|
||||
const streamToWs = (req, ws) => {
|
||||
const heartbeat = setInterval(() => {
|
||||
// TODO: Can't add multiple listeners, due to the limitation of uws.
|
||||
if (ws.readyState !== ws.OPEN) {
|
||||
log.verbose(req.requestId, `Ending stream for ${req.accountId}`);
|
||||
clearInterval(heartbeat);
|
||||
return;
|
||||
}
|
||||
|
||||
ws.ping();
|
||||
}, 15000);
|
||||
|
||||
return (event, payload) => {
|
||||
const streamToWs = (req, ws) => (event, payload) => {
|
||||
if (ws.readyState !== ws.OPEN) {
|
||||
log.error(req.requestId, 'Tried writing to closed socket');
|
||||
return;
|
||||
|
@ -329,7 +313,6 @@ if (cluster.isMaster) {
|
|||
|
||||
ws.send(JSON.stringify({ event, payload }));
|
||||
};
|
||||
};
|
||||
|
||||
// Setup stream end for WebSockets
|
||||
const streamWsEnd = ws => (id, listener) => {
|
||||
|
@ -372,6 +355,12 @@ if (cluster.isMaster) {
|
|||
const token = location.query.access_token;
|
||||
const req = { requestId: uuid.v4() };
|
||||
|
||||
ws.isAlive = true;
|
||||
|
||||
ws.on('pong', () => {
|
||||
ws.isAlive = true;
|
||||
});
|
||||
|
||||
accountFromToken(token, req, err => {
|
||||
if (err) {
|
||||
log.error(req.requestId, err);
|
||||
|
@ -401,16 +390,40 @@ if (cluster.isMaster) {
|
|||
});
|
||||
});
|
||||
|
||||
const wsInterval = setInterval(() => {
|
||||
wss.clients.forEach(ws => {
|
||||
if (ws.isAlive === false) {
|
||||
ws.terminate();
|
||||
return;
|
||||
}
|
||||
|
||||
ws.isAlive = false;
|
||||
ws.ping('', false, true);
|
||||
});
|
||||
}, 30000);
|
||||
|
||||
server.listen(process.env.PORT || 4000, () => {
|
||||
log.level = process.env.LOG_LEVEL || 'verbose';
|
||||
log.info(`Starting streaming API server worker on ${server.address().address}:${server.address().port}`);
|
||||
log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`);
|
||||
});
|
||||
|
||||
process.on('SIGINT', exit);
|
||||
process.on('SIGTERM', exit);
|
||||
process.on('exit', exit);
|
||||
|
||||
function exit() {
|
||||
const onExit = () => {
|
||||
log.info(`Worker ${workerId} exiting, bye bye`);
|
||||
server.close();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
const onError = (err) => {
|
||||
log.error(err);
|
||||
};
|
||||
|
||||
process.on('SIGINT', onExit);
|
||||
process.on('SIGTERM', onExit);
|
||||
process.on('exit', onExit);
|
||||
process.on('error', onError);
|
||||
};
|
||||
|
||||
throng({
|
||||
workers: numWorkers,
|
||||
lifetime: Infinity,
|
||||
start: startWorker,
|
||||
master: startMaster,
|
||||
});
|
||||
|
|
|
@ -6498,6 +6498,12 @@ text-table@~0.2.0:
|
|||
version "0.2.0"
|
||||
resolved "https://registry.yarnpkg.com/text-table/-/text-table-0.2.0.tgz#7f5ee823ae805207c00af2df4a84ec3fcfa570b4"
|
||||
|
||||
throng@^4.0.0:
|
||||
version "4.0.0"
|
||||
resolved "https://registry.yarnpkg.com/throng/-/throng-4.0.0.tgz#983c6ba1993b58eae859998aa687ffe88df84c17"
|
||||
dependencies:
|
||||
lodash.defaults "^4.0.1"
|
||||
|
||||
through@2, through@^2.3.6:
|
||||
version "2.3.8"
|
||||
resolved "https://registry.yarnpkg.com/through/-/through-2.3.8.tgz#0dd4c9ffaabc357960b1b724115d7e0e86a2e1f5"
|
||||
|
|
Loading…
Reference in a new issue