Refactor notifications to go through a separate stream in streaming API (#16765)
Eliminate need to have custom notifications filtering logic in the streaming API code by publishing notifications into a separate stream and then simply using the multi-stream capability to subscribe to that stream when necessary
This commit is contained in:
		
							parent
							
								
									ce84967ee4
								
							
						
					
					
						commit
						4752c0a8d3
					
				
					 2 changed files with 46 additions and 31 deletions
				
			
		|  | @ -127,7 +127,7 @@ class NotifyService < BaseService | |||
|   def push_notification! | ||||
|     return if @notification.activity.nil? | ||||
| 
 | ||||
|     Redis.current.publish("timeline:#{@recipient.id}", Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, :notification))) | ||||
|     Redis.current.publish("timeline:#{@recipient.id}:notifications", Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, :notification))) | ||||
|     send_push_notifications! | ||||
|   end | ||||
| 
 | ||||
|  |  | |||
|  | @ -282,6 +282,14 @@ const startWorker = (workerId) => { | |||
|     next(); | ||||
|   }; | ||||
| 
 | ||||
|   /** | ||||
|    * @param {any} req | ||||
|    * @param {string[]} necessaryScopes | ||||
|    * @return {boolean} | ||||
|    */ | ||||
|   const isInScope = (req, necessaryScopes) => | ||||
|     req.scopes.some(scope => necessaryScopes.includes(scope)); | ||||
| 
 | ||||
|   /** | ||||
|    * @param {string} token | ||||
|    * @param {any} req | ||||
|  | @ -314,7 +322,6 @@ const startWorker = (workerId) => { | |||
|         req.scopes = result.rows[0].scopes.split(' '); | ||||
|         req.accountId = result.rows[0].account_id; | ||||
|         req.chosenLanguages = result.rows[0].chosen_languages; | ||||
|         req.allowNotifications = req.scopes.some(scope => ['read', 'read:notifications'].includes(scope)); | ||||
|         req.deviceId = result.rows[0].device_id; | ||||
| 
 | ||||
|         resolve(); | ||||
|  | @ -580,14 +587,12 @@ const startWorker = (workerId) => { | |||
|    * @param {function(string, string): void} output | ||||
|    * @param {function(string[], function(string): void): void} attachCloseHandler | ||||
|    * @param {boolean=} needsFiltering | ||||
|    * @param {boolean=} notificationOnly | ||||
|    * @return {function(string): void} | ||||
|    */ | ||||
|   const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => { | ||||
|   const streamFrom = (ids, req, output, attachCloseHandler, needsFiltering = false) => { | ||||
|     const accountId  = req.accountId || req.remoteAddress; | ||||
|     const streamType = notificationOnly ? ' (notification)' : ''; | ||||
| 
 | ||||
|     log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}${streamType}`); | ||||
|     log.verbose(req.requestId, `Starting stream from ${ids.join(', ')} for ${accountId}`); | ||||
| 
 | ||||
|     const listener = message => { | ||||
|       const json = parseJSON(message); | ||||
|  | @ -605,14 +610,6 @@ const startWorker = (workerId) => { | |||
|         output(event, encodedPayload); | ||||
|       }; | ||||
| 
 | ||||
|       if (notificationOnly && event !== 'notification') { | ||||
|         return; | ||||
|       } | ||||
| 
 | ||||
|       if (event === 'notification' && !req.allowNotifications) { | ||||
|         return; | ||||
|       } | ||||
| 
 | ||||
|       // Only messages that may require filtering are statuses, since notifications
 | ||||
|       // are already personalized and deletes do not matter
 | ||||
|       if (!needsFiltering || event !== 'update') { | ||||
|  | @ -759,7 +756,7 @@ const startWorker = (workerId) => { | |||
|       const onSend = streamToHttp(req, res); | ||||
|       const onEnd  = streamHttpEnd(req, subscriptionHeartbeat(channelIds)); | ||||
| 
 | ||||
|       streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering, options.notificationOnly); | ||||
|       streamFrom(channelIds, req, onSend, onEnd, options.needsFiltering); | ||||
|     }).catch(err => { | ||||
|       log.verbose(req.requestId, 'Subscription error:', err.toString()); | ||||
|       httpNotFound(res); | ||||
|  | @ -775,74 +772,92 @@ const startWorker = (workerId) => { | |||
|    * @property {string} [only_media] | ||||
|    */ | ||||
| 
 | ||||
|   /** | ||||
|    * @param {any} req | ||||
|    * @return {string[]} | ||||
|    */ | ||||
|   const channelsForUserStream = req => { | ||||
|     const arr = [`timeline:${req.accountId}`]; | ||||
| 
 | ||||
|     if (isInScope(req, ['crypto']) && req.deviceId) { | ||||
|       arr.push(`timeline:${req.accountId}:${req.deviceId}`); | ||||
|     } | ||||
| 
 | ||||
|     if (isInScope(req, ['read', 'read:notifications'])) { | ||||
|       arr.push(`timeline:${req.accountId}:notifications`); | ||||
|     } | ||||
| 
 | ||||
|     return arr; | ||||
|   }; | ||||
| 
 | ||||
|   /** | ||||
|    * @param {any} req | ||||
|    * @param {string} name | ||||
|    * @param {StreamParams} params | ||||
|    * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean, notificationOnly: boolean } }>} | ||||
|    * @return {Promise.<{ channelIds: string[], options: { needsFiltering: boolean } }>} | ||||
|    */ | ||||
|   const channelNameToIds = (req, name, params) => new Promise((resolve, reject) => { | ||||
|     switch(name) { | ||||
|     case 'user': | ||||
|       resolve({ | ||||
|         channelIds: req.deviceId ? [`timeline:${req.accountId}`, `timeline:${req.accountId}:${req.deviceId}`] : [`timeline:${req.accountId}`], | ||||
|         options: { needsFiltering: false, notificationOnly: false }, | ||||
|         channelIds: channelsForUserStream(req), | ||||
|         options: { needsFiltering: false }, | ||||
|       }); | ||||
| 
 | ||||
|       break; | ||||
|     case 'user:notification': | ||||
|       resolve({ | ||||
|         channelIds: [`timeline:${req.accountId}`], | ||||
|         options: { needsFiltering: false, notificationOnly: true }, | ||||
|         channelIds: [`timeline:${req.accountId}:notifications`], | ||||
|         options: { needsFiltering: false }, | ||||
|       }); | ||||
| 
 | ||||
|       break; | ||||
|     case 'public': | ||||
|       resolve({ | ||||
|         channelIds: ['timeline:public'], | ||||
|         options: { needsFiltering: true, notificationOnly: false }, | ||||
|         options: { needsFiltering: true }, | ||||
|       }); | ||||
| 
 | ||||
|       break; | ||||
|     case 'public:local': | ||||
|       resolve({ | ||||
|         channelIds: ['timeline:public:local'], | ||||
|         options: { needsFiltering: true, notificationOnly: false }, | ||||
|         options: { needsFiltering: true }, | ||||
|       }); | ||||
| 
 | ||||
|       break; | ||||
|     case 'public:remote': | ||||
|       resolve({ | ||||
|         channelIds: ['timeline:public:remote'], | ||||
|         options: { needsFiltering: true, notificationOnly: false }, | ||||
|         options: { needsFiltering: true }, | ||||
|       }); | ||||
| 
 | ||||
|       break; | ||||
|     case 'public:media': | ||||
|       resolve({ | ||||
|         channelIds: ['timeline:public:media'], | ||||
|         options: { needsFiltering: true, notificationOnly: false }, | ||||
|         options: { needsFiltering: true }, | ||||
|       }); | ||||
| 
 | ||||
|       break; | ||||
|     case 'public:local:media': | ||||
|       resolve({ | ||||
|         channelIds: ['timeline:public:local:media'], | ||||
|         options: { needsFiltering: true, notificationOnly: false }, | ||||
|         options: { needsFiltering: true }, | ||||
|       }); | ||||
| 
 | ||||
|       break; | ||||
|     case 'public:remote:media': | ||||
|       resolve({ | ||||
|         channelIds: ['timeline:public:remote:media'], | ||||
|         options: { needsFiltering: true, notificationOnly: false }, | ||||
|         options: { needsFiltering: true }, | ||||
|       }); | ||||
| 
 | ||||
|       break; | ||||
|     case 'direct': | ||||
|       resolve({ | ||||
|         channelIds: [`timeline:direct:${req.accountId}`], | ||||
|         options: { needsFiltering: false, notificationOnly: false }, | ||||
|         options: { needsFiltering: false }, | ||||
|       }); | ||||
| 
 | ||||
|       break; | ||||
|  | @ -852,7 +867,7 @@ const startWorker = (workerId) => { | |||
|       } else { | ||||
|         resolve({ | ||||
|           channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}`], | ||||
|           options: { needsFiltering: true, notificationOnly: false }, | ||||
|           options: { needsFiltering: true }, | ||||
|         }); | ||||
|       } | ||||
| 
 | ||||
