Add follower synchronization mechanism (#14510)

* Add support for followers synchronization on the receiving end

Check the `collectionSynchronization` attribute on `Create` and `Announce`
activities and synchronize followers from provided collection if possible.

* Add tests for followers synchronization on the receiving end

* Add support for follower synchronization on the sender's end

* Add tests for the sending end

* Switch from AS attributes to HTTP header

Replace the custom `collectionSynchronization` ActivityStreams attribute by
an HTTP header (`X-AS-Collection-Synchronization`) with the same syntax as
the `Signature` header and the following fields:
- `collectionId` to specify which collection to synchronize
- `digest` for the SHA256 hex-digest of the list of followers known on the
   receiving instance (where “receiving instance” is determined by accounts
   sharing the same host name for their ActivityPub actor `id`)
- `url` of a collection that should be fetched by the instance actor

Internally, move away from the webfinger-based `domain` attribute and use
account `uri` prefix to group accounts.

* Add environment variable to disable followers synchronization

Since the whole mechanism relies on some new preconditions that, in some
extremely rare cases, might not be met, add an environment variable
(DISABLE_FOLLOWERS_SYNCHRONIZATION) to disable the mechanism altogether and
avoid followers being incorrectly removed.

The current conditions are:
1. all managed accounts' actor `id` and inbox URL have the same URI scheme and
   netloc.
2. all accounts whose actor `id` or inbox URL share the same URI scheme and
   netloc as a managed account must be managed by the same Mastodon instance
   as well.

As far as Mastodon is concerned, breaking those preconditions require extensive
configuration changes in the reverse proxy and might also cause other issues.

Therefore, this environment variable provides a way out for people with highly
unusual configurations, and can be safely ignored for the overwhelming majority
of Mastodon administrators.

* Only set follower synchronization header on non-public statuses

This is to avoid unnecessary computations and allow Follow-related
activities to be handled by the usual codepath instead of going through
the synchronization mechanism (otherwise, any Follow/Undo/Accept activity
would trigger the synchronization mechanism even if processing the activity
itself would be enough to re-introduce synchronization)

* Change how ActivityPub::SynchronizeFollowersService handles follow requests

If the remote lists a local follower which we only know has sent a follow
request, consider the follow request as accepted instead of sending an Undo.

* Integrate review feeback

- rename X-AS-Collection-Synchronization to Collection-Synchronization
- various minor refactoring and code style changes

* Only select required fields when computing followers_hash

* Use actor URI rather than webfinger domain in synchronization endpoint

* Change hash computation to be a XOR of individual hashes

Makes it much easier to be memory-efficient, and avoid sorting discrepancy issues.

* Marginally improve followers_hash computation speed

* Further improve hash computation performances by using pluck_each
th-downstream
ThibG 4 years ago committed by GitHub
parent 875a771c0b
commit 694a5e33d5

@ -159,3 +159,6 @@ end
gem 'concurrent-ruby', require: false
gem 'connection_pool', require: false
gem 'xorcist', '~> 1.1'
gem 'pluck_each', '~> 0.1.3'

@ -403,6 +403,9 @@ GEM
pghero (2.7.2)
activerecord (>= 5)
pkg-config (1.4.4)
pluck_each (0.1.3)
activerecord (> 3.2.0)
activesupport (> 3.0.0)
posix-spawn (0.3.15)
premailer (1.14.2)
addressable
@ -662,6 +665,7 @@ GEM
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.5)
wisper (2.0.1)
xorcist (1.1.2)
xpath (3.2.0)
nokogiri (~> 1.8)
@ -748,6 +752,7 @@ DEPENDENCIES
pg (~> 1.2)
pghero (~> 2.7)
pkg-config (~> 1.4)
pluck_each (~> 0.1.3)
posix-spawn
premailer-rails
private_address_check (~> 0.5)
@ -796,3 +801,4 @@ DEPENDENCIES
webmock (~> 3.9)
webpacker (~> 5.2)
webpush
xorcist (~> 1.1)

