Execute PushUpdateWorker only for accounts who uses StreamingAPI just now. (#3278)
* Add redis key "subscribed:timeline:#{account.id}" to indicate active streaming API listeners exists.
* Add endpoint for notification only stream.
* Run PushUpdateWorker only for users uses Streaming API now.
* Move close hander streamTo(Http/Ws) -> stream(Http/Ws)End (Deal with #3370)
* Add stream type for stream start log message.
			
			
This commit is contained in:
		
							parent
							
								
									62fc0af2e4
								
							
						
					
					
						commit
						7e95d45c8d
					
				
					 2 changed files with 49 additions and 9 deletions
				
			
		|  | @ -34,7 +34,7 @@ class FeedManager | ||||||
|       trim(timeline_type, account.id) |       trim(timeline_type, account.id) | ||||||
|     end |     end | ||||||
| 
 | 
 | ||||||
|     PushUpdateWorker.perform_async(account.id, status.id) |     PushUpdateWorker.perform_async(account.id, status.id) if push_update_required?(timeline_type, account.id) | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   def trim(type, account_id) |   def trim(type, account_id) | ||||||
|  | @ -43,6 +43,10 @@ class FeedManager | ||||||
|     redis.zremrangebyscore(key(type, account_id), '-inf', "(#{last.last}") |     redis.zremrangebyscore(key(type, account_id), '-inf', "(#{last.last}") | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|  |   def push_update_required?(timeline_type, account_id) | ||||||
|  |     timeline_type != :home || redis.get("subscribed:timeline:#{account_id}").present? | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|   def merge_into_timeline(from_account, into_account) |   def merge_into_timeline(from_account, into_account) | ||||||
|     timeline_key = key(:home, into_account.id) |     timeline_key = key(:home, into_account.id) | ||||||
|     query        = from_account.statuses.limit(FeedManager::MAX_ITEMS / 4) |     query        = from_account.statuses.limit(FeedManager::MAX_ITEMS / 4) | ||||||
|  |  | ||||||
|  | @ -110,11 +110,12 @@ const startWorker = (workerId) => { | ||||||
| 
 | 
 | ||||||
|   const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; |   const redisPrefix = redisNamespace ? `${redisNamespace}:` : ''; | ||||||
| 
 | 
 | ||||||
|  |   const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL); | ||||||
|   const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL); |   const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL); | ||||||
| 
 | 
 | ||||||
|   const subs = {}; |   const subs = {}; | ||||||
| 
 | 
 | ||||||
|   redisClient.on('pmessage', (_, channel, message) => { |   redisSubscribeClient.on('pmessage', (_, channel, message) => { | ||||||
|     const callbacks = subs[channel]; |     const callbacks = subs[channel]; | ||||||
| 
 | 
 | ||||||
|     log.silly(`New message on channel ${channel}`); |     log.silly(`New message on channel ${channel}`); | ||||||
|  | @ -126,7 +127,19 @@ const startWorker = (workerId) => { | ||||||
|     callbacks.forEach(callback => callback(message)); |     callbacks.forEach(callback => callback(message)); | ||||||
|   }); |   }); | ||||||
| 
 | 
 | ||||||
|   redisClient.psubscribe(`${redisPrefix}timeline:*`); |   redisSubscribeClient.psubscribe(`${redisPrefix}timeline:*`); | ||||||
|  | 
 | ||||||
|  |   const subscriptionHeartbeat = (channel) => { | ||||||
|  |     const interval = 6*60; | ||||||
|  |     const tellSubscribed = () => { | ||||||
|  |       redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3); | ||||||
|  |     }; | ||||||
|  |     tellSubscribed(); | ||||||
|  |     const heartbeat = setInterval(tellSubscribed, interval*1000); | ||||||
|  |     return () => { | ||||||
|  |       clearInterval(heartbeat); | ||||||
|  |     }; | ||||||
|  |   }; | ||||||
| 
 | 
 | ||||||
