@ -6,33 +6,56 @@ class FeedManager
include Singleton
include Redisable
# Maximum number of items stored in a single feed
MAX_ITEMS = 400
# Must be <= MAX_ITEMS or the tracking sets will grow forever
# Number of items in the feed since last reblog of status
# before the new reblog will be inserted. Must be <= MAX_ITEMS
# or the tracking sets will grow forever
REBLOG_FALLOFF = 40
# Execute block for every active account
# @yield [Account]
# @return [void]
def with_active_accounts ( & block )
Account . joins ( :user ) . where ( 'users.current_sign_in_at > ?' , User :: ACTIVE_DURATION . ago ) . find_each ( & block )
end
# Redis key of a feed
# @param [Symbol] type
# @param [Integer] id
# @param [Symbol] subtype
# @return [String]
def key ( type , id , subtype = nil )
return " feed: #{ type } : #{ id } " unless subtype
" feed: #{ type } : #{ id } : #{ subtype } "
end
def filter? ( timeline_type , status , receiver_id )
if timeline_type == :home
filter_from_home? ( status , receiver_id , build_crutches ( receiver_id , [ status ] ) )
elsif timeline_type == :mentions
filter_from_mentions? ( status , receiver_id )
elsif timeline_type == :direct
filter_from_direct? ( status , receiver_id )
# Check if the status should not be added to a feed
# @param [Symbol] timeline_type
# @param [Status] status
# @param [Account|List] receiver
# @return [Boolean]
def filter? ( timeline_type , status , receiver )
case timeline_type
when :home
filter_from_home? ( status , receiver . id , build_crutches ( receiver . id , [ status ] ) )
when :list
filter_from_list? ( status , receiver ) || filter_from_home? ( status , receiver . account_id , build_crutches ( receiver . account_id , [ status ] ) )
when :mentions
filter_from_mentions? ( status , receiver . id )
when :direct
filter_from_direct? ( status , receiver . id )
else
false
end
end
# Add a status to a home feed and send a streaming API update
# @param [Account] account
# @param [Status] status
# @return [Boolean]
def push_to_home ( account , status )
return false unless add_to_feed ( :home , account . id , status , account . user & . aggregates_reblogs? )
@ -41,6 +64,10 @@ class FeedManager
true
end
# Remove a status from a home feed and send a streaming API update
# @param [Account] account
# @param [Status] status
# @return [Boolean]
def unpush_from_home ( account , status )
return false unless remove_from_feed ( :home , account . id , status , account . user & . aggregates_reblogs? )
@ -48,21 +75,22 @@ class FeedManager
true
end
# Add a status to a list feed and send a streaming API update
# @param [List] list
# @param [Status] status
# @return [Boolean]
def push_to_list ( list , status )
if status . reply? && status . in_reply_to_account_id != status . account_id
should_filter = status . in_reply_to_account_id != list . account_id
should_filter && = ! list . show_all_replies?
should_filter && = ! ( list . show_list_replies? && ListAccount . where ( list_id : list . id , account_id : status . in_reply_to_account_id ) . exists? )
return false if should_filter
end
return false unless add_to_feed ( :list , list . id , status , list . account . user & . aggregates_reblogs? )
return false if filter_from_list? ( status , list ) || ! add_to_feed ( :list , list . id , status , list . account . user & . aggregates_reblogs? )
trim ( :list , list . id )
PushUpdateWorker . perform_async ( list . account_id , status . id , " timeline:list: #{ list . id } " ) if push_update_required? ( " timeline:list: #{ list . id } " )
true
end
# Remove a status from a list feed and send a streaming API update
# @param [List] list
# @param [Status] status
# @return [Boolean]
def unpush_from_list ( list , status )
return false unless remove_from_feed ( :list , list . id , status , list . account . user & . aggregates_reblogs? )
@ -70,44 +98,34 @@ class FeedManager
true
end
# Add a status to a linear direct message feed and send a streaming API update
# @param [Account] account
# @param [Status] status
# @return [Boolean]
def push_to_direct ( account , status )
return false unless add_to_feed ( :direct , account . id , status )
trim ( :direct , account . id )
PushUpdateWorker . perform_async ( account . id , status . id , " timeline:direct: #{ account . id } " )
true
end
# Remove a status from a linear direct message feed and send a streaming API update
# @param [List] list
# @param [Status] status
# @return [Boolean]
def unpush_from_direct ( account , status )
return false unless remove_from_feed ( :direct , account . id , status )
redis . publish ( " timeline:direct: #{ account . id } " , Oj . dump ( event : :delete , payload : status . id . to_s ) )
end
def trim ( type , account_id )
timeline_key = key ( type , account_id )
reblog_key = key ( type , account_id , 'reblogs' )
# Remove any items past the MAX_ITEMS'th entry in our feed
redis . zremrangebyrank ( timeline_key , 0 , - ( FeedManager :: MAX_ITEMS + 1 ) )
# Get the score of the REBLOG_FALLOFF'th item in our feed, and stop
# tracking anything after it for deduplication purposes.
falloff_rank = FeedManager :: REBLOG_FALLOFF - 1
falloff_range = redis . zrevrange ( timeline_key , falloff_rank , falloff_rank , with_scores : true )
falloff_score = falloff_range & . first & . last & . to_i || 0
# Get any reblogs we might have to clean up after.
redis . zrangebyscore ( reblog_key , 0 , falloff_score ) . each do | reblogged_id |
# Remove it from the set of reblogs we're tracking *first* to avoid races.
redis . zrem ( reblog_key , reblogged_id )
# Just drop any set we might have created to track additional reblogs.
# This means that if this reblog is deleted, we won't automatically insert
# another reblog, but also that any new reblog can be inserted into the
# feed.
redis . del ( key ( type , account_id , " reblogs: #{ reblogged_id } " ) )
end
redis . publish ( " timeline:direct: #{ account . id } " , Oj . dump ( event : :delete , payload : status . id . to_s ) )
true
end
def merge_into_timeline ( from_account , into_account )
# Fill a home feed with an account's statuses
# @param [Account] from_account
# @param [Account] into_account
# @return [void]
def merge_into_home ( from_account , into_account )
timeline_key = key ( :home , into_account . id )
aggregate = into_account . user & . aggregates_reblogs?
query = from_account . statuses . where ( visibility : [ :public , :unlisted , :private ] ) . includes ( :preloadable_poll , reblog : :account ) . limit ( FeedManager :: MAX_ITEMS / 4 )
@ -129,7 +147,37 @@ class FeedManager
trim ( :home , into_account . id )
end
def unmerge_from_timeline ( from_account , into_account )
# Fill a list feed with an account's statuses
# @param [Account] from_account
# @param [List] list
# @return [void]
def merge_into_list ( from_account , list )
timeline_key = key ( :list , list . id )
aggregate = list . account . user & . aggregates_reblogs?
query = from_account . statuses . where ( visibility : [ :public , :unlisted , :private ] ) . includes ( :preloadable_poll , reblog : :account ) . limit ( FeedManager :: MAX_ITEMS / 4 )
if redis . zcard ( timeline_key ) > = FeedManager :: MAX_ITEMS / 4
oldest_home_score = redis . zrange ( timeline_key , 0 , 0 , with_scores : true ) . first . last . to_i
query = query . where ( 'id > ?' , oldest_home_score )
end
statuses = query . to_a
crutches = build_crutches ( list . account_id , statuses )
statuses . each do | status |
next if filter_from_home? ( status , list . account_id , crutches ) || filter_from_list? ( status , list )
add_to_feed ( :list , list . id , status , aggregate )
end
trim ( :list , list . id )
end
# Remove an account's statuses from a home feed
# @param [Account] from_account
# @param [Account] into_account
# @return [void]
def unmerge_from_home ( from_account , into_account )
timeline_key = key ( :home , into_account . id )
oldest_home_score = redis . zrange ( timeline_key , 0 , 0 , with_scores : true ) & . first & . last & . to_i || 0
@ -138,14 +186,31 @@ class FeedManager
end
end
def clear_from_timeline ( account , target_account )
# Clear from timeline all statuses from or mentionning target_account
# Remove an account's statuses from a list feed
# @param [Account] from_account
# @param [List] list
# @return [void]
def unmerge_from_list ( from_account , list )
timeline_key = key ( :list , list . id )
oldest_list_score = redis . zrange ( timeline_key , 0 , 0 , with_scores : true ) & . first & . last & . to_i || 0
from_account . statuses . select ( 'id, reblog_of_id' ) . where ( 'id > ?' , oldest_list_score ) . reorder ( nil ) . find_each do | status |
remove_from_feed ( :list , list . id , status , list . account . user & . aggregates_reblogs? )
end
end
# Clear all statuses from or mentioning target_account from a home feed
# @param [Account] account
# @param [Account] target_account
# @return [void]
def clear_from_home ( account , target_account )
timeline_key = key ( :home , account . id )
timeline_status_ids = redis . zrange ( timeline_key , 0 , - 1 )
statuses = Status . where ( id : timeline_status_ids ) . select ( :id , :reblog_of_id , :account_id ) . to_a
reblogged_ids = Status . where ( id : statuses . map ( & :reblog_of_id ) . compact , account : target_account ) . pluck ( :id )
with_mentions_ids = Mention . active . where ( status_id : statuses . flat_map { | s | [ s . id , s . reblog_of_id ] } . compact , account : target_account ) . pluck ( :status_id )
target_statuses = statuses . filter do | status |
target_statuses = statuses . select do | status |
status . account_id == target_account . id || reblogged_ids . include? ( status . reblog_of_id ) || with_mentions_ids . include? ( status . id ) || with_mentions_ids . include? ( status . reblog_of_id )
end
@ -154,7 +219,10 @@ class FeedManager
end
end
def populate_feed ( account )
# Populate home feed of account from scratch
# @param [Account] account
# @return [void]
def populate_home ( account )
limit = FeedManager :: MAX_ITEMS / 2
aggregate = account . user & . aggregates_reblogs?
timeline_key = key ( :home , account . id )
@ -187,6 +255,9 @@ class FeedManager
end
end
# Populate direct feed of account from scratch
# @param [Account] account
# @return [void]
def populate_direct_feed ( account )
added = 0
limit = FeedManager :: MAX_ITEMS / 2
@ -210,15 +281,59 @@ class FeedManager
private
def push_update_required? ( timeline_id )
redis . exists? ( " subscribed: #{ timeline_id } " )
# Trim a feed to maximum size by removing older items
# @param [Symbol] type
# @param [Integer] timeline_id
# @return [void]
def trim ( type , timeline_id )
timeline_key = key ( type , timeline_id )
reblog_key = key ( type , timeline_id , 'reblogs' )
# Remove any items past the MAX_ITEMS'th entry in our feed
redis . zremrangebyrank ( timeline_key , 0 , - ( FeedManager :: MAX_ITEMS + 1 ) )
# Get the score of the REBLOG_FALLOFF'th item in our feed, and stop
# tracking anything after it for deduplication purposes.
falloff_rank = FeedManager :: REBLOG_FALLOFF
falloff_range = redis . zrevrange ( timeline_key , falloff_rank , falloff_rank , with_scores : true )
falloff_score = falloff_range & . first & . last & . to_i
return if falloff_score . nil?
# Get any reblogs we might have to clean up after.
redis . zrangebyscore ( reblog_key , 0 , falloff_score ) . each do | reblogged_id |
# Remove it from the set of reblogs we're tracking *first* to avoid races.
redis . zrem ( reblog_key , reblogged_id )
# Just drop any set we might have created to track additional reblogs.
# This means that if this reblog is deleted, we won't automatically insert
# another reblog, but also that any new reblog can be inserted into the
# feed.
redis . del ( key ( type , timeline_id , " reblogs: #{ reblogged_id } " ) )
end
end
# Check if there is a streaming API client connected
# for the given feed
# @param [String] timeline_key
# @return [Boolean]
def push_update_required? ( timeline_key )
redis . exists? ( " subscribed: #{ timeline_key } " )
end
# Check if the account is blocking or muting any of the given accounts
# @param [Integer] receiver_id
# @param [Array<Integer>] account_ids
# @param [Symbol] context
def blocks_or_mutes? ( receiver_id , account_ids , context )
Block . where ( account_id : receiver_id , target_account_id : account_ids ) . any? ||
( context == :home ? Mute . where ( account_id : receiver_id , target_account_id : account_ids ) . any? : Mute . where ( account_id : receiver_id , target_account_id : account_ids , hide_notifications : true ) . any? )
end
# Check if status should not be added to the home feed
# @param [Status] status
# @param [Integer] receiver_id
# @param [Hash] crutches
# @return [Boolean]
def filter_from_home? ( status , receiver_id , crutches )
return false if receiver_id == status . account_id
return true if status . reply? && ( status . in_reply_to_id . nil? || status . in_reply_to_account_id . nil? )
@ -251,6 +366,11 @@ class FeedManager
false
end
# Check if status should not be added to the mentions feed
# @see NotifyService
# @param [Status] status
# @param [Integer] receiver_id
# @return [Boolean]
def filter_from_mentions? ( status , receiver_id )
return true if receiver_id == status . account_id
return true if phrase_filtered? ( status , receiver_id , :notifications )
@ -267,11 +387,36 @@ class FeedManager
should_filter
end
# Check if status should not be added to the linear direct message feed
# @param [Status] status
# @param [Integer] receiver_id
# @return [Boolean]
def filter_from_direct? ( status , receiver_id )
return false if receiver_id == status . account_id
filter_from_mentions? ( status , receiver_id )
end
# Check if status should not be added to the list feed
# @param [Status] status
# @param [List] list
# @return [Boolean]
def filter_from_list? ( status , list )
if status . reply? && status . in_reply_to_account_id != status . account_id
should_filter = status . in_reply_to_account_id != list . account_id
should_filter && = ! list . show_all_replies?
should_filter && = ! ( list . show_list_replies? && ListAccount . where ( list_id : list . id , account_id : status . in_reply_to_account_id ) . exists? )
return ! ! should_filter
end
false
end
# Check if the status hits a phrase filter
# @param [Status] status
# @param [Integer] receiver_id
# @param [Symbol] context
# @return [Boolean]
def phrase_filtered? ( status , receiver_id , context )
active_filters = Rails . cache . fetch ( " filters: #{ receiver_id } " ) { CustomFilter . where ( account_id : receiver_id ) . active_irreversible . to_a } . to_a
@ -307,6 +452,11 @@ class FeedManager
# added, and false if it was not added to the feed. Note that this is
# an internal helper: callers must call trim or push updates if
# either action is appropriate.
# @param [Symbol] timeline_type
# @param [Integer] account_id
# @param [Status] status
# @param [Boolean] aggregate_reblogs
# @return [Boolean]
def add_to_feed ( timeline_type , account_id , status , aggregate_reblogs = true )
timeline_key = key ( timeline_type , account_id )
reblog_key = key ( timeline_type , account_id , 'reblogs' )
@ -319,14 +469,12 @@ class FeedManager
return false if ! rank . nil? && rank < FeedManager :: REBLOG_FALLOFF
reblog_rank = redis . zrevrank ( reblog_key , status . reblog_of_id )
if re blog_rank. nil?
# The ordered set at `reblog_key` holds statuses which have a reblog
# in the top `REBLOG_FALLOFF` statuses of the timeline
if re dis. zadd ( reblog_key , status . id , status . reblog_of_id , nx : true )
# This is not something we've already seen reblogged, so we
# can just add it to the feed (and note that we're
# reblogging it).
# can just add it to the feed (and note that we're reblogging it).
redis . zadd ( timeline_key , status . id , status . id )
redis . zadd ( reblog_key , status . id , status . reblog_of_id )
else
# Another reblog of the same status was already in the
# REBLOG_FALLOFF most recent statuses, so we note that this
@ -340,9 +488,7 @@ class FeedManager
# delay of the worker deliverying the original status, the late addition
# by merging timelines, and other reasons.
# If such a reblog already exists, just do not re-insert it into the feed.
rank = redis . zrevrank ( reblog_key , status . id )
return false unless rank . nil?
return false unless redis . zscore ( reblog_key , status . id ) . nil?
redis . zadd ( timeline_key , status . id , status . id )
end
@ -354,6 +500,11 @@ class FeedManager
# with reblogs, and returning true if a status was removed. As with
# `add_to_feed`, this does not trigger push updates, so callers must
# do so if appropriate.
# @param [Symbol] timeline_type
# @param [Integer] account_id
# @param [Status] status
# @param [Boolean] aggregate_reblogs
# @return [Boolean]
def remove_from_feed ( timeline_type , account_id , status , aggregate_reblogs = true )
timeline_key = key ( timeline_type , account_id )
reblog_key = key ( timeline_type , account_id , 'reblogs' )
@ -388,6 +539,11 @@ class FeedManager
redis . zrem ( timeline_key , status . id )
end
# Pre-fetch various objects and relationships for given statuses that
# are going to be checked by the filtering methods
# @param [Integer] receiver_id
# @param [Array<Status>] statuses
# @return [Hash]
def build_crutches ( receiver_id , statuses )
crutches = { }