Optimize FixReblogsInFeeds migration (#5538)
We have changed how we store reblogs in the redis for bigint IDs. This process is done by 1) scan all entries in users feed, and 2) re-store reblogs by 3 write commands. However, this operation is really slow for large instances. e.g. 1hrs on friends.nico (w/ 50k users). So I have tried below tweaks. * It checked non-reblogs by `entry[0] == entry[1]`, but this condition won't work because `entry[0]` is String while `entry[1]` is Float. Changing `entry[0].to_i == entry[1]` seems work. -> about 4-20x faster (feed with less reblogs will be faster) * Write operations can be batched by pipeline -> about 6x faster * Wrap operation by Lua script and execute by EVALSHA command. This really reduces packets between Ruby and Redis. -> about 3x faster I've taken Lua script way, though doing other optimizations may be enough.
This commit is contained in:
		
							parent
							
								
									fe11a72c0b
								
							
						
					
					
						commit
						75776cf237
					
				
					 1 changed files with 49 additions and 35 deletions
				
			
		| 
						 | 
					@ -3,48 +3,62 @@ class FixReblogsInFeeds < ActiveRecord::Migration[5.1]
 | 
				
			||||||
    redis = Redis.current
 | 
					    redis = Redis.current
 | 
				
			||||||
    fm = FeedManager.instance
 | 
					    fm = FeedManager.instance
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # Old scheme:
 | 
				
			||||||
 | 
					    # Each user's feed zset had a series of score:value entries,
 | 
				
			||||||
 | 
					    # where "regular" statuses had the same score and value (their
 | 
				
			||||||
 | 
					    # ID). Reblogs had a score of the reblogging status' ID, and a
 | 
				
			||||||
 | 
					    # value of the reblogged status' ID.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # New scheme:
 | 
				
			||||||
 | 
					    # The feed contains only entries with the same score and value.
 | 
				
			||||||
 | 
					    # Reblogs result in the reblogging status being added to the
 | 
				
			||||||
 | 
					    # feed, with an entry in a reblog tracking zset (where the score
 | 
				
			||||||
 | 
					    # is once again set to the reblogging status' ID, and the value
 | 
				
			||||||
 | 
					    # is set to the reblogged status' ID). This is safe for Redis'
 | 
				
			||||||
 | 
					    # float coersion because in this reblog tracking zset, we only
 | 
				
			||||||
 | 
					    # need the rebloggging status' ID to be able to stop tracking
 | 
				
			||||||
 | 
					    # entries after they have gotten too far down the feed, which
 | 
				
			||||||
 | 
					    # does not require an exact value.
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    # This process reads all feeds and writes 3 times for each reblogs.
 | 
				
			||||||
 | 
					    # So we use Lua script to avoid overhead between Ruby and Redis.
 | 
				
			||||||
 | 
					    script = <<-LUA
 | 
				
			||||||
 | 
					      local timeline_key = KEYS[1]
 | 
				
			||||||
 | 
					      local reblog_key = KEYS[2]
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					      -- So, first, we iterate over the user's feed to find any reblogs.
 | 
				
			||||||
 | 
					      local items = redis.call('zrange', timeline_key, 0, -1, 'withscores')
 | 
				
			||||||
 | 
					      
 | 
				
			||||||
 | 
					      for i = 1, #items, 2 do
 | 
				
			||||||
 | 
					        local reblogged_id = items[i]
 | 
				
			||||||
 | 
					        local reblogging_id = items[i + 1]
 | 
				
			||||||
 | 
					        if (reblogged_id ~= reblogging_id) then
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          -- The score and value don't match, so this is a reblog.
 | 
				
			||||||
 | 
					          -- (note that we're transitioning from IDs < 53 bits so we
 | 
				
			||||||
 | 
					          -- don't have to worry about the loss of precision)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          -- Remove the old entry
 | 
				
			||||||
 | 
					          redis.call('zrem', timeline_key, reblogged_id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          -- Add a new one for the reblogging status
 | 
				
			||||||
 | 
					          redis.call('zadd', timeline_key, reblogging_id, reblogging_id)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					          -- Track the fact that this was a reblog
 | 
				
			||||||
 | 
					          redis.call('zadd', reblog_key, reblogging_id, reblogged_id)
 | 
				
			||||||
 | 
					        end
 | 
				
			||||||
 | 
					      end
 | 
				
			||||||
 | 
					    LUA
 | 
				
			||||||
 | 
					    script_hash = redis.script(:load, script)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    # find_each is batched on the database side.
 | 
					    # find_each is batched on the database side.
 | 
				
			||||||
    User.includes(:account).find_each do |user|
 | 
					    User.includes(:account).find_each do |user|
 | 
				
			||||||
      account = user.account
 | 
					      account = user.account
 | 
				
			||||||
 | 
					
 | 
				
			||||||
      # Old scheme:
 | 
					 | 
				
			||||||
      # Each user's feed zset had a series of score:value entries,
 | 
					 | 
				
			||||||
      # where "regular" statuses had the same score and value (their
 | 
					 | 
				
			||||||
      # ID). Reblogs had a score of the reblogging status' ID, and a
 | 
					 | 
				
			||||||
      # value of the reblogged status' ID.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      # New scheme:
 | 
					 | 
				
			||||||
      # The feed contains only entries with the same score and value.
 | 
					 | 
				
			||||||
      # Reblogs result in the reblogging status being added to the
 | 
					 | 
				
			||||||
      # feed, with an entry in a reblog tracking zset (where the score
 | 
					 | 
				
			||||||
      # is once again set to the reblogging status' ID, and the value
 | 
					 | 
				
			||||||
      # is set to the reblogged status' ID). This is safe for Redis'
 | 
					 | 
				
			||||||
      # float coersion because in this reblog tracking zset, we only
 | 
					 | 
				
			||||||
      # need the rebloggging status' ID to be able to stop tracking
 | 
					 | 
				
			||||||
      # entries after they have gotten too far down the feed, which
 | 
					 | 
				
			||||||
      # does not require an exact value.
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
      # So, first, we iterate over the user's feed to find any reblogs.
 | 
					 | 
				
			||||||
      timeline_key = fm.key(:home, account.id)
 | 
					      timeline_key = fm.key(:home, account.id)
 | 
				
			||||||
      reblog_key = fm.key(:home, account.id, 'reblogs')
 | 
					      reblog_key = fm.key(:home, account.id, 'reblogs')
 | 
				
			||||||
      redis.zrange(timeline_key, 0, -1, with_scores: true).each do |entry|
 | 
					 | 
				
			||||||
        next if entry[0] == entry[1]
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
        # The score and value don't match, so this is a reblog.
 | 
					      redis.evalsha(script_hash, [timeline_key, reblog_key])
 | 
				
			||||||
        # (note that we're transitioning from IDs < 53 bits so we
 | 
					 | 
				
			||||||
        # don't have to worry about the loss of precision)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        reblogged_id, reblogging_id = entry
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # Remove the old entry
 | 
					 | 
				
			||||||
        redis.zrem(timeline_key, reblogged_id)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # Add a new one for the reblogging status
 | 
					 | 
				
			||||||
        redis.zadd(timeline_key, reblogging_id, reblogging_id)
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
        # Track the fact that this was a reblog
 | 
					 | 
				
			||||||
        redis.zadd(reblog_key, reblogging_id, reblogged_id)
 | 
					 | 
				
			||||||
      end
 | 
					 | 
				
			||||||
    end
 | 
					    end
 | 
				
			||||||
  end
 | 
					  end
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
		Reference in a new issue