|   const subscribe = (channel, callback) => { |   const subscribe = (channel, callback) => { | ||||||
|     log.silly(`Adding listener for ${channel}`); |     log.silly(`Adding listener for ${channel}`); | ||||||
|  | @ -231,8 +244,9 @@ const startWorker = (workerId) => { | ||||||
| 
 | 
 | ||||||
|   const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); |   const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', '); | ||||||
| 
 | 
 | ||||||
|   const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false) => { |   const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => { | ||||||
|     log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}`); |     const streamType = notificationOnly ? ' (notification)' : ''; | ||||||
|  |     log.verbose(req.requestId, `Starting stream from ${id} for ${req.accountId}${streamType}`); | ||||||
| 
 | 
 | ||||||
|     const listener = message => { |     const listener = message => { | ||||||
|       const { event, payload, queued_at } = JSON.parse(message); |       const { event, payload, queued_at } = JSON.parse(message); | ||||||
|  | @ -245,6 +259,10 @@ const startWorker = (workerId) => { | ||||||
|         output(event, payload); |         output(event, payload); | ||||||
|       }; |       }; | ||||||
| 
 | 
 | ||||||
|  |       if (notificationOnly && event !== 'notification') { | ||||||
|  |         return; | ||||||
|  |       } | ||||||
|  | 
 | ||||||
|       // Only messages that may require filtering are statuses, since notifications
 |       // Only messages that may require filtering are statuses, since notifications
 | ||||||
|       // are already personalized and deletes do not matter
 |       // are already personalized and deletes do not matter
 | ||||||
|       if (needsFiltering && event === 'update') { |       if (needsFiltering && event === 'update') { | ||||||
|  | @ -313,9 +331,12 @@ const startWorker = (workerId) => { | ||||||
|   }; |   }; | ||||||
| 
 | 
 | ||||||
|   // Setup stream end for HTTP
 |   // Setup stream end for HTTP
 | ||||||
|   const streamHttpEnd = req => (id, listener) => { |   const streamHttpEnd = (req, closeHandler = false) => (id, listener) => { | ||||||
|     req.on('close', () => { |     req.on('close', () => { | ||||||
|       unsubscribe(id, listener); |       unsubscribe(id, listener); | ||||||
|  |       if (closeHandler) { | ||||||
|  |         closeHandler(); | ||||||
|  |       } | ||||||
|     }); |     }); | ||||||
|   }; |   }; | ||||||
| 
 | 
 | ||||||
|  | @ -330,15 +351,21 @@ const startWorker = (workerId) => { | ||||||
|   }; |   }; | ||||||
| 
 | 
 | ||||||
|   // Setup stream end for WebSockets
 |   // Setup stream end for WebSockets
 | ||||||
|   const streamWsEnd = (req, ws) => (id, listener) => { |   const streamWsEnd = (req, ws, closeHandler = false) => (id, listener) => { | ||||||
|     ws.on('close', () => { |     ws.on('close', () => { | ||||||
|       log.verbose(req.requestId, `Ending stream for ${req.accountId}`); |       log.verbose(req.requestId, `Ending stream for ${req.accountId}`); | ||||||
|       unsubscribe(id, listener); |       unsubscribe(id, listener); | ||||||
|  |       if (closeHandler) { | ||||||
|  |         closeHandler(); | ||||||
|  |       } | ||||||
|     }); |     }); | ||||||
| 
 | 
 | ||||||
|     ws.on('error', e => { |     ws.on('error', e => { | ||||||
|       log.verbose(req.requestId, `Ending stream for ${req.accountId}`); |       log.verbose(req.requestId, `Ending stream for ${req.accountId}`); | ||||||
|       unsubscribe(id, listener); |       unsubscribe(id, listener); | ||||||
|  |       if (closeHandler) { | ||||||
|  |         closeHandler(); | ||||||
|  |       } | ||||||
|     }); |     }); | ||||||
|   }; |   }; | ||||||
| 
 | 
 | ||||||
|  | @ -348,7 +375,12 @@ const startWorker = (workerId) => { | ||||||
|   app.use(errorMiddleware); |   app.use(errorMiddleware); | ||||||
| 
 | 
 | ||||||
|   app.get('/api/v1/streaming/user', (req, res) => { |   app.get('/api/v1/streaming/user', (req, res) => { | ||||||
|     streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req)); |     const channel = `timeline:${req.accountId}`; | ||||||
|  |     streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel))); | ||||||
|  |   }); | ||||||
|  | 
 | ||||||
|  |   app.get('/api/v1/streaming/user/notification', (req, res) => { | ||||||
|  |     streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true); | ||||||
|   }); |   }); | ||||||
| 
 | 
 | ||||||
|   app.get('/api/v1/streaming/public', (req, res) => { |   app.get('/api/v1/streaming/public', (req, res) => { | ||||||
|  | @ -382,7 +414,11 @@ const startWorker = (workerId) => { | ||||||
| 
 | 
 | ||||||
|     switch(location.query.stream) { |     switch(location.query.stream) { | ||||||
|     case 'user': |     case 'user': | ||||||
|       streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws)); |       const channel = `timeline:${req.accountId}`; | ||||||
|  |       streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel))); | ||||||
|  |       break; | ||||||
|  |     case 'user:notification': | ||||||
|  |       streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true); | ||||||
|       break; |       break; | ||||||
|     case 'public': |     case 'public': | ||||||
|       streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true); |       streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true); | ||||||
|  |  | ||||||
		Loading…
	
		Reference in a new issue