Upgrade to latest redis-rb 4.x and fix deprecations (#23616)
Co-authored-by: Jean Boussier <jean.boussier@gmail.com>
This commit is contained in:
		
							parent
							
								
									370cff9a5d
								
							
						
					
					
						commit
						08c2938b4a
					
				
					 11 changed files with 37 additions and 38 deletions
				
			
		| 
						 | 
					@ -558,7 +558,7 @@ GEM
 | 
				
			||||||
    rdf-normalize (0.5.1)
 | 
					    rdf-normalize (0.5.1)
 | 
				
			||||||
      rdf (~> 3.2)
 | 
					      rdf (~> 3.2)
 | 
				
			||||||
    redcarpet (3.6.0)
 | 
					    redcarpet (3.6.0)
 | 
				
			||||||
    redis (4.5.1)
 | 
					    redis (4.8.1)
 | 
				
			||||||
    redis-namespace (1.10.0)
 | 
					    redis-namespace (1.10.0)
 | 
				
			||||||
      redis (>= 4)
 | 
					      redis (>= 4)
 | 
				
			||||||
    redlock (1.3.2)
 | 
					    redlock (1.3.2)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -273,27 +273,27 @@ class FeedManager
 | 
				
			||||||
  def clean_feeds!(type, ids)
 | 
					  def clean_feeds!(type, ids)
 | 
				
			||||||
    reblogged_id_sets = {}
 | 
					    reblogged_id_sets = {}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    redis.pipelined do
 | 
					    redis.pipelined do |pipeline|
 | 
				
			||||||
      ids.each do |feed_id|
 | 
					      ids.each do |feed_id|
 | 
				
			||||||
        redis.del(key(type, feed_id))
 | 
					 | 
				
			||||||
        reblog_key = key(type, feed_id, 'reblogs')
 | 
					        reblog_key = key(type, feed_id, 'reblogs')
 | 
				
			||||||
        # We collect a future for this: we don't block while getting
 | 
					        # We collect a future for this: we don't block while getting
 | 
				
			||||||
        # it, but we can iterate over it later.
 | 
					        # it, but we can iterate over it later.
 | 
				
			||||||
        reblogged_id_sets[feed_id] = redis.zrange(reblog_key, 0, -1)
 | 
					        reblogged_id_sets[feed_id] = pipeline.zrange(reblog_key, 0, -1)
 | 
				
			||||||
        redis.del(reblog_key)
 | 
					        pipeline.del(key(type, feed_id), reblog_key)
 | 
				
			||||||
      end
 | 
					      end
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Remove all of the reblog tracking keys we just removed the
 | 
					    # Remove all of the reblog tracking keys we just removed the
 | 
				
			||||||
    # references to.
 | 
					    # references to.
 | 
				
			||||||
    redis.pipelined do
 | 
					    keys_to_delete = reblogged_id_sets.flat_map do |feed_id, future|
 | 
				
			||||||
      reblogged_id_sets.each do |feed_id, future|
 | 
					      future.value.map do |reblogged_id|
 | 
				
			||||||
        future.value.each do |reblogged_id|
 | 
					        key(type, feed_id, "reblogs:#{reblogged_id}")
 | 
				
			||||||
          reblog_set_key = key(type, feed_id, "reblogs:#{reblogged_id}")
 | 
					 | 
				
			||||||
          redis.del(reblog_set_key)
 | 
					 | 
				
			||||||
        end
 | 
					 | 
				
			||||||
      end
 | 
					      end
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    redis.del(keys_to_delete) unless keys_to_delete.empty?
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    nil
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  private
 | 
					  private
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -20,9 +20,9 @@ class FollowRecommendationSuppression < ApplicationRecord
 | 
				
			||||||
  private
 | 
					  private
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def remove_follow_recommendations
 | 
					  def remove_follow_recommendations
 | 
				
			||||||
    redis.pipelined do
 | 
					    redis.pipelined do |pipeline|
 | 
				
			||||||
      I18n.available_locales.each do |locale|
 | 
					      I18n.available_locales.each do |locale|
 | 
				
			||||||
        redis.zrem("follow_recommendations:#{locale}", account_id)
 | 
					        pipeline.zrem("follow_recommendations:#{locale}", account_id)
 | 
				
			||||||
      end
 | 
					      end
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -45,9 +45,9 @@ class BatchedRemoveStatusService < BaseService
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Cannot be batched
 | 
					    # Cannot be batched
 | 
				
			||||||
    @status_id_cutoff = Mastodon::Snowflake.id_at(2.weeks.ago)
 | 
					    @status_id_cutoff = Mastodon::Snowflake.id_at(2.weeks.ago)
 | 
				
			||||||
    redis.pipelined do
 | 
					    redis.pipelined do |pipeline|
 | 
				
			||||||
      statuses.each do |status|
 | 
					      statuses.each do |status|
 | 
				
			||||||
        unpush_from_public_timelines(status)
 | 
					        unpush_from_public_timelines(status, pipeline)
 | 
				
			||||||
      end
 | 
					      end
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
| 
						 | 
					@ -70,22 +70,22 @@ class BatchedRemoveStatusService < BaseService
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def unpush_from_public_timelines(status)
 | 
					  def unpush_from_public_timelines(status, pipeline)
 | 
				
			||||||
    return unless status.public_visibility? && status.id > @status_id_cutoff
 | 
					    return unless status.public_visibility? && status.id > @status_id_cutoff
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    payload = Oj.dump(event: :delete, payload: status.id.to_s)
 | 
					    payload = Oj.dump(event: :delete, payload: status.id.to_s)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    redis.publish('timeline:public', payload)
 | 
					    pipeline.publish('timeline:public', payload)
 | 
				
			||||||
    redis.publish(status.local? ? 'timeline:public:local' : 'timeline:public:remote', payload)
 | 
					    pipeline.publish(status.local? ? 'timeline:public:local' : 'timeline:public:remote', payload)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    if status.media_attachments.any?
 | 
					    if status.media_attachments.any?
 | 
				
			||||||
      redis.publish('timeline:public:media', payload)
 | 
					      pipeline.publish('timeline:public:media', payload)
 | 
				
			||||||
      redis.publish(status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', payload)
 | 
					      pipeline.publish(status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', payload)
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    status.tags.map { |tag| tag.name.mb_chars.downcase }.each do |hashtag|
 | 
					    status.tags.map { |tag| tag.name.mb_chars.downcase }.each do |hashtag|
 | 
				
			||||||
      redis.publish("timeline:hashtag:#{hashtag}", payload)
 | 
					      pipeline.publish("timeline:hashtag:#{hashtag}", payload)
 | 
				
			||||||
      redis.publish("timeline:hashtag:#{hashtag}:local", payload) if status.local?
 | 
					      pipeline.publish("timeline:hashtag:#{hashtag}:local", payload) if status.local?
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -20,7 +20,7 @@ class Scheduler::FollowRecommendationsScheduler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Trends.available_locales.each do |locale|
 | 
					    Trends.available_locales.each do |locale|
 | 
				
			||||||
      recommendations = if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist
 | 
					      recommendations = if AccountSummary.safe.filtered.localized(locale).exists? # We can skip the work if no accounts with that language exist
 | 
				
			||||||
                          FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).map { |recommendation| [recommendation.account_id, recommendation.rank] }
 | 
					                          FollowRecommendation.localized(locale).order(rank: :desc).limit(SET_SIZE).map { |recommendation| [recommendation.rank, recommendation.account_id] }
 | 
				
			||||||
                        else
 | 
					                        else
 | 
				
			||||||
                          []
 | 
					                          []
 | 
				
			||||||
                        end
 | 
					                        end
 | 
				
			||||||
| 
						 | 
					@ -33,14 +33,14 @@ class Scheduler::FollowRecommendationsScheduler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # Language-specific results should be above language-agnostic ones,
 | 
					        # Language-specific results should be above language-agnostic ones,
 | 
				
			||||||
        # otherwise language-agnostic ones will always overshadow them
 | 
					        # otherwise language-agnostic ones will always overshadow them
 | 
				
			||||||
        recommendations.map! { |(account_id, rank)| [account_id, rank + max_fallback_rank] }
 | 
					        recommendations.map! { |(rank, account_id)| [rank + max_fallback_rank, account_id] }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        added = 0
 | 
					        added = 0
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        fallback_recommendations.each do |recommendation|
 | 
					        fallback_recommendations.each do |recommendation|
 | 
				
			||||||
          next if recommendations.any? { |(account_id, _)| account_id == recommendation.account_id }
 | 
					          next if recommendations.any? { |(_, account_id)| account_id == recommendation.account_id }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
          recommendations << [recommendation.account_id, recommendation.rank]
 | 
					          recommendations << [recommendation.rank, recommendation.account_id]
 | 
				
			||||||
          added += 1
 | 
					          added += 1
 | 
				
			||||||
 | 
					
 | 
				
			||||||
          break if added >= missing
 | 
					          break if added >= missing
 | 
				
			||||||
| 
						 | 
					@ -49,10 +49,7 @@ class Scheduler::FollowRecommendationsScheduler
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      redis.multi do |multi|
 | 
					      redis.multi do |multi|
 | 
				
			||||||
        multi.del(key(locale))
 | 
					        multi.del(key(locale))
 | 
				
			||||||
 | 
					        multi.zadd(key(locale), recommendations)
 | 
				
			||||||
        recommendations.each do |(account_id, rank)|
 | 
					 | 
				
			||||||
          multi.zadd(key(locale), rank, account_id)
 | 
					 | 
				
			||||||
        end
 | 
					 | 
				
			||||||
      end
 | 
					      end
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -87,6 +87,8 @@ Rails.application.configure do
 | 
				
			||||||
  config.x.otp_secret = ENV.fetch('OTP_SECRET', '1fc2b87989afa6351912abeebe31ffc5c476ead9bf8b3d74cbc4a302c7b69a45b40b1bbef3506ddad73e942e15ed5ca4b402bf9a66423626051104f4b5f05109')
 | 
					  config.x.otp_secret = ENV.fetch('OTP_SECRET', '1fc2b87989afa6351912abeebe31ffc5c476ead9bf8b3d74cbc4a302c7b69a45b40b1bbef3506ddad73e942e15ed5ca4b402bf9a66423626051104f4b5f05109')
 | 
				
			||||||
end
 | 
					end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Redis.raise_deprecations = true
 | 
				
			||||||
 | 
					
 | 
				
			||||||
ActiveRecordQueryTrace.enabled = ENV['QUERY_TRACE_ENABLED'] == 'true'
 | 
					ActiveRecordQueryTrace.enabled = ENV['QUERY_TRACE_ENABLED'] == 'true'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
module PrivateAddressCheck
 | 
					module PrivateAddressCheck
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -73,3 +73,5 @@ end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
# Catch serialization warnings early
 | 
					# Catch serialization warnings early
 | 
				
			||||||
Sidekiq.strict_args!
 | 
					Sidekiq.strict_args!
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					Redis.raise_deprecations = true
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
							
								
								
									
										1
									
								
								config/initializers/redis.rb
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								config/initializers/redis.rb
									
									
									
									
									
										Normal file
									
								
							| 
						 | 
					@ -0,0 +1 @@
 | 
				
			||||||
 | 
					Redis.sadd_returns_boolean = false
 | 
				
			||||||
| 
						 | 
					@ -1,6 +1,6 @@
 | 
				
			||||||
class FixReblogsInFeeds < ActiveRecord::Migration[5.1]
 | 
					class FixReblogsInFeeds < ActiveRecord::Migration[5.1]
 | 
				
			||||||
  def up
 | 
					  def up
 | 
				
			||||||
    redis = Redis.current
 | 
					    redis = RedisConfiguration.pool.checkout
 | 
				
			||||||
    fm = FeedManager.instance
 | 
					    fm = FeedManager.instance
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # Old scheme:
 | 
					    # Old scheme:
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -2,7 +2,8 @@ class MigrateUnavailableInboxes < ActiveRecord::Migration[5.2]
 | 
				
			||||||
  disable_ddl_transaction!
 | 
					  disable_ddl_transaction!
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def up
 | 
					  def up
 | 
				
			||||||
    urls = Redis.current.smembers('unavailable_inboxes')
 | 
					    redis = RedisConfiguration.pool.checkout
 | 
				
			||||||
 | 
					    urls = redis.smembers('unavailable_inboxes')
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    hosts = urls.map do |url|
 | 
					    hosts = urls.map do |url|
 | 
				
			||||||
      Addressable::URI.parse(url).normalized_host
 | 
					      Addressable::URI.parse(url).normalized_host
 | 
				
			||||||
| 
						 | 
					@ -14,7 +15,7 @@ class MigrateUnavailableInboxes < ActiveRecord::Migration[5.2]
 | 
				
			||||||
      UnavailableDomain.create(domain: host)
 | 
					      UnavailableDomain.create(domain: host)
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    Redis.current.del(*(['unavailable_inboxes'] + Redis.current.keys('exhausted_deliveries:*')))
 | 
					    redis.del(*(['unavailable_inboxes'] + redis.keys('exhausted_deliveries:*')))
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  def down; end
 | 
					  def down; end
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -53,11 +53,7 @@ module Mastodon
 | 
				
			||||||
    desc 'clear', 'Remove all home and list feeds from Redis'
 | 
					    desc 'clear', 'Remove all home and list feeds from Redis'
 | 
				
			||||||
    def clear
 | 
					    def clear
 | 
				
			||||||
      keys = redis.keys('feed:*')
 | 
					      keys = redis.keys('feed:*')
 | 
				
			||||||
 | 
					      redis.del(keys)
 | 
				
			||||||
      redis.pipelined do
 | 
					 | 
				
			||||||
        keys.each { |key| redis.del(key) }
 | 
					 | 
				
			||||||
      end
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      say('OK', :green)
 | 
					      say('OK', :green)
 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in a new issue