@ -0,0 +1,36 @@
# frozen_string_literal: true
class ActivityPub::FollowersSynchronizationsController < ActivityPub::BaseController
include SignatureVerification
include AccountOwnedConcern
before_action :require_signature!
before_action :set_items
before_action :set_cache_headers
def show
expires_in 0, public: false
render json: collection_presenter,
serializer: ActivityPub::CollectionSerializer,
adapter: ActivityPub::Adapter,
content_type: 'application/activity+json'
end
private
def uri_prefix
signed_request_account.uri[/http(s?):\/\/[^\/]+\//]
end
def set_items
@items = @account.followers.where(Account.arel_table[:uri].matches(uri_prefix + '%', false, true)).pluck(:uri)
end
def collection_presenter
ActivityPub::CollectionPresenter.new(
id: account_followers_synchronization_url(@account),
type: :ordered,
items: @items
)
end
end

@ -11,6 +11,7 @@ class ActivityPub::InboxesController < ActivityPub::BaseController
def create
upgrade_account
process_collection_synchronization
process_payload
head 202
end
@ -52,6 +53,19 @@ class ActivityPub::InboxesController < ActivityPub::BaseController
DeliveryFailureTracker.reset!(signed_request_account.inbox_url)
end
def process_collection_synchronization
raw_params = request.headers['Collection-Synchronization']
return if raw_params.blank? || ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] == 'true'
# Re-using the syntax for signature parameters
tree = SignatureParamsParser.new.parse(raw_params)
params = SignatureParamsTransformer.new.apply(tree)
ActivityPub::PrepareFollowersSynchronizationService.new.call(signed_request_account, params)
rescue Parslet::ParseFailed
Rails.logger.warn 'Error parsing Collection-Synchronization header'
end
def process_payload
ActivityPub::ProcessingWorker.perform_async(signed_request_account.id, body, @account&.id)
end

@ -40,6 +40,10 @@ class ActivityPub::TagManager
end
end
def uri_for_username(username)
account_url(username: username)
end
def generate_uri_for(_target)
URI.join(root_url, 'payloads', SecureRandom.uuid)
end

@ -352,6 +352,12 @@ class Account < ApplicationRecord
shared_inbox_url.presence || inbox_url
end
def synchronization_uri_prefix
return 'local' if local?
@synchronization_uri_prefix ||= uri[/http(s?):\/\/[^\/]+\//]
end
class Field < ActiveModelSerializers::Model
attributes :name, :value, :verified_at, :account, :errors

@ -243,6 +243,26 @@ module AccountInteractions
.where('users.current_sign_in_at > ?', User::ACTIVE_DURATION.ago)
end
def remote_followers_hash(url_prefix)
Rails.cache.fetch("followers_hash:#{id}:#{url_prefix}") do
digest = "\x00" * 32
followers.where(Account.arel_table[:uri].matches(url_prefix + '%', false, true)).pluck_each(:uri) do |uri|
Xorcist.xor!(digest, Digest::SHA256.digest(uri))
end
digest.unpack('H*')[0]
end
end
def local_followers_hash
Rails.cache.fetch("followers_hash:#{id}:local") do
digest = "\x00" * 32
followers.where(domain: nil).pluck_each(:username) do |username|
Xorcist.xor!(digest, Digest::SHA256.digest(ActivityPub::TagManager.instance.uri_for_username(username)))
end
digest.unpack('H*')[0]
end
end
private
def remove_potential_friendship(other_account, mutual = false)

@ -41,8 +41,10 @@ class Follow < ApplicationRecord
before_validation :set_uri, only: :create
after_create :increment_cache_counters
after_create :invalidate_hash_cache
after_destroy :remove_endorsements
after_destroy :decrement_cache_counters
after_destroy :invalidate_hash_cache
private
@ -63,4 +65,10 @@ class Follow < ApplicationRecord
account&.decrement_count!(:following_count)
target_account&.decrement_count!(:followers_count)
end
def invalidate_hash_cache
return if account.local? && target_account.local?
Rails.cache.delete("followers_hash:#{target_account_id}:#{account.synchronization_uri_prefix}")
end
end

@ -0,0 +1,13 @@
# frozen_string_literal: true
class ActivityPub::PrepareFollowersSynchronizationService < BaseService
include JsonLdHelper
def call(account, params)
@account = account
return if params['collectionId'] != @account.followers_url || invalid_origin?(params['url']) || @account.local_followers_hash == params['digest']
ActivityPub::FollowersSynchronizationWorker.perform_async(@account.id, params['url'])
end
end

@ -0,0 +1,74 @@
# frozen_string_literal: true
class ActivityPub::SynchronizeFollowersService < BaseService
include JsonLdHelper
include Payloadable
def call(account, partial_collection_url)
@account = account
items = collection_items(partial_collection_url)
return if items.nil?
# There could be unresolved accounts (hence the call to .compact) but this
# should never happen in practice, since in almost all cases we keep an
# Account record, and should we not do that, we should have sent a Delete.
# In any case there is not much we can do if that occurs.
@expected_followers = items.map { |uri| ActivityPub::TagManager.instance.uri_to_resource(uri, Account) }.compact
remove_unexpected_local_followers!
handle_unexpected_outgoing_follows!
end
private
def remove_unexpected_local_followers!
@account.followers.local.where.not(id: @expected_followers.map(&:id)).each do |unexpected_follower|
UnfollowService.new.call(unexpected_follower, @account)
end
end
def handle_unexpected_outgoing_follows!
@expected_followers.each do |expected_follower|
next if expected_follower.following?(@account)
if expected_follower.requested?(@account)
# For some reason the follow request went through but we missed it
expected_follower.follow_requests.find_by(target_account: @account)&.authorize!
else
# Since we were not aware of the follow from our side, we do not have an
# ID for it that we can include in the Undo activity. For this reason,
# the Undo may not work with software that relies exclusively on
# matching activity IDs and not the actor and target
follow = Follow.new(account: expected_follower, target_account: @account)
ActivityPub::DeliveryWorker.perform_async(build_undo_follow_json(follow), follow.account_id, follow.target_account.inbox_url)
end
end
end
def build_undo_follow_json(follow)
Oj.dump(serialize_payload(follow, ActivityPub::UndoFollowSerializer))
end
def collection_items(collection_or_uri)
collection = fetch_collection(collection_or_uri)
return unless collection.is_a?(Hash)
collection = fetch_collection(collection['first']) if collection['first'].present?
return unless collection.is_a?(Hash)
case collection['type']
when 'Collection', 'CollectionPage'
collection['items']
when 'OrderedCollection', 'OrderedCollectionPage'
collection['orderedItems']
end
end
def fetch_collection(collection_or_uri)
return collection_or_uri if collection_or_uri.is_a?(Hash)
return if invalid_origin?(collection_or_uri)
fetch_resource_without_id_validation(collection_or_uri, nil, true)
end
end

@ -2,6 +2,7 @@
class ActivityPub::DeliveryWorker
include Sidekiq::Worker
include RoutingHelper
include JsonLdHelper
STOPLIGHT_FAILURE_THRESHOLD = 10
@ -38,9 +39,18 @@ class ActivityPub::DeliveryWorker
Request.new(:post, @inbox_url, body: @json, http_client: http_client).tap do |request|
request.on_behalf_of(@source_account, :uri, sign_with: @options[:sign_with])
request.add_headers(HEADERS)
request.add_headers({ 'Collection-Synchronization' => synchronization_header }) if ENV['DISABLE_FOLLOWERS_SYNCHRONIZATION'] != 'true' && @options[:synchronize_followers]
end
end
def synchronization_header
"collectionId=\"#{account_followers_url(@source_account)}\", digest=\"#{@source_account.remote_followers_hash(inbox_url_prefix)}\", url=\"#{account_followers_synchronization_url(@source_account)}\""
end
def inbox_url_prefix
@inbox_url[/http(s?):\/\/[^\/]+\//]
end
def perform_request
light = Stoplight(@inbox_url) do
request_pool.with(@host) do |http_client|

@ -13,7 +13,7 @@ class ActivityPub::DistributionWorker
return if skip_distribution?
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, @account.id, inbox_url]
[payload, @account.id, inbox_url, { synchronize_followers: !@status.distributable? }]
end
relay! if relayable?

@ -0,0 +1,14 @@
# frozen_string_literal: true
class ActivityPub::FollowersSynchronizationWorker
include Sidekiq::Worker
sidekiq_options queue: 'push', lock: :until_executed
def perform(account_id, url)
@account = Account.find_by(id: account_id)
return true if @account.nil?
ActivityPub::SynchronizeFollowersService.new.call(@account, url)
end
end

@ -83,6 +83,7 @@ Rails.application.routes.draw do
resource :inbox, only: [:create], module: :activitypub
resource :claim, only: [:create], module: :activitypub
resources :collections, only: [:show], module: :activitypub
resource :followers_synchronization, only: [:show], module: :activitypub
end
resource :inbox, only: [:create], module: :activitypub

@ -0,0 +1,58 @@
require 'rails_helper'
RSpec.describe ActivityPub::FollowersSynchronizationsController, type: :controller do
let!(:account) { Fabricate(:account) }
let!(:follower_1) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/users/a') }
let!(:follower_2) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/users/b') }
let!(:follower_3) { Fabricate(:account, domain: 'foo.com', uri: 'https://foo.com/users/a') }
before do
follower_1.follow!(account)
follower_2.follow!(account)
follower_3.follow!(account)
end
before do
allow(controller).to receive(:signed_request_account).and_return(remote_account)
end
describe 'GET #show' do
context 'without signature' do
let(:remote_account) { nil }
before do
get :show, params: { account_username: account.username }
end
it 'returns http not authorized' do
expect(response).to have_http_status(401)
end
end
context 'with signature from example.com' do
let(:remote_account) { Fabricate(:account, domain: 'example.com', uri: 'https://example.com/instance') }
before do
get :show, params: { account_username: account.username }
end
it 'returns http success' do
expect(response).to have_http_status(200)
end
it 'returns application/activity+json' do
expect(response.content_type).to eq 'application/activity+json'
end
it 'returns orderedItems with followers from example.com' do
json = body_as_json
expect(json[:orderedItems]).to be_an Array
expect(json[:orderedItems].sort).to eq [follower_1.uri, follower_2.uri]
end
it 'returns private Cache-Control header' do
expect(response.headers['Cache-Control']).to eq 'max-age=0, private'
end
end
end
end

@ -22,6 +22,56 @@ RSpec.describe ActivityPub::InboxesController, type: :controller do
end
end
context 'with Collection-Synchronization header' do
let(:remote_account) { Fabricate(:account, followers_url: 'https://example.com/followers', domain: 'example.com', uri: 'https://example.com/actor', protocol: :activitypub) }
let(:synchronization_collection) { remote_account.followers_url }
let(:synchronization_url) { 'https://example.com/followers-for-domain' }
let(:synchronization_hash) { 'somehash' }
let(:synchronization_header) { "collectionId=\"#{synchronization_collection}\", digest=\"#{synchronization_hash}\", url=\"#{synchronization_url}\"" }
before do
allow(ActivityPub::FollowersSynchronizationWorker).to receive(:perform_async).and_return(nil)
allow_any_instance_of(Account).to receive(:local_followers_hash).and_return('somehash')
request.headers['Collection-Synchronization'] = synchronization_header
post :create, body: '{}'
end
context 'with mismatching target collection' do
let(:synchronization_collection) { 'https://example.com/followers2' }
it 'does not start a synchronization job' do
expect(ActivityPub::FollowersSynchronizationWorker).not_to have_received(:perform_async)
end
end
context 'with mismatching domain in partial collection attribute' do
let(:synchronization_url) { 'https://example.org/followers' }
it 'does not start a synchronization job' do
expect(ActivityPub::FollowersSynchronizationWorker).not_to have_received(:perform_async)
end
end
context 'with matching digest' do
it 'does not start a synchronization job' do
expect(ActivityPub::FollowersSynchronizationWorker).not_to have_received(:perform_async)
end
end
context 'with mismatching digest' do
let(:synchronization_hash) { 'wronghash' }
it 'starts a synchronization job' do
expect(ActivityPub::FollowersSynchronizationWorker).to have_received(:perform_async)
end
end
it 'returns http accepted' do
expect(response).to have_http_status(202)
end
end
context 'without signature' do
before do
post :create, body: '{}'

@ -539,6 +539,49 @@ describe AccountInteractions do
end
end
describe '#followers_hash' do
let(:me) { Fabricate(:account, username: 'Me') }
let(:remote_1) { Fabricate(:account, username: 'alice', domain: 'example.org', uri: 'https://example.org/users/alice') }
let(:remote_2) { Fabricate(:account, username: 'bob', domain: 'example.org', uri: 'https://example.org/users/bob') }
let(:remote_3) { Fabricate(:account, username: 'eve', domain: 'foo.org', uri: 'https://foo.org/users/eve') }
before do
remote_1.follow!(me)
remote_2.follow!(me)
remote_3.follow!(me)
me.follow!(remote_1)
end
context 'on a local user' do
it 'returns correct hash for remote domains' do
expect(me.remote_followers_hash('https://example.org/')).to eq '707962e297b7bd94468a21bc8e506a1bcea607a9142cd64e27c9b106b2a5f6ec'
expect(me.remote_followers_hash('https://foo.org/')).to eq 'ccb9c18a67134cfff9d62c7f7e7eb88e6b803446c244b84265565f4eba29df0e'
end
it 'invalidates cache as needed when removing or adding followers' do
expect(me.remote_followers_hash('https://example.org/')).to eq '707962e297b7bd94468a21bc8e506a1bcea607a9142cd64e27c9b106b2a5f6ec'
remote_1.unfollow!(me)
expect(me.remote_followers_hash('https://example.org/')).to eq '241b00794ce9b46aa864f3220afadef128318da2659782985bac5ed5bd436bff'
remote_1.follow!(me)
expect(me.remote_followers_hash('https://example.org/')).to eq '707962e297b7bd94468a21bc8e506a1bcea607a9142cd64e27c9b106b2a5f6ec'
end
end
context 'on a remote user' do
it 'returns correct hash for remote domains' do
expect(remote_1.local_followers_hash).to eq Digest::SHA256.hexdigest(ActivityPub::TagManager.instance.uri_for(me))
end
it 'invalidates cache as needed when removing or adding followers' do
expect(remote_1.local_followers_hash).to eq Digest::SHA256.hexdigest(ActivityPub::TagManager.instance.uri_for(me))
me.unfollow!(remote_1)
expect(remote_1.local_followers_hash).to eq '0000000000000000000000000000000000000000000000000000000000000000'
me.follow!(remote_1)
expect(remote_1.local_followers_hash).to eq Digest::SHA256.hexdigest(ActivityPub::TagManager.instance.uri_for(me))
end
end
end
describe 'muting an account' do
let(:me) { Fabricate(:account, username: 'Me') }
let(:you) { Fabricate(:account, username: 'You') }

@ -0,0 +1,105 @@
require 'rails_helper'
RSpec.describe ActivityPub::SynchronizeFollowersService, type: :service do
let(:actor) { Fabricate(:account, domain: 'example.com', uri: 'http://example.com/account', inbox_url: 'http://example.com/inbox') }
let(:alice) { Fabricate(:account, username: 'alice') }
let(:bob) { Fabricate(:account, username: 'bob') }
let(:eve) { Fabricate(:account, username: 'eve') }
let(:mallory) { Fabricate(:account, username: 'mallory') }
let(:collection_uri) { 'http://example.com/partial-followers' }
let(:items) do
[
ActivityPub::TagManager.instance.uri_for(alice),
ActivityPub::TagManager.instance.uri_for(eve),
ActivityPub::TagManager.instance.uri_for(mallory),
]
end
let(:payload) do
{
'@context': 'https://www.w3.org/ns/activitystreams',
type: 'Collection',
id: collection_uri,
items: items,
}.with_indifferent_access
end
subject { described_class.new }
shared_examples 'synchronizes followers' do
before do
alice.follow!(actor)
bob.follow!(actor)
mallory.request_follow!(actor)
allow(ActivityPub::DeliveryWorker).to receive(:perform_async)
subject.call(actor, collection_uri)
end
it 'keeps expected followers' do
expect(alice.following?(actor)).to be true
end
it 'removes local followers not in the remote list' do
expect(bob.following?(actor)).to be false
end
it 'converts follow requests to follow relationships when they have been accepted' do
expect(mallory.following?(actor)).to be true
end
it 'sends an Undo Follow to the actor' do
expect(ActivityPub::DeliveryWorker).to have_received(:perform_async).with(anything, eve.id, actor.inbox_url)
end
end
describe '#call' do
context 'when the endpoint is a Collection of actor URIs' do
before do
stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
end
it_behaves_like 'synchronizes followers'
end
context 'when the endpoint is an OrderedCollection of actor URIs' do
let(:payload) do
{
'@context': 'https://www.w3.org/ns/activitystreams',
type: 'OrderedCollection',
id: collection_uri,
orderedItems: items,
}.with_indifferent_access
end
before do
stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
end
it_behaves_like 'synchronizes followers'
end
context 'when the endpoint is a paginated Collection of actor URIs' do
let(:payload) do
{
'@context': 'https://www.w3.org/ns/activitystreams',
type: 'Collection',
id: collection_uri,
first: {
type: 'CollectionPage',
partOf: collection_uri,
items: items,
}
}.with_indifferent_access
end
before do
stub_request(:get, collection_uri).to_return(status: 200, body: Oj.dump(payload))
end
it_behaves_like 'synchronizes followers'
end
end
end

@ -3,16 +3,22 @@
require 'rails_helper'
describe ActivityPub::DeliveryWorker do
include RoutingHelper
subject { described_class.new }
let(:sender) { Fabricate(:account) }
let(:payload) { 'test' }
before do
allow_any_instance_of(Account).to receive(:remote_followers_hash).with('https://example.com/').and_return('somehash')
end
describe 'perform' do
it 'performs a request' do
stub_request(:post, 'https://example.com/api').to_return(status: 200)
subject.perform(payload, sender.id, 'https://example.com/api')
expect(a_request(:post, 'https://example.com/api')).to have_been_made.once
subject.perform(payload, sender.id, 'https://example.com/api', { synchronize_followers: true })
expect(a_request(:post, 'https://example.com/api').with(headers: { 'Collection-Synchronization' => "collectionId=\"#{account_followers_url(sender)}\", digest=\"somehash\", url=\"#{account_followers_synchronization_url(sender)}\"" })).to have_been_made.once
end
it 'raises when request fails' do

Loading…
Cancel
Save