diff --git a/app/models/trends/base.rb b/app/models/trends/base.rb index 200f8d6358..0471112489 100644 --- a/app/models/trends/base.rb +++ b/app/models/trends/base.rb @@ -64,33 +64,38 @@ class Trends::Base redis.expire(used_key(at_time), 1.day.seconds) end - def trim_older_items - redis.zremrangebyscore("#{key_prefix}:all", '-inf', '(0.3') - redis.zremrangebyscore("#{key_prefix}:allowed", '-inf', '(0.3') - end - def score_at_rank(rank) redis.zrevrange("#{key_prefix}:allowed", 0, rank, with_scores: true).last&.last || 0 end - # @param [Integer] id - # @param [Float] score - # @param [Hash] subsets - def add_to_and_remove_from_subsets(id, score, subsets = {}) - subsets.each_key do |subset| - key = [key_prefix, subset].compact.join(':') + def replace_items(suffix, items) + tmp_prefix = "#{key_prefix}:tmp:#{SecureRandom.alphanumeric(6)}#{suffix}" + allowed_items = filter_for_allowed_items(items) + + redis.pipelined do |pipeline| + items.each { |item| pipeline.zadd("#{tmp_prefix}:all", item[:score], item[:item].id) } + allowed_items.each { |item| pipeline.zadd("#{tmp_prefix}:allowed", item[:score], item[:item].id) } - if score.positive? && subsets[subset] - redis.zadd(key, score, id) - else - redis.zrem(key, id) - end + rename_set(pipeline, "#{tmp_prefix}:all", "#{key_prefix}:all#{suffix}", items) + rename_set(pipeline, "#{tmp_prefix}:allowed", "#{key_prefix}:allowed#{suffix}", allowed_items) end end + def filter_for_allowed_items(items) + raise NotImplementedError + end + private def used_key(at_time) "#{key_prefix}:used:#{at_time.beginning_of_day.to_i}" end + + def rename_set(pipeline, from_key, to_key, set_items) + if set_items.empty? + pipeline.del(to_key) + else + pipeline.rename(from_key, to_key) + end + end end diff --git a/app/models/trends/links.rb b/app/models/trends/links.rb index 5f046643a0..604894cd67 100644 --- a/app/models/trends/links.rb +++ b/app/models/trends/links.rb @@ -8,14 +8,15 @@ class Trends::Links < Trends::Base review_threshold: 3, max_score_cooldown: 2.days.freeze, max_score_halflife: 8.hours.freeze, + decay_threshold: 1, } def register(status, at_time = Time.now.utc) - original_status = status.reblog? ? status.reblog : status + original_status = status.proper - return unless original_status.public_visibility? && status.public_visibility? && - !original_status.account.silenced? && !status.account.silenced? && - !original_status.spoiler_text? + return unless (original_status.public_visibility? && status.public_visibility?) && + !(original_status.account.silenced? || status.account.silenced?) && + !(original_status.spoiler_text? || original_status.sensitive?) original_status.preview_cards.each do |preview_card| add(preview_card, status.account_id, at_time) if preview_card.appropriate_for_trends? @@ -61,6 +62,9 @@ class Trends::Links < Trends::Base private def calculate_scores(preview_cards, at_time) + global_items = [] + locale_items = Hash.new { |h, key| h[key] = [] } + preview_cards.each do |preview_card| expected = preview_card.history.get(at_time - 1.day).accounts.to_f expected = 1.0 if expected.zero? @@ -87,33 +91,23 @@ class Trends::Links < Trends::Base decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f)) - add_to_and_remove_from_subsets(preview_card.id, decaying_score, { - all: true, - allowed: preview_card.trendable?, - }) - - next unless valid_locale?(preview_card.language) + next unless decaying_score >= options[:decay_threshold] - add_to_and_remove_from_subsets(preview_card.id, decaying_score, { - "all:#{preview_card.language}" => true, - "allowed:#{preview_card.language}" => preview_card.trendable?, - }) + global_items << { score: decaying_score, item: preview_card } + locale_items[preview_card.language] << { score: decaying_score, item: preview_card } if valid_locale?(preview_card.language) end - trim_older_items - - # Clean up localized sets by calculating the intersection with the main - # set. We do this instead of just deleting the localized sets to avoid - # having moments where the API returns empty results + replace_items('', global_items) - redis.pipelined do - Trends.available_locales.each do |locale| - redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max') - redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:allowed"], aggregate: 'max') - end + Trends.available_locales.each do |locale| + replace_items(":#{locale}", locale_items[locale]) end end + def filter_for_allowed_items(items) + items.select { |item| item[:item].trendable? } + end + def would_be_trending?(id) score(id) > score_at_rank(options[:review_threshold] - 1) end diff --git a/app/models/trends/statuses.rb b/app/models/trends/statuses.rb index 3013bc1d13..777065d3e3 100644 --- a/app/models/trends/statuses.rb +++ b/app/models/trends/statuses.rb @@ -7,6 +7,7 @@ class Trends::Statuses < Trends::Base threshold: 5, review_threshold: 3, score_halflife: 2.hours.freeze, + decay_threshold: 0.3, } class Query < Trends::Query @@ -31,7 +32,7 @@ class Trends::Statuses < Trends::Base end def register(status, at_time = Time.now.utc) - add(status.proper, status.account_id, at_time) if eligible?(status) + add(status.proper, status.account_id, at_time) if eligible?(status.proper) end def add(status, _account_id, at_time = Time.now.utc) @@ -74,53 +75,45 @@ class Trends::Statuses < Trends::Base private def eligible?(status) - original_status = status.proper - - original_status.public_visibility? && - original_status.account.discoverable? && !original_status.account.silenced? && - original_status.spoiler_text.blank? && !original_status.sensitive? && !original_status.reply? + status.public_visibility? && status.account.discoverable? && !status.account.silenced? && status.spoiler_text.blank? && !status.sensitive? && !status.reply? end def calculate_scores(statuses, at_time) - redis.pipelined do - statuses.each do |status| - expected = 1.0 - observed = (status.reblogs_count + status.favourites_count).to_f - - score = begin - if expected > observed || observed < options[:threshold] - 0 - else - ((observed - expected)**2) / expected - end + global_items = [] + locale_items = Hash.new { |h, key| h[key] = [] } + + statuses.each do |status| + expected = 1.0 + observed = (status.reblogs_count + status.favourites_count).to_f + + score = begin + if expected > observed || observed < options[:threshold] + 0 + else + ((observed - expected)**2) / expected end + end - decaying_score = score * (0.5**((at_time.to_f - status.created_at.to_f) / options[:score_halflife].to_f)) + decaying_score = score * (0.5**((at_time.to_f - status.created_at.to_f) / options[:score_halflife].to_f)) - add_to_and_remove_from_subsets(status.id, decaying_score, { - all: true, - allowed: status.trendable? && status.account.discoverable?, - }) + next unless decaying_score >= options[:decay_threshold] - next unless valid_locale?(status.language) + global_items << { score: decaying_score, item: status } + locale_items[status.language] << { account_id: status.account_id, score: decaying_score, item: status } if valid_locale?(status.language) + end - add_to_and_remove_from_subsets(status.id, decaying_score, { - "all:#{status.language}" => true, - "allowed:#{status.language}" => status.trendable? && status.account.discoverable?, - }) - end + replace_items('', global_items) - trim_older_items + Trends.available_locales.each do |locale| + replace_items(":#{locale}", locale_items[locale]) + end + end - # Clean up localized sets by calculating the intersection with the main - # set. We do this instead of just deleting the localized sets to avoid - # having moments where the API returns empty results + def filter_for_allowed_items(items) + # Show only one status per account, pick the one with the highest score + # that's also eligible to trend - Trends.available_locales.each do |locale| - redis.zinterstore("#{key_prefix}:all:#{locale}", ["#{key_prefix}:all:#{locale}", "#{key_prefix}:all"], aggregate: 'max') - redis.zinterstore("#{key_prefix}:allowed:#{locale}", ["#{key_prefix}:allowed:#{locale}", "#{key_prefix}:allowed"], aggregate: 'max') - end - end + items.group_by { |item| item[:account_id] }.values.filter_map { |account_items| account_items.select { |item| item[:item].trendable? && item[:item].account.discoverable? }.max_by { |item| item[:score] } } end def would_be_trending?(id) diff --git a/app/models/trends/tags.rb b/app/models/trends/tags.rb index 3caa58815d..19ade52ba4 100644 --- a/app/models/trends/tags.rb +++ b/app/models/trends/tags.rb @@ -8,6 +8,7 @@ class Trends::Tags < Trends::Base review_threshold: 3, max_score_cooldown: 2.days.freeze, max_score_halflife: 4.hours.freeze, + decay_threshold: 1, } def register(status, at_time = Time.now.utc) @@ -26,7 +27,6 @@ class Trends::Tags < Trends::Base def refresh(at_time = Time.now.utc) tags = Tag.where(id: (recently_used_ids(at_time) + currently_trending_ids(false, -1)).uniq) calculate_scores(tags, at_time) - trim_older_items end def request_review @@ -53,6 +53,8 @@ class Trends::Tags < Trends::Base private def calculate_scores(tags, at_time) + items = [] + tags.each do |tag| expected = tag.history.get(at_time - 1.day).accounts.to_f expected = 1.0 if expected.zero? @@ -79,11 +81,16 @@ class Trends::Tags < Trends::Base decaying_score = max_score * (0.5**((at_time.to_f - max_time.to_f) / options[:max_score_halflife].to_f)) - add_to_and_remove_from_subsets(tag.id, decaying_score, { - all: true, - allowed: tag.trendable?, - }) + next unless decaying_score >= options[:decay_threshold] + + items << { score: decaying_score, item: tag } end + + replace_items('', items) + end + + def filter_for_allowed_items(items) + items.select { |item| item[:item].trendable? } end def would_be_trending?(id)