|  | @ -863,7 +878,7 @@ const startWorker = (workerId) => { | |||
|       } else { | ||||
|         resolve({ | ||||
|           channelIds: [`timeline:hashtag:${params.tag.toLowerCase()}:local`], | ||||
|           options: { needsFiltering: true, notificationOnly: false }, | ||||
|           options: { needsFiltering: true }, | ||||
|         }); | ||||
|       } | ||||
| 
 | ||||
|  | @ -872,7 +887,7 @@ const startWorker = (workerId) => { | |||
|       authorizeListAccess(params.list, req).then(() => { | ||||
|         resolve({ | ||||
|           channelIds: [`timeline:list:${params.list}`], | ||||
|           options: { needsFiltering: false, notificationOnly: false }, | ||||
|           options: { needsFiltering: false }, | ||||
|         }); | ||||
|       }).catch(() => { | ||||
|         reject('Not authorized to stream this list'); | ||||
|  | @ -919,7 +934,7 @@ const startWorker = (workerId) => { | |||
| 
 | ||||
|       const onSend        = streamToWs(request, socket, streamNameFromChannelName(channelName, params)); | ||||
|       const stopHeartbeat = subscriptionHeartbeat(channelIds); | ||||
|       const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering, options.notificationOnly); | ||||
|       const listener      = streamFrom(channelIds, request, onSend, undefined, options.needsFiltering); | ||||
| 
 | ||||
|       subscriptions[channelIds.join(';')] = { | ||||
|         listener, | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue