Improve streaming server with cluster (#1970)
This commit is contained in:
		
							parent
							
								
									7a5086729a
								
							
						
					
					
						commit
						64e1d51025
					
				
					 2 changed files with 272 additions and 248 deletions
				
			
		|  | @ -81,3 +81,7 @@ SMTP_FROM_ADDRESS=notifications@example.com | |||
| # Advanced settings | ||||
| # If you need to use pgBouncer, you need to disable prepared statements: | ||||
| # PREPARED_STATEMENTS=false | ||||
| 
 | ||||
| # Cluster number setting for streaming API server. | ||||
| # If you comment out following line, cluster number will be `numOfCpuCores - 1`. | ||||
| STREAMING_CLUSTER_NUM=1 | ||||
|  |  | |||
|  | @ -1,3 +1,5 @@ | |||
| import os from 'os'; | ||||
| import cluster from 'cluster'; | ||||
| import dotenv from 'dotenv' | ||||
| import express from 'express' | ||||
| import http from 'http' | ||||
|  | @ -14,7 +16,24 @@ dotenv.config({ | |||
|   path: env === 'production' ? '.env.production' : '.env' | ||||
| }) | ||||
| 
 | ||||
| const pgConfigs = { | ||||
| if (cluster.isMaster) { | ||||
|   // cluster master
 | ||||
| 
 | ||||
|   const core = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : os.cpus().length - 1) | ||||
|   const fork = () => { | ||||
|     const worker = cluster.fork(); | ||||
|     worker.on('exit', (code, signal) => { | ||||
|       log.error(`Worker died with exit code ${code}, signal ${signal} received.`); | ||||
|       setTimeout(() => fork(), 0); | ||||
|     }); | ||||
|   }; | ||||
|   for (let i = 0; i < core; i++) fork(); | ||||
|   log.info(`Starting streaming API server master with ${core} workers`) | ||||
| 
 | ||||
| } else { | ||||
|   // cluster worker
 | ||||
| 
 | ||||
|   const pgConfigs = { | ||||
|     development: { | ||||
|       database: 'mastodon_development', | ||||
|       host:     '/var/run/postgresql', | ||||
|  | @ -29,22 +48,22 @@ const pgConfigs = { | |||
|       port:     process.env.DB_PORT || 5432, | ||||
|       max:      10 | ||||
|     } | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| const app    = express() | ||||
| const pgPool = new pg.Pool(pgConfigs[env]) | ||||
| const server = http.createServer(app) | ||||
| const wss    = new WebSocket.Server({ server }) | ||||
|   const app    = express() | ||||
|   const pgPool = new pg.Pool(pgConfigs[env]) | ||||
|   const server = http.createServer(app) | ||||
|   const wss    = new WebSocket.Server({ server }) | ||||
| 
 | ||||
| const redisClient = redis.createClient({ | ||||
|   const redisClient = redis.createClient({ | ||||
|     host:     process.env.REDIS_HOST     || '127.0.0.1', | ||||
|     port:     process.env.REDIS_PORT     || 6379, | ||||
|     password: process.env.REDIS_PASSWORD | ||||
| }) | ||||
|   }) | ||||
| 
 | ||||
| const subs = {} | ||||
|   const subs = {} | ||||
| 
 | ||||
| redisClient.on('pmessage', (_, channel, message) => { | ||||
|   redisClient.on('pmessage', (_, channel, message) => { | ||||
|     const callbacks = subs[channel] | ||||
| 
 | ||||
|     log.silly(`New message on channel ${channel}`) | ||||
|  | @ -54,37 +73,37 @@ redisClient.on('pmessage', (_, channel, message) => { | |||
|     } | ||||
| 
 | ||||
|     callbacks.forEach(callback => callback(message)) | ||||
| }) | ||||
|   }) | ||||
| 
 | ||||
| redisClient.psubscribe('timeline:*') | ||||
|   redisClient.psubscribe('timeline:*') | ||||
| 
 | ||||
| const subscribe = (channel, callback) => { | ||||
|   const subscribe = (channel, callback) => { | ||||
|     log.silly(`Adding listener for ${channel}`) | ||||
|     subs[channel] = subs[channel] || [] | ||||
|     subs[channel].push(callback) | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| const unsubscribe = (channel, callback) => { | ||||
|   const unsubscribe = (channel, callback) => { | ||||
|     log.silly(`Removing listener for ${channel}`) | ||||
|     subs[channel] = subs[channel].filter(item => item !== callback) | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| const allowCrossDomain = (req, res, next) => { | ||||
|   const allowCrossDomain = (req, res, next) => { | ||||
|     res.header('Access-Control-Allow-Origin', '*') | ||||
|     res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control') | ||||
|     res.header('Access-Control-Allow-Methods', 'GET, OPTIONS') | ||||
| 
 | ||||
|     next() | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| const setRequestId = (req, res, next) => { | ||||
|   const setRequestId = (req, res, next) => { | ||||
|     req.requestId = uuid.v4() | ||||
|     res.header('X-Request-Id', req.requestId) | ||||
| 
 | ||||
|     next() | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| const accountFromToken = (token, req, next) => { | ||||
|   const accountFromToken = (token, req, next) => { | ||||
|     pgPool.connect((err, client, done) => { | ||||
|       if (err) { | ||||
|         next(err) | ||||
|  | @ -112,9 +131,9 @@ const accountFromToken = (token, req, next) => { | |||
|         next() | ||||
|       }) | ||||
|     }) | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| const authenticationMiddleware = (req, res, next) => { | ||||
|   const authenticationMiddleware = (req, res, next) => { | ||||
|     if (req.method === 'OPTIONS') { | ||||
|       next() | ||||
|       return | ||||
|  | @ -133,17 +152,17 @@ const authenticationMiddleware = (req, res, next) => { | |||
|     const token = authorization.replace(/^Bearer /, '') | ||||
| 
 | ||||
|     accountFromToken(token, req, next) | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| const errorMiddleware = (err, req, res, next) => { | ||||
|   const errorMiddleware = (err, req, res, next) => { | ||||
|     log.error(req.requestId, err) | ||||
|     res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' }) | ||||
|     res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' })) | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); | ||||
|   const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); | ||||
| 
 | ||||
| const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => { | ||||
|   const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => { | ||||
|     log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`) | ||||
| 
 | ||||
|     const listener = message => { | ||||
|  | @ -191,10 +210,10 @@ const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) | |||
| 
 | ||||
|     subscribe(id, listener) | ||||
|     attachCloseHandler(id, listener) | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| // Setup stream output to HTTP
 | ||||
| const streamToHttp = (req, res) => { | ||||
|   // Setup stream output to HTTP
 | ||||
|   const streamToHttp = (req, res) => { | ||||
|     res.setHeader('Content-Type', 'text/event-stream') | ||||
|     res.setHeader('Transfer-Encoding', 'chunked') | ||||
| 
 | ||||
|  | @ -209,17 +228,17 @@ const streamToHttp = (req, res) => { | |||
|       res.write(`event: ${event}\n`) | ||||
|       res.write(`data: ${payload}\n\n`) | ||||
|     } | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| // Setup stream end for HTTP
 | ||||
| const streamHttpEnd = req => (id, listener) => { | ||||
|   // Setup stream end for HTTP
 | ||||
|   const streamHttpEnd = req => (id, listener) => { | ||||
|     req.on('close', () => { | ||||
|       unsubscribe(id, listener) | ||||
|     }) | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| // Setup stream output to WebSockets
 | ||||
| const streamToWs = (req, ws) => { | ||||
|   // Setup stream output to WebSockets
 | ||||
|   const streamToWs = (req, ws) => { | ||||
|     const heartbeat = setInterval(() => ws.ping(), 15000) | ||||
| 
 | ||||
|     ws.on('close', () => { | ||||
|  | @ -235,10 +254,10 @@ const streamToWs = (req, ws) => { | |||
| 
 | ||||
|       ws.send(JSON.stringify({ event, payload })) | ||||
|     } | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| // Setup stream end for WebSockets
 | ||||
| const streamWsEnd = ws => (id, listener) => { | ||||
|   // Setup stream end for WebSockets
 | ||||
|   const streamWsEnd = ws => (id, listener) => { | ||||
|     ws.on('close', () => { | ||||
|       unsubscribe(id, listener) | ||||
|     }) | ||||
|  | @ -246,34 +265,34 @@ const streamWsEnd = ws => (id, listener) => { | |||
|     ws.on('error', e => { | ||||
|       unsubscribe(id, listener) | ||||
|     }) | ||||
| } | ||||
|   } | ||||
| 
 | ||||
| app.use(setRequestId) | ||||
| app.use(allowCrossDomain) | ||||
| app.use(authenticationMiddleware) | ||||
| app.use(errorMiddleware) | ||||
|   app.use(setRequestId) | ||||
|   app.use(allowCrossDomain) | ||||
|   app.use(authenticationMiddleware) | ||||
|   app.use(errorMiddleware) | ||||
| 
 | ||||
| app.get('/api/v1/streaming/user', (req, res) => { | ||||
|   app.get('/api/v1/streaming/user', (req, res) => { | ||||
|     streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req)) | ||||
| }) | ||||
|   }) | ||||
| 
 | ||||
| app.get('/api/v1/streaming/public', (req, res) => { | ||||
|   app.get('/api/v1/streaming/public', (req, res) => { | ||||
|     streamFrom('timeline:public', req, streamToHttp(req, res), streamHttpEnd(req), true) | ||||
| }) | ||||
|   }) | ||||
| 
 | ||||
| app.get('/api/v1/streaming/public/local', (req, res) => { | ||||
|   app.get('/api/v1/streaming/public/local', (req, res) => { | ||||
|     streamFrom('timeline:public:local', req, streamToHttp(req, res), streamHttpEnd(req), true) | ||||
| }) | ||||
|   }) | ||||
| 
 | ||||
| app.get('/api/v1/streaming/hashtag', (req, res) => { | ||||
|   app.get('/api/v1/streaming/hashtag', (req, res) => { | ||||
|     streamFrom(`timeline:hashtag:${req.params.tag}`, req, streamToHttp(req, res), streamHttpEnd(req), true) | ||||
| }) | ||||
|   }) | ||||
| 
 | ||||
| app.get('/api/v1/streaming/hashtag/local', (req, res) => { | ||||
|   app.get('/api/v1/streaming/hashtag/local', (req, res) => { | ||||
|     streamFrom(`timeline:hashtag:${req.params.tag}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true) | ||||
| }) | ||||
|   }) | ||||
| 
 | ||||
| wss.on('connection', ws => { | ||||
|   wss.on('connection', ws => { | ||||
|     const location = url.parse(ws.upgradeReq.url, true) | ||||
|     const token    = location.query.access_token | ||||
|     const req      = { requestId: uuid.v4() } | ||||
|  | @ -305,9 +324,10 @@ wss.on('connection', ws => { | |||
|         ws.close() | ||||
|       } | ||||
|     }) | ||||
| }) | ||||
|   }) | ||||
| 
 | ||||
| server.listen(process.env.PORT || 4000, () => { | ||||
|   server.listen(process.env.PORT || 4000, () => { | ||||
|     log.level = process.env.LOG_LEVEL || 'verbose' | ||||
|   log.info(`Starting streaming API server on port ${server.address().port}`) | ||||
| }) | ||||
|     log.info(`Starting streaming API server worker on port ${server.address().port}`) | ||||
|   }) | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue