Change Web Push API deliveries to use request pooling (#16014)
This commit is contained in:
		
							parent
							
								
									70bea95155
								
							
						
					
					
						commit
						a8bf951926
					
				
					 5 changed files with 150 additions and 58 deletions
				
			
		
							
								
								
									
										2
									
								
								Gemfile
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								Gemfile
									
									
									
									
									
								
							|  | @ -94,7 +94,7 @@ gem 'tty-prompt', '~> 0.23', require: false | ||||||
| gem 'twitter-text', '~> 3.1.0' | gem 'twitter-text', '~> 3.1.0' | ||||||
| gem 'tzinfo-data', '~> 1.2021' | gem 'tzinfo-data', '~> 1.2021' | ||||||
| gem 'webpacker', '~> 5.2' | gem 'webpacker', '~> 5.2' | ||||||
| gem 'webpush' | gem 'webpush', '~> 0.3' | ||||||
| gem 'webauthn', '~> 3.0.0.alpha1' | gem 'webauthn', '~> 3.0.0.alpha1' | ||||||
| 
 | 
 | ||||||
| gem 'json-ld' | gem 'json-ld' | ||||||
|  |  | ||||||
|  | @ -808,5 +808,5 @@ DEPENDENCIES | ||||||
|   webauthn (~> 3.0.0.alpha1) |   webauthn (~> 3.0.0.alpha1) | ||||||
|   webmock (~> 3.12) |   webmock (~> 3.12) | ||||||
|   webpacker (~> 5.2) |   webpacker (~> 5.2) | ||||||
|   webpush |   webpush (~> 0.3) | ||||||
|   xorcist (~> 1.1) |   xorcist (~> 1.1) | ||||||
|  |  | ||||||
|  | @ -24,81 +24,80 @@ class Web::PushSubscription < ApplicationRecord | ||||||
|   validates :key_p256dh, presence: true |   validates :key_p256dh, presence: true | ||||||
|   validates :key_auth, presence: true |   validates :key_auth, presence: true | ||||||
| 
 | 
 | ||||||
|   def push(notification) |   delegate :locale, to: :associated_user | ||||||
|     I18n.with_locale(associated_user&.locale || I18n.default_locale) do | 
 | ||||||
|       push_payload(payload_for_notification(notification), 48.hours.seconds) |   def encrypt(payload) | ||||||
|  |     Webpush::Encryption.encrypt(payload, key_p256dh, key_auth) | ||||||
|   end |   end | ||||||
|  | 
 | ||||||
|  |   def audience | ||||||
|  |     @audience ||= Addressable::URI.parse(endpoint).normalized_site | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   def crypto_key_header | ||||||
|  |     p256ecdsa = vapid_key.public_key_for_push_header | ||||||
|  | 
 | ||||||
|  |     "p256ecdsa=#{p256ecdsa}" | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   def authorization_header | ||||||
|  |     jwt = JWT.encode({ aud: audience, exp: 24.hours.from_now.to_i, sub: "mailto:#{contact_email}" }, vapid_key.curve, 'ES256', typ: 'JWT') | ||||||
|  | 
 | ||||||
|  |     "WebPush #{jwt}" | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   def pushable?(notification) |   def pushable?(notification) | ||||||
|     data&.key?('alerts') && ActiveModel::Type::Boolean.new.cast(data['alerts'][notification.type.to_s]) |     ActiveModel::Type::Boolean.new.cast(data&.dig('alerts', notification.type.to_s)) | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   def associated_user |   def associated_user | ||||||
|     return @associated_user if defined?(@associated_user) |     return @associated_user if defined?(@associated_user) | ||||||
| 
 | 
 | ||||||
|     @associated_user = if user_id.nil? |     @associated_user = begin | ||||||
|  |       if user_id.nil? | ||||||
|         session_activation.user |         session_activation.user | ||||||
|       else |       else | ||||||
|         user |         user | ||||||
|       end |       end | ||||||
|     end |     end | ||||||
|  |   end | ||||||
| 
 | 
 | ||||||
|   def associated_access_token |   def associated_access_token | ||||||
|     return @associated_access_token if defined?(@associated_access_token) |     return @associated_access_token if defined?(@associated_access_token) | ||||||
| 
 | 
 | ||||||
|     @associated_access_token = if access_token_id.nil? |     @associated_access_token = begin | ||||||
|  |       if access_token_id.nil? | ||||||
|         find_or_create_access_token.token |         find_or_create_access_token.token | ||||||
|       else |       else | ||||||
|         access_token.token |         access_token.token | ||||||
|       end |       end | ||||||
|     end |     end | ||||||
|  |   end | ||||||
| 
 | 
 | ||||||
|   class << self |   class << self | ||||||
|     def unsubscribe_for(application_id, resource_owner) |     def unsubscribe_for(application_id, resource_owner) | ||||||
|       access_token_ids = Doorkeeper::AccessToken.where(application_id: application_id, resource_owner_id: resource_owner.id, revoked_at: nil) |       access_token_ids = Doorkeeper::AccessToken.where(application_id: application_id, resource_owner_id: resource_owner.id, revoked_at: nil).pluck(:id) | ||||||
|                                                 .pluck(:id) |  | ||||||
| 
 |  | ||||||
|       where(access_token_id: access_token_ids).delete_all |       where(access_token_id: access_token_ids).delete_all | ||||||
|     end |     end | ||||||
|   end |   end | ||||||
| 
 | 
 | ||||||
|   private |   private | ||||||
| 
 | 
 | ||||||
|   def push_payload(message, ttl = 5.minutes.seconds) |  | ||||||
|     Webpush.payload_send( |  | ||||||
|       message: Oj.dump(message), |  | ||||||
|       endpoint: endpoint, |  | ||||||
|       p256dh: key_p256dh, |  | ||||||
|       auth: key_auth, |  | ||||||
|       ttl: ttl, |  | ||||||
|       ssl_timeout: 10, |  | ||||||
|       open_timeout: 10, |  | ||||||
|       read_timeout: 10, |  | ||||||
|       vapid: { |  | ||||||
|         subject: "mailto:#{::Setting.site_contact_email}", |  | ||||||
|         private_key: Rails.configuration.x.vapid_private_key, |  | ||||||
|         public_key: Rails.configuration.x.vapid_public_key, |  | ||||||
|       } |  | ||||||
|     ) |  | ||||||
|   end |  | ||||||
| 
 |  | ||||||
|   def payload_for_notification(notification) |  | ||||||
|     ActiveModelSerializers::SerializableResource.new( |  | ||||||
|       notification, |  | ||||||
|       serializer: Web::NotificationSerializer, |  | ||||||
|       scope: self, |  | ||||||
|       scope_name: :current_push_subscription |  | ||||||
|     ).as_json |  | ||||||
|   end |  | ||||||
| 
 |  | ||||||
|   def find_or_create_access_token |   def find_or_create_access_token | ||||||
|     Doorkeeper::AccessToken.find_or_create_for( |     Doorkeeper::AccessToken.find_or_create_for( | ||||||
|       application: Doorkeeper::Application.find_by(superapp: true), |       application: Doorkeeper::Application.find_by(superapp: true), | ||||||
|       resource_owner: session_activation.user_id, |       resource_owner: user_id || session_activation.user_id, | ||||||
|       scopes: Doorkeeper::OAuth::Scopes.from_string('read write follow push'), |       scopes: Doorkeeper::OAuth::Scopes.from_string('read write follow push'), | ||||||
|       expires_in: Doorkeeper.configuration.access_token_expires_in, |       expires_in: Doorkeeper.configuration.access_token_expires_in, | ||||||
|       use_refresh_token: Doorkeeper.configuration.refresh_token_enabled? |       use_refresh_token: Doorkeeper.configuration.refresh_token_enabled? | ||||||
|     ) |     ) | ||||||
|   end |   end | ||||||
|  | 
 | ||||||
|  |   def vapid_key | ||||||
|  |     @vapid_key ||= Webpush::VapidKey.from_keys(Rails.configuration.x.vapid_public_key, Rails.configuration.x.vapid_private_key) | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   def contact_email | ||||||
|  |     @contact_email ||= ::Setting.site_contact_email | ||||||
|  |   end | ||||||
| end | end | ||||||
|  |  | ||||||
|  | @ -3,22 +3,67 @@ | ||||||
| class Web::PushNotificationWorker | class Web::PushNotificationWorker | ||||||
|   include Sidekiq::Worker |   include Sidekiq::Worker | ||||||
| 
 | 
 | ||||||
|   sidekiq_options backtrace: true, retry: 5 |   sidekiq_options queue: 'push', retry: 5 | ||||||
|  | 
 | ||||||
|  |   TTL     = 48.hours.to_s | ||||||
|  |   URGENCY = 'normal' | ||||||
| 
 | 
 | ||||||
|   def perform(subscription_id, notification_id) |   def perform(subscription_id, notification_id) | ||||||
|     subscription = ::Web::PushSubscription.find(subscription_id) |     @subscription = Web::PushSubscription.find(subscription_id) | ||||||
|     notification = Notification.find(notification_id) |     @notification = Notification.find(notification_id) | ||||||
| 
 | 
 | ||||||
|     subscription.push(notification) unless notification.activity.nil? |     # Polymorphically associated activity could have been deleted | ||||||
|   rescue Webpush::ResponseError => e |     # in the meantime, so we have to double-check before proceeding | ||||||
|     code = e.response.code.to_i |     return unless @notification.activity.present? && @subscription.pushable?(@notification) | ||||||
| 
 | 
 | ||||||
|     if (400..499).cover?(code) && ![408, 429].include?(code) |     payload = @subscription.encrypt(push_notification_json) | ||||||
|       subscription.destroy! | 
 | ||||||
|     else |     request_pool.with(@subscription.audience) do |http_client| | ||||||
|       raise e |       request = Request.new(:post, @subscription.endpoint, body: payload.fetch(:ciphertext), http_client: http_client) | ||||||
|  | 
 | ||||||
|  |       request.add_headers( | ||||||
|  |         'Content-Type'     => 'application/octet-stream', | ||||||
|  |         'Ttl'              => TTL, | ||||||
|  |         'Urgency'          => URGENCY, | ||||||
|  |         'Content-Encoding' => 'aesgcm', | ||||||
|  |         'Encryption'       => "salt=#{Webpush.encode64(payload.fetch(:salt)).delete('=')}", | ||||||
|  |         'Crypto-Key'       => "dh=#{Webpush.encode64(payload.fetch(:server_public_key)).delete('=')};#{@subscription.crypto_key_header}", | ||||||
|  |         'Authorization'    => @subscription.authorization_header | ||||||
|  |       ) | ||||||
|  | 
 | ||||||
|  |       request.perform do |response| | ||||||
|  |         # If the server responds with an error in the 4xx range | ||||||
|  |         # that isn't about rate-limiting or timeouts, we can | ||||||
|  |         # assume that the subscription is invalid or expired | ||||||
|  |         # and must be removed | ||||||
|  | 
 | ||||||
|  |         if (400..499).cover?(response.code) && ![408, 429].include?(response.code) | ||||||
|  |           @subscription.destroy! | ||||||
|  |         elsif !(200...300).cover?(response.code) | ||||||
|  |           raise Mastodon::UnexpectedResponseError, response | ||||||
|  |         end | ||||||
|  |       end | ||||||
|     end |     end | ||||||
|   rescue ActiveRecord::RecordNotFound |   rescue ActiveRecord::RecordNotFound | ||||||
|     true |     true | ||||||
|   end |   end | ||||||
|  | 
 | ||||||
|  |   private | ||||||
|  | 
 | ||||||
|  |   def push_notification_json | ||||||
|  |     json = I18n.with_locale(@subscription.locale || I18n.default_locale) do | ||||||
|  |       ActiveModelSerializers::SerializableResource.new( | ||||||
|  |         @notification, | ||||||
|  |         serializer: Web::NotificationSerializer, | ||||||
|  |         scope: @subscription, | ||||||
|  |         scope_name: :current_push_subscription | ||||||
|  |       ).as_json | ||||||
|  |     end | ||||||
|  | 
 | ||||||
|  |     Oj.dump(json) | ||||||
|  |   end | ||||||
|  | 
 | ||||||
|  |   def request_pool | ||||||
|  |     RequestPool.current | ||||||
|  |   end | ||||||
| end | end | ||||||
|  |  | ||||||
							
								
								
									
										48
									
								
								spec/workers/web/push_notification_worker_spec.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										48
									
								
								spec/workers/web/push_notification_worker_spec.rb
									
									
									
									
									
										Normal file
									
								
							|  | @ -0,0 +1,48 @@ | ||||||
|  | # frozen_string_literal: true | ||||||
|  | 
 | ||||||
|  | require 'rails_helper' | ||||||
|  | 
 | ||||||
|  | describe Web::PushNotificationWorker do | ||||||
|  |   subject { described_class.new } | ||||||
|  | 
 | ||||||
|  |   let(:p256dh) { 'BN4GvZtEZiZuqFxSKVZfSfluwKBD7UxHNBmWkfiZfCtgDE8Bwh-_MtLXbBxTBAWH9r7IPKL0lhdcaqtL1dfxU5E=' } | ||||||
|  |   let(:auth) { 'Q2BoAjC09xH3ywDLNJr-dA==' } | ||||||
|  |   let(:endpoint) { 'https://updates.push.services.mozilla.com/push/v1/subscription-id' } | ||||||
|  |   let(:user) { Fabricate(:user) } | ||||||
|  |   let(:notification) { Fabricate(:notification) } | ||||||
|  |   let(:subscription) { Fabricate(:web_push_subscription, user_id: user.id, key_p256dh: p256dh, key_auth: auth, endpoint: endpoint, data: { alerts: { notification.type => true } }) } | ||||||
|  |   let(:vapid_public_key) { 'BB37UCyc8LLX4PNQSe-04vSFvpUWGrENubUaslVFM_l5TxcGVMY0C3RXPeUJAQHKYlcOM2P4vTYmkoo0VZGZTM4=' } | ||||||
|  |   let(:vapid_private_key) { 'OPrw1Sum3gRoL4-DXfSCC266r-qfFSRZrnj8MgIhRHg=' } | ||||||
|  |   let(:vapid_key) { Webpush::VapidKey.from_keys(vapid_public_key, vapid_private_key) } | ||||||
|  |   let(:contact_email) { 'sender@example.com' } | ||||||
|  |   let(:ciphertext) { "+\xB8\xDBT}\x13\xB6\xDD.\xF9\xB0\xA7\xC8\xD2\x80\xFD\x99#\xF7\xAC\x83\xA4\xDB,\x1F\xB5\xB9w\x85>\xF7\xADr" } | ||||||
|  |   let(:salt) { "X\x97\x953\xE4X\xF8_w\xE7T\x95\xC51q\xFE" } | ||||||
|  |   let(:server_public_key) { "\x04\b-RK9w\xDD$\x16lFz\xF9=\xB4~\xC6\x12k\xF3\xF40t\xA9\xC1\fR\xC3\x81\x80\xAC\f\x7F\xE4\xCC\x8E\xC2\x88 n\x8BB\xF1\x9C\x14\a\xFA\x8D\xC9\x80\xA1\xDDyU\\&c\x01\x88#\x118Ua" } | ||||||
|  |   let(:shared_secret) { "\t\xA7&\x85\t\xC5m\b\xA8\xA7\xF8B{1\xADk\xE1y'm\xEDE\xEC\xDD\xEDj\xB3$s\xA9\xDA\xF0" } | ||||||
|  |   let(:payload) { { ciphertext: ciphertext, salt: salt, server_public_key: server_public_key, shared_secret: shared_secret } } | ||||||
|  | 
 | ||||||
|  |   describe 'perform' do | ||||||
|  |     before do | ||||||
|  |       allow_any_instance_of(subscription.class).to receive(:contact_email).and_return(contact_email) | ||||||
|  |       allow_any_instance_of(subscription.class).to receive(:vapid_key).and_return(vapid_key) | ||||||
|  |       allow(Webpush::Encryption).to receive(:encrypt).and_return(payload) | ||||||
|  |       allow(JWT).to receive(:encode).and_return('jwt.encoded.payload') | ||||||
|  | 
 | ||||||
|  |       stub_request(:post, endpoint).to_return(status: 201, body: '') | ||||||
|  | 
 | ||||||
|  |       subject.perform(subscription.id, notification.id) | ||||||
|  |     end | ||||||
|  | 
 | ||||||
|  |     it 'calls the relevant service with the correct headers' do | ||||||
|  |       expect(a_request(:post, endpoint).with(headers: { | ||||||
|  |         'Content-Encoding' => 'aesgcm', | ||||||
|  |         'Content-Type' => 'application/octet-stream', | ||||||
|  |         'Crypto-Key' => 'dh=BAgtUks5d90kFmxGevk9tH7GEmvz9DB0qcEMUsOBgKwMf-TMjsKIIG6LQvGcFAf6jcmAod15VVwmYwGIIxE4VWE;p256ecdsa=' + vapid_public_key.delete('='), | ||||||
|  |         'Encryption' => 'salt=WJeVM-RY-F9351SVxTFx_g', | ||||||
|  |         'Ttl' => '172800', | ||||||
|  |         'Urgency' => 'normal', | ||||||
|  |         'Authorization' => 'WebPush jwt.encoded.payload', | ||||||
|  |       }, body: "+\xB8\xDBT}\u0013\xB6\xDD.\xF9\xB0\xA7\xC8Ҁ\xFD\x99#\xF7\xAC\x83\xA4\xDB,\u001F\xB5\xB9w\x85>\xF7\xADr")).to have_been_made | ||||||
|  |     end | ||||||
|  |   end | ||||||
|  | end | ||||||
		Loading…
	
		Reference in a new issue