Merge branch 'main' into glitch-soc/merge-upstream

Conflicts:
- `app/lib/activitypub/activity/create.rb`:
  Upstream refactored how `Create` activities are handled and how values are
  extracted from `Create`d objects. This conflicted with how glitch-soc
  supported the `directMessage` flag to explicitly distinguish between
  limited and direct messages.
  Ported glitch-soc's changes to latest upstream changes.
- `app/services/fan_out_on_write_service.rb`:
  Upstream largely refactored that file and changed some of the logic.
  This conflicted with glitch-soc's handling of the direct timeline and
  the options to allow replies and boosts in public feeds.
  Ported those glitch-soc changes on top of latest upstream changes.
- `app/services/process_mentions_service.rb`:
  Upstream refactored to move mention-related ActivityPub deliveries to
  `ActivityPub::DeliveryWorker`, while glitch-soc contained an extra check
  to not send local-only toots to remote mentioned users.
  Took upstream's version, as the check is not needed anymore, since it is
  performed at the `ActivityPub::DeliveryWorker` call site already.
- `app/workers/feed_insert_worker.rb`:
  Upstream added support for `update` toot events, while glitch-soc had
  support for an extra timeline support, `direct`.
  Ported upstream changes and extended them to the `direct` timeline.

Additional changes:
- `app/lib/activitypub/parser/status_parser.rb`:
  Added code to handle the `directMessage` flag and take it into account
  to compute visibility.
- `app/lib/feed_manager.rb`:
  Extended upstream's support of `update` toot events to glitch-soc's
  `direct` timeline.
main
Claire 3 years ago
commit fe89554a54

@ -0,0 +1,21 @@
# frozen_string_literal: true
class Api::V1::Statuses::HistoriesController < Api::BaseController
include Authorization
before_action -> { authorize_if_got_token! :read, :'read:statuses' }
before_action :set_status
def show
render json: @status.edits, each_serializer: REST::StatusEditSerializer
end
private
def set_status
@status = Status.find(params[:status_id])
authorize @status, :show?
rescue Mastodon::NotPermittedError
not_found
end
end

@ -0,0 +1,21 @@
# frozen_string_literal: true
class Api::V1::Statuses::SourcesController < Api::BaseController
include Authorization
before_action -> { doorkeeper_authorize! :read, :'read:statuses' }
before_action :set_status
def show
render json: @status, serializer: REST::StatusSourceSerializer
end
private
def set_status
@status = Status.find(params[:status_id])
authorize @status, :show?
rescue Mastodon::NotPermittedError
not_found
end
end

@ -34,7 +34,13 @@ module JsonLdHelper
end end
def as_array(value) def as_array(value)
value.is_a?(Array) ? value : [value] if value.nil?
[]
elsif value.is_a?(Array)
value
else
[value]
end
end end
def value_or_id(value) def value_or_id(value)

@ -54,9 +54,10 @@ export function normalizeStatus(status, normalOldStatus) {
normalStatus.poll = status.poll.id; normalStatus.poll = status.poll.id;
} }
// Only calculate these values when status first encountered // Only calculate these values when status first encountered and
// Otherwise keep the ones already in the reducer // when the underlying values change. Otherwise keep the ones
if (normalOldStatus) { // already in the reducer
if (normalOldStatus && normalOldStatus.get('content') === normalStatus.content && normalOldStatus.get('spoiler_text') === normalStatus.spoiler_text) {
normalStatus.search_index = normalOldStatus.get('search_index'); normalStatus.search_index = normalOldStatus.get('search_index');
normalStatus.contentHtml = normalOldStatus.get('contentHtml'); normalStatus.contentHtml = normalOldStatus.get('contentHtml');
normalStatus.spoilerHtml = normalOldStatus.get('spoilerHtml'); normalStatus.spoilerHtml = normalOldStatus.get('spoilerHtml');

@ -131,6 +131,9 @@ export function deleteStatusFail(id, error) {
}; };
}; };
export const updateStatus = status => dispatch =>
dispatch(importFetchedStatus(status));
export function fetchContext(id) { export function fetchContext(id) {
return (dispatch, getState) => { return (dispatch, getState) => {
dispatch(fetchContextRequest(id)); dispatch(fetchContextRequest(id));

@ -10,6 +10,7 @@ import {
} from './timelines'; } from './timelines';
import { updateNotifications, expandNotifications } from './notifications'; import { updateNotifications, expandNotifications } from './notifications';
import { updateConversations } from './conversations'; import { updateConversations } from './conversations';
import { updateStatus } from './statuses';
import { import {
fetchAnnouncements, fetchAnnouncements,
updateAnnouncements, updateAnnouncements,
@ -75,6 +76,9 @@ export const connectTimelineStream = (timelineId, channelName, params = {}, opti
case 'update': case 'update':
dispatch(updateTimeline(timelineId, JSON.parse(data.payload), options.accept)); dispatch(updateTimeline(timelineId, JSON.parse(data.payload), options.accept));
break; break;
case 'status.update':
dispatch(updateStatus(JSON.parse(data.payload)));
break;
case 'delete': case 'delete':
dispatch(deleteFromTimelines(data.payload)); dispatch(deleteFromTimelines(data.payload));
break; break;

@ -57,6 +57,7 @@ const messages = defineMessages({
unlisted_short: { id: 'privacy.unlisted.short', defaultMessage: 'Unlisted' }, unlisted_short: { id: 'privacy.unlisted.short', defaultMessage: 'Unlisted' },
private_short: { id: 'privacy.private.short', defaultMessage: 'Followers-only' }, private_short: { id: 'privacy.private.short', defaultMessage: 'Followers-only' },
direct_short: { id: 'privacy.direct.short', defaultMessage: 'Direct' }, direct_short: { id: 'privacy.direct.short', defaultMessage: 'Direct' },
edited: { id: 'status.edited', defaultMessage: 'Edited {date}' },
}); });
export default @injectIntl export default @injectIntl
@ -483,7 +484,7 @@ class Status extends ImmutablePureComponent {
<div className='status__info'> <div className='status__info'>
<a onClick={this.handleClick} href={status.get('url')} className='status__relative-time' target='_blank' rel='noopener noreferrer'> <a onClick={this.handleClick} href={status.get('url')} className='status__relative-time' target='_blank' rel='noopener noreferrer'>
<span className='status__visibility-icon'><Icon id={visibilityIcon.icon} title={visibilityIcon.text} /></span> <span className='status__visibility-icon'><Icon id={visibilityIcon.icon} title={visibilityIcon.text} /></span>
<RelativeTimestamp timestamp={status.get('created_at')} /> <RelativeTimestamp timestamp={status.get('created_at')} />{status.get('edited_at') && <abbr title={intl.formatMessage(messages.edited, { date: intl.formatDate(status.get('edited_at'), { hour12: false, year: 'numeric', month: 'short', day: '2-digit', hour: '2-digit', minute: '2-digit' }) })}> *</abbr>}
</a> </a>
<a onClick={this.handleAccountClick} href={status.getIn(['account', 'url'])} title={status.getIn(['account', 'acct'])} className='status__display-name' target='_blank' rel='noopener noreferrer'> <a onClick={this.handleAccountClick} href={status.getIn(['account', 'url'])} title={status.getIn(['account', 'acct'])} className='status__display-name' target='_blank' rel='noopener noreferrer'>

@ -6,7 +6,7 @@ import DisplayName from '../../../components/display_name';
import StatusContent from '../../../components/status_content'; import StatusContent from '../../../components/status_content';
import MediaGallery from '../../../components/media_gallery'; import MediaGallery from '../../../components/media_gallery';
import { Link } from 'react-router-dom'; import { Link } from 'react-router-dom';
import { injectIntl, defineMessages, FormattedDate } from 'react-intl'; import { injectIntl, defineMessages, FormattedDate, FormattedMessage } from 'react-intl';
import Card from './card'; import Card from './card';
import ImmutablePureComponent from 'react-immutable-pure-component'; import ImmutablePureComponent from 'react-immutable-pure-component';
import Video from '../../video'; import Video from '../../video';
@ -116,6 +116,7 @@ class DetailedStatus extends ImmutablePureComponent {
let reblogLink = ''; let reblogLink = '';
let reblogIcon = 'retweet'; let reblogIcon = 'retweet';
let favouriteLink = ''; let favouriteLink = '';
let edited = '';
if (this.props.measureHeight) { if (this.props.measureHeight) {
outerStyle.height = `${this.state.height}px`; outerStyle.height = `${this.state.height}px`;
@ -237,6 +238,15 @@ class DetailedStatus extends ImmutablePureComponent {
); );
} }
if (status.get('edited_at')) {
edited = (
<React.Fragment>
<React.Fragment> · </React.Fragment>
<FormattedMessage id='status.edited' defaultMessage='Edited {date}' values={{ date: intl.formatDate(status.get('edited_at'), { hour12: false, month: 'short', day: '2-digit', hour: '2-digit', minute: '2-digit' }) }} />
</React.Fragment>
);
}
return ( return (
<div style={outerStyle}> <div style={outerStyle}>
<div ref={this.setRef} className={classNames('detailed-status', `detailed-status-${status.get('visibility')}`, { compact })}> <div ref={this.setRef} className={classNames('detailed-status', `detailed-status-${status.get('visibility')}`, { compact })}>
@ -252,7 +262,7 @@ class DetailedStatus extends ImmutablePureComponent {
<div className='detailed-status__meta'> <div className='detailed-status__meta'>
<a className='detailed-status__datetime' href={status.get('url')} target='_blank' rel='noopener noreferrer'> <a className='detailed-status__datetime' href={status.get('url')} target='_blank' rel='noopener noreferrer'>
<FormattedDate value={new Date(status.get('created_at'))} hour12={false} year='numeric' month='short' day='2-digit' hour='2-digit' minute='2-digit' /> <FormattedDate value={new Date(status.get('created_at'))} hour12={false} year='numeric' month='short' day='2-digit' hour='2-digit' minute='2-digit' />
</a>{visibilityLink}{applicationLink}{reblogLink} · {favouriteLink} </a>{edited}{visibilityLink}{applicationLink}{reblogLink} · {favouriteLink}
</div> </div>
</div> </div>
</div> </div>

@ -967,6 +967,17 @@
} }
} }
.status__content__edited-label {
display: block;
cursor: default;
font-size: 15px;
line-height: 20px;
padding: 0;
padding-top: 8px;
color: $dark-text-color;
font-weight: 500;
}
.status__content__spoiler-link { .status__content__spoiler-link {
display: inline-block; display: inline-block;
border-radius: 2px; border-radius: 2px;

@ -94,49 +94,6 @@ class ActivityPub::Activity
equals_or_includes_any?(@object['type'], CONVERTED_TYPES) equals_or_includes_any?(@object['type'], CONVERTED_TYPES)
end end
def distribute(status)
crawl_links(status)
notify_about_reblog(status) if reblog_of_local_account?(status) && !reblog_by_following_group_account?(status)
notify_about_mentions(status)
# Only continue if the status is supposed to have arrived in real-time.
# Note that if @options[:override_timestamps] isn't set, the status
# may have a lower snowflake id than other existing statuses, potentially
# "hiding" it from paginated API calls
return unless @options[:override_timestamps] || status.within_realtime_window?
distribute_to_followers(status)
end
def reblog_of_local_account?(status)
status.reblog? && status.reblog.account.local?
end
def reblog_by_following_group_account?(status)
status.reblog? && status.account.group? && status.reblog.account.following?(status.account)
end
def notify_about_reblog(status)
NotifyService.new.call(status.reblog.account, :reblog, status)
end
def notify_about_mentions(status)
status.active_mentions.includes(:account).each do |mention|
next unless mention.account.local? && audience_includes?(mention.account)
NotifyService.new.call(mention.account, :mention, mention)
end
end
def crawl_links(status)
# Spread out crawling randomly to avoid DDoSing the link
LinkCrawlWorker.perform_in(rand(1..59).seconds, status.id)
end
def distribute_to_followers(status)
::DistributionWorker.perform_async(status.id)
end
def delete_arrived_first?(uri) def delete_arrived_first?(uri)
redis.exists?("delete_upon_arrival:#{@account.id}:#{uri}") redis.exists?("delete_upon_arrival:#{@account.id}:#{uri}")
end end

@ -25,7 +25,7 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
Trends.tags.register(@status) Trends.tags.register(@status)
Trends.links.register(@status) Trends.links.register(@status)
distribute(@status) distribute
end end
@status @status
@ -33,6 +33,22 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
private private
def distribute
# Notify the author of the original status if that status is local
NotifyService.new.call(@status.reblog.account, :reblog, @status) if reblog_of_local_account?(@status) && !reblog_by_following_group_account?(@status)
# Distribute into home and list feeds
::DistributionWorker.perform_async(@status.id) if @options[:override_timestamps] || @status.within_realtime_window?
end
def reblog_of_local_account?(status)
status.reblog? && status.reblog.account.local?
end
def reblog_by_following_group_account?(status)
status.reblog? && status.account.group? && status.reblog.account.following?(status.account)
end
def audience_to def audience_to
as_array(@json['to']).map { |x| value_or_id(x) } as_array(@json['to']).map { |x| value_or_id(x) }
end end

@ -71,6 +71,7 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
def process_status def process_status
@tags = [] @tags = []
@mentions = [] @mentions = []
@silenced_account_ids = []
@params = {} @params = {}
process_status_params process_status_params
@ -84,10 +85,18 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
resolve_thread(@status) resolve_thread(@status)
fetch_replies(@status) fetch_replies(@status)
distribute(@status) distribute
forward_for_reply forward_for_reply
end end
def distribute
# Spread out crawling randomly to avoid DDoSing the link
LinkCrawlWorker.perform_in(rand(1..59).seconds, @status.id)
# Distribute into home and list feeds and notify mentioned accounts
::DistributionWorker.perform_async(@status.id, silenced_account_ids: @silenced_account_ids) if @options[:override_timestamps] || @status.within_realtime_window?
end
def find_existing_status def find_existing_status
status = status_from_uri(object_uri) status = status_from_uri(object_uri)
status ||= Status.find_by(uri: @object['atomUri']) if @object['atomUri'].present? status ||= Status.find_by(uri: @object['atomUri']) if @object['atomUri'].present?
@ -95,19 +104,22 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
end end
def process_status_params def process_status_params
@status_parser = ActivityPub::Parser::StatusParser.new(@json, followers_collection: @account.followers_url)
@params = begin @params = begin
{ {
uri: object_uri, uri: @status_parser.uri,
url: object_url || object_uri, url: @status_parser.url || @status_parser.uri,
account: @account, account: @account,
text: text_from_content || '', text: converted_object_type? ? converted_text : (@status_parser.text || ''),
language: detected_language, language: @status_parser.language || detected_language,
spoiler_text: converted_object_type? ? '' : (text_from_summary || ''), spoiler_text: converted_object_type? ? '' : (@status_parser.spoiler_text || ''),
created_at: @object['published'], created_at: @status_parser.created_at,
edited_at: @status_parser.edited_at,
override_timestamps: @options[:override_timestamps], override_timestamps: @options[:override_timestamps],
reply: @object['inReplyTo'].present?, reply: @status_parser.reply,
sensitive: @account.sensitized? || @object['sensitive'] || false, sensitive: @account.sensitized? || @status_parser.sensitive || false,
visibility: visibility_from_audience, visibility: @status_parser.visibility,
thread: replied_to_status, thread: replied_to_status,
conversation: conversation_from_uri(@object['conversation']), conversation: conversation_from_uri(@object['conversation']),
media_attachment_ids: process_attachments.take(4).map(&:id), media_attachment_ids: process_attachments.take(4).map(&:id),
@ -117,50 +129,52 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
end end
def process_audience def process_audience
(audience_to + audience_cc).uniq.each do |audience|
next if ActivityPub::TagManager.instance.public_collection?(audience)
# Unlike with tags, there is no point in resolving accounts we don't already # Unlike with tags, there is no point in resolving accounts we don't already
# know here, because silent mentions would only be used for local access # know here, because silent mentions would only be used for local access control anyway
# control anyway accounts_in_audience = (audience_to + audience_cc).uniq.filter_map do |audience|
account = account_from_uri(audience) account_from_uri(audience) unless ActivityPub::TagManager.instance.public_collection?(audience)
end
# If the payload was delivered to a specific inbox, the inbox owner must have
# access to it, unless they already have access to it anyway
if @options[:delivered_to_account_id]
accounts_in_audience << delivered_to_account
accounts_in_audience.uniq!
end
next if account.nil? || @mentions.any? { |mention| mention.account_id == account.id } accounts_in_audience.each do |account|
# This runs after tags are processed, and those translate into non-silent
# mentions, which take precedence
next if @mentions.any? { |mention| mention.account_id == account.id }
@mentions << Mention.new(account: account, silent: true) @mentions << Mention.new(account: account, silent: true)
# If there is at least one silent mention, then the status can be considered # If there is at least one silent mention, then the status can be considered
# as a limited-audience status, and not strictly a direct message, but only # as a limited-audience status, and not strictly a direct message, but only
# if we considered a direct message in the first place # if we considered a direct message in the first place
next unless @params[:visibility] == :direct && direct_message.nil? @params[:visibility] = :limited if @params[:visibility] == :direct && !@object['directMessage']
@params[:visibility] = :limited
end end
# If the payload was delivered to a specific inbox, the inbox owner must have # Accounts that are tagged but are not in the audience are not
# access to it, unless they already have access to it anyway # supposed to be notified explicitly
return if @options[:delivered_to_account_id].nil? || @mentions.any? { |mention| mention.account_id == @options[:delivered_to_account_id] } @silenced_account_ids = @mentions.map(&:account_id) - accounts_in_audience.map(&:id)
@mentions << Mention.new(account_id: @options[:delivered_to_account_id], silent: true)
return unless @params[:visibility] == :direct && direct_message.nil?
@params[:visibility] = :limited
end end
def postprocess_audience_and_deliver def postprocess_audience_and_deliver
return if @status.mentions.find_by(account_id: @options[:delivered_to_account_id]) return if @status.mentions.find_by(account_id: @options[:delivered_to_account_id])
delivered_to_account = Account.find(@options[:delivered_to_account_id])
@status.mentions.create(account: delivered_to_account, silent: true) @status.mentions.create(account: delivered_to_account, silent: true)
@status.update(visibility: :limited) if @status.direct_visibility? && direct_message.nil? @status.update(visibility: :limited) if @status.direct_visibility? && !@object['directMessage']
return unless delivered_to_account.following?(@account) return unless delivered_to_account.following?(@account)
FeedInsertWorker.perform_async(@status.id, delivered_to_account.id, :home) FeedInsertWorker.perform_async(@status.id, delivered_to_account.id, :home)
end end
def delivered_to_account
@delivered_to_account ||= Account.find(@options[:delivered_to_account_id])
end
def attach_tags(status) def attach_tags(status)
@tags.each do |tag| @tags.each do |tag|
status.tags << tag status.tags << tag
@ -215,22 +229,23 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
def process_emoji(tag) def process_emoji(tag)
return if skip_download? return if skip_download?
return if tag['name'].blank? || tag['icon'].blank? || tag['icon']['url'].blank?
shortcode = tag['name'].delete(':') custom_emoji_parser = ActivityPub::Parser::CustomEmojiParser.new(tag)
image_url = tag['icon']['url']
uri = tag['id'] return if custom_emoji_parser.shortcode.blank? || custom_emoji_parser.image_remote_url.blank?
updated = tag['updated']
emoji = CustomEmoji.find_by(shortcode: shortcode, domain: @account.domain) emoji = CustomEmoji.find_by(shortcode: custom_emoji_parser.shortcode, domain: @account.domain)
return unless emoji.nil? || image_url != emoji.image_remote_url || (updated && updated >= emoji.updated_at) return unless emoji.nil? || custom_emoji_parser.image_remote_url != emoji.image_remote_url || (custom_emoji_parser.updated_at && custom_emoji_parser.updated_at >= emoji.updated_at)
emoji ||= CustomEmoji.new(domain: @account.domain, shortcode: shortcode, uri: uri) begin
emoji.image_remote_url = image_url emoji ||= CustomEmoji.new(domain: @account.domain, shortcode: custom_emoji_parser.shortcode, uri: custom_emoji_parser.uri)
emoji.image_remote_url = custom_emoji_parser.image_remote_url
emoji.save emoji.save
rescue Seahorse::Client::NetworkingError => e rescue Seahorse::Client::NetworkingError => e
Rails.logger.warn "Error storing emoji: #{e}" Rails.logger.warn "Error storing emoji: #{e}"
end end
end
def process_attachments def process_attachments
return [] if @object['attachment'].nil? return [] if @object['attachment'].nil?
@ -238,14 +253,23 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
media_attachments = [] media_attachments = []
as_array(@object['attachment']).each do |attachment| as_array(@object['attachment']).each do |attachment|
next if attachment['url'].blank? || media_attachments.size >= 4 media_attachment_parser = ActivityPub::Parser::MediaAttachmentParser.new(attachment)
next if media_attachment_parser.remote_url.blank? || media_attachments.size >= 4
begin begin
href = Addressable::URI.parse(attachment['url']).normalize.to_s media_attachment = MediaAttachment.create(
media_attachment = MediaAttachment.create(account: @account, remote_url: href, thumbnail_remote_url: icon_url_from_attachment(attachment), description: attachment['summary'].presence || attachment['name'].presence, focus: attachment['focalPoint'], blurhash: supported_blurhash?(attachment['blurhash']) ? attachment['blurhash'] : nil) account: @account,
remote_url: media_attachment_parser.remote_url,
thumbnail_remote_url: media_attachment_parser.thumbnail_remote_url,
description: media_attachment_parser.description,
focus: media_attachment_parser.focus,
blurhash: media_attachment_parser.blurhash
)
media_attachments << media_attachment media_attachments << media_attachment
next if unsupported_media_type?(attachment['mediaType']) || skip_download? next if unsupported_media_type?(media_attachment_parser.file_content_type) || skip_download?
media_attachment.download_file! media_attachment.download_file!
media_attachment.download_thumbnail! media_attachment.download_thumbnail!
@ -263,42 +287,17 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
media_attachments media_attachments
end end
def icon_url_from_attachment(attachment)
url = attachment['icon'].is_a?(Hash) ? attachment['icon']['url'] : attachment['icon']
Addressable::URI.parse(url).normalize.to_s if url.present?
rescue Addressable::URI::InvalidURIError
nil
end
def process_poll def process_poll
return unless @object['type'] == 'Question' && (@object['anyOf'].is_a?(Array) || @object['oneOf'].is_a?(Array)) poll_parser = ActivityPub::Parser::PollParser.new(@object)
expires_at = begin return unless poll_parser.valid?
if @object['closed'].is_a?(String)
@object['closed']
elsif !@object['closed'].nil? && !@object['closed'].is_a?(FalseClass)
Time.now.utc
else
@object['endTime']
end
end
if @object['anyOf'].is_a?(Array)
multiple = true
items = @object['anyOf']
else
multiple = false
items = @object['oneOf']
end
voters_count = @object['votersCount']
@account.polls.new( @account.polls.new(
multiple: multiple, multiple: poll_parser.multiple,
expires_at: expires_at, expires_at: poll_parser.expires_at,
options: items.map { |item| item['name'].presence || item['content'] }.compact, options: poll_parser.options,
cached_tallies: items.map { |item| item.dig('replies', 'totalItems') || 0 }, cached_tallies: poll_parser.cached_tallies,
voters_count: voters_count voters_count: poll_parser.voters_count
) )
end end
@ -351,29 +350,6 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
end end
end end
def visibility_from_audience
if audience_to.any? { |to| ActivityPub::TagManager.instance.public_collection?(to) }
:public
elsif audience_cc.any? { |cc| ActivityPub::TagManager.instance.public_collection?(cc) }
:unlisted
elsif audience_to.include?(@account.followers_url)
:private
elsif direct_message == false
:limited
else
:direct
end
end
def audience_includes?(account)
uri = ActivityPub::TagManager.instance.uri_for(account)
audience_to.include?(uri) || audience_cc.include?(uri)
end
def direct_message
@object['directMessage']
end
def replied_to_status def replied_to_status
return @replied_to_status if defined?(@replied_to_status) return @replied_to_status if defined?(@replied_to_status)
@ -390,81 +366,18 @@ class ActivityPub::Activity::Create < ActivityPub::Activity
value_or_id(@object['inReplyTo']) value_or_id(@object['inReplyTo'])
end end
def text_from_content def converted_text
return Formatter.instance.linkify([[text_from_name, text_from_summary.presence].compact.join("\n\n"), object_url || object_uri].join(' ')) if converted_object_type? Formatter.instance.linkify([@status_parser.title.presence, @status_parser.spoiler_text.presence, @status_parser.url || @status_parser.uri].compact.join("\n\n"))
if @object['content'].present?
@object['content']
elsif content_language_map?
@object['contentMap'].values.first
end
end
def text_from_summary
if @object['summary'].present?
@object['summary']
elsif summary_language_map?
@object['summaryMap'].values.first
end
end
def text_from_name
if @object['name'].present?
@object['name']
elsif name_language_map?
@object['nameMap'].values.first
end
end end
def detected_language def detected_language
if content_language_map? LanguageDetector.instance.detect(@status_parser.text, @account) if supported_object_type?
@object['contentMap'].keys.first
elsif name_language_map?
@object['nameMap'].keys.first
elsif summary_language_map?
@object['summaryMap'].keys.first
elsif supported_object_type?
LanguageDetector.instance.detect(text_from_content, @account)
end
end
def object_url
return if @object['url'].blank?
url_candidate = url_to_href(@object['url'], 'text/html')
if invalid_origin?(url_candidate)
nil
else
url_candidate
end
end
def summary_language_map?
@object['summaryMap'].is_a?(Hash) && !@object['summaryMap'].empty?
end
def content_language_map?
@object['contentMap'].is_a?(Hash) && !@object['contentMap'].empty?
end
def name_language_map?
@object['nameMap'].is_a?(Hash) && !@object['nameMap'].empty?
end end
def unsupported_media_type?(mime_type) def unsupported_media_type?(mime_type)
mime_type.present? && !MediaAttachment.supported_mime_types.include?(mime_type) mime_type.present? && !MediaAttachment.supported_mime_types.include?(mime_type)
end end
def supported_blurhash?(blurhash)
components = blurhash.blank? || !blurhash_valid_chars?(blurhash) ? nil : Blurhash.components(blurhash)
components.present? && components.none? { |comp| comp > 5 }
end
def blurhash_valid_chars?(blurhash)
/^[\w#$%*+-.:;=?@\[\]^{|}~]+$/.match?(blurhash)
end
def skip_download? def skip_download?
return @skip_download if defined?(@skip_download) return @skip_download if defined?(@skip_download)

@ -1,32 +1,31 @@
# frozen_string_literal: true # frozen_string_literal: true
class ActivityPub::Activity::Update < ActivityPub::Activity class ActivityPub::Activity::Update < ActivityPub::Activity
SUPPORTED_TYPES = %w(Application Group Organization Person Service).freeze
def perform def perform
dereference_object! dereference_object!
if equals_or_includes_any?(@object['type'], SUPPORTED_TYPES) if equals_or_includes_any?(@object['type'], %w(Application Group Organization Person Service))
update_account update_account
elsif equals_or_includes_any?(@object['type'], %w(Question)) elsif equals_or_includes_any?(@object['type'], %w(Note Question))
update_poll update_status
end end
end end
private private
def update_account def update_account
return if @account.uri != object_uri return reject_payload! if @account.uri != object_uri
ActivityPub::ProcessAccountService.new.call(@account.username, @account.domain, @object, signed_with_known_key: true) ActivityPub::ProcessAccountService.new.call(@account.username, @account.domain, @object, signed_with_known_key: true)
end end
def update_poll def update_status
return reject_payload! if invalid_origin?(@object['id']) return reject_payload! if invalid_origin?(@object['id'])
status = Status.find_by(uri: object_uri, account_id: @account.id) status = Status.find_by(uri: object_uri, account_id: @account.id)
return if status.nil? || status.preloadable_poll.nil?
ActivityPub::ProcessPollService.new.call(status.preloadable_poll, @object) return if status.nil?
ActivityPub::ProcessStatusUpdateService.new.call(status, @object)
end end
end end

@ -0,0 +1,27 @@
# frozen_string_literal: true
class ActivityPub::Parser::CustomEmojiParser
include JsonLdHelper
def initialize(json)
@json = json
end
def uri
@json['id']
end
def shortcode
@json['name']&.delete(':')
end
def image_remote_url
@json.dig('icon', 'url')
end
def updated_at
@json['updated']&.to_datetime
rescue ArgumentError
nil
end
end

@ -0,0 +1,58 @@
# frozen_string_literal: true
class ActivityPub::Parser::MediaAttachmentParser
include JsonLdHelper
def initialize(json)
@json = json
end
# @param [MediaAttachment] previous_record
def significantly_changes?(previous_record)
remote_url != previous_record.remote_url ||
thumbnail_remote_url != previous_record.thumbnail_remote_url ||
description != previous_record.description
end
def remote_url
Addressable::URI.parse(@json['url'])&.normalize&.to_s
rescue Addressable::URI::InvalidURIError
nil
end
def thumbnail_remote_url
Addressable::URI.parse(@json['icon'].is_a?(Hash) ? @json['icon']['url'] : @json['icon'])&.normalize&.to_s
rescue Addressable::URI::InvalidURIError
nil
end
def description
@json['summary'].presence || @json['name'].presence
end
def focus
@json['focalPoint']
end
def blurhash
supported_blurhash? ? @json['blurhash'] : nil
end
def file_content_type
@json['mediaType']
end
private
def supported_blurhash?
components = begin
blurhash = @json['blurhash']
if blurhash.present? && /^[\w#$%*+-.:;=?@\[\]^{|}~]+$/.match?(blurhash)
Blurhash.components(blurhash)
end
end
components.present? && components.none? { |comp| comp > 5 }
end
end

@ -0,0 +1,53 @@
# frozen_string_literal: true
class ActivityPub::Parser::PollParser
include JsonLdHelper
def initialize(json)
@json = json
end
def valid?
equals_or_includes?(@json['type'], 'Question') && items.is_a?(Array)
end
# @param [Poll] previous_record
def significantly_changes?(previous_record)
options != previous_record.options ||
multiple != previous_record.multiple
end
def options
items.filter_map { |item| item['name'].presence || item['content'] }
end
def multiple
@json['anyOf'].is_a?(Array)
end
def expires_at
if @json['closed'].is_a?(String)
@json['closed'].to_datetime
elsif !@json['closed'].nil? && !@json['closed'].is_a?(FalseClass)
Time.now.utc
else
@json['endTime']&.to_datetime
end
rescue ArgumentError
nil
end
def voters_count
@json['votersCount']
end
def cached_tallies
items.map { |item| item.dig('replies', 'totalItems') || 0 }
end
private
def items
@json['anyOf'] || @json['oneOf']
end
end

@ -0,0 +1,124 @@
# frozen_string_literal: true
class ActivityPub::Parser::StatusParser
include JsonLdHelper
# @param [Hash] json
# @param [Hash] magic_values
# @option magic_values [String] :followers_collection
def initialize(json, magic_values = {})
@json = json
@object = json['object'] || json
@magic_values = magic_values
end
def uri
id = @object['id']
if id&.start_with?('bear:')
Addressable::URI.parse(id).query_values['u']
else
id
end
rescue Addressable::URI::InvalidURIError
id
end
def url
url_to_href(@object['url'], 'text/html') if @object['url'].present?
end
def text
if @object['content'].present?
@object['content']
elsif content_language_map?
@object['contentMap'].values.first
end
end
def spoiler_text
if @object['summary'].present?
@object['summary']
elsif summary_language_map?
@object['summaryMap'].values.first
end
end
def title
if @object['name'].present?
@object['name']
elsif name_language_map?
@object['nameMap'].values.first
end
end
def created_at
@object['published']&.to_datetime
rescue ArgumentError
nil
end
def edited_at
@object['updated']&.to_datetime
rescue ArgumentError
nil
end
def reply
@object['inReplyTo'].present?
end
def sensitive
@object['sensitive']
end
def visibility
if audience_to.any? { |to| ActivityPub::TagManager.instance.public_collection?(to) }
:public
elsif audience_cc.any? { |cc| ActivityPub::TagManager.instance.public_collection?(cc) }
:unlisted
elsif audience_to.include?(@magic_values[:followers_collection])
:private
elsif direct_message == false
:limited
else
:direct
end
end
def language
if content_language_map?
@object['contentMap'].keys.first
elsif name_language_map?
@object['nameMap'].keys.first
elsif summary_language_map?
@object['summaryMap'].keys.first
end
end
def direct_message
@object['directMessage']
end
private
def audience_to
as_array(@object['to'] || @json['to']).map { |x| value_or_id(x) }
end
def audience_cc
as_array(@object['cc'] || @json['cc']).map { |x| value_or_id(x) }
end
def summary_language_map?
@object['summaryMap'].is_a?(Hash) && !@object['summaryMap'].empty?
end
def content_language_map?
@object['contentMap'].is_a?(Hash) && !@object['contentMap'].empty?
end
def name_language_map?
@object['nameMap'].is_a?(Hash) && !@object['nameMap'].empty?
end
end

@ -55,46 +55,50 @@ class FeedManager
# Add a status to a home feed and send a streaming API update # Add a status to a home feed and send a streaming API update
# @param [Account] account # @param [Account] account
# @param [Status] status # @param [Status] status
# @param [Boolean] update
# @return [Boolean] # @return [Boolean]
def push_to_home(account, status) def push_to_home(account, status, update: false)
return false unless add_to_feed(:home, account.id, status, account.user&.aggregates_reblogs?) return false unless add_to_feed(:home, account.id, status, account.user&.aggregates_reblogs?)
trim(:home, account.id) trim(:home, account.id)
PushUpdateWorker.perform_async(account.id, status.id, "timeline:#{account.id}") if push_update_required?("timeline:#{account.id}") PushUpdateWorker.perform_async(account.id, status.id, "timeline:#{account.id}", update: update) if push_update_required?("timeline:#{account.id}")
true true
end end
# Remove a status from a home feed and send a streaming API update # Remove a status from a home feed and send a streaming API update
# @param [Account] account # @param [Account] account
# @param [Status] status # @param [Status] status
# @param [Boolean] update
# @return [Boolean] # @return [Boolean]
def unpush_from_home(account, status) def unpush_from_home(account, status, update: false)
return false unless remove_from_feed(:home, account.id, status, account.user&.aggregates_reblogs?) return false unless remove_from_feed(:home, account.id, status, account.user&.aggregates_reblogs?)
redis.publish("timeline:#{account.id}", Oj.dump(event: :delete, payload: status.id.to_s)) redis.publish("timeline:#{account.id}", Oj.dump(event: :delete, payload: status.id.to_s)) unless update
true true
end end
# Add a status to a list feed and send a streaming API update # Add a status to a list feed and send a streaming API update
# @param [List] list # @param [List] list
# @param [Status] status # @param [Status] status
# @param [Boolean] update
# @return [Boolean] # @return [Boolean]
def push_to_list(list, status) def push_to_list(list, status, update: false)
return false if filter_from_list?(status, list) || !add_to_feed(:list, list.id, status, list.account.user&.aggregates_reblogs?) return false if filter_from_list?(status, list) || !add_to_feed(:list, list.id, status, list.account.user&.aggregates_reblogs?)
trim(:list, list.id) trim(:list, list.id)
PushUpdateWorker.perform_async(list.account_id, status.id, "timeline:list:#{list.id}") if push_update_required?("timeline:list:#{list.id}") PushUpdateWorker.perform_async(list.account_id, status.id, "timeline:list:#{list.id}", update: update) if push_update_required?("timeline:list:#{list.id}")
true true
end end
# Remove a status from a list feed and send a streaming API update # Remove a status from a list feed and send a streaming API update
# @param [List] list # @param [List] list
# @param [Status] status # @param [Status] status
# @param [Boolean] update
# @return [Boolean] # @return [Boolean]
def unpush_from_list(list, status) def unpush_from_list(list, status, update: false)
return false unless remove_from_feed(:list, list.id, status, list.account.user&.aggregates_reblogs?) return false unless remove_from_feed(:list, list.id, status, list.account.user&.aggregates_reblogs?)
redis.publish("timeline:list:#{list.id}", Oj.dump(event: :delete, payload: status.id.to_s)) redis.publish("timeline:list:#{list.id}", Oj.dump(event: :delete, payload: status.id.to_s)) unless update
true true
end end
@ -102,11 +106,11 @@ class FeedManager
# @param [Account] account # @param [Account] account
# @param [Status] status # @param [Status] status
# @return [Boolean] # @return [Boolean]
def push_to_direct(account, status) def push_to_direct(account, status, update: false)
return false unless add_to_feed(:direct, account.id, status) return false unless add_to_feed(:direct, account.id, status)
trim(:direct, account.id) trim(:direct, account.id)
PushUpdateWorker.perform_async(account.id, status.id, "timeline:direct:#{account.id}") PushUpdateWorker.perform_async(account.id, status.id, "timeline:direct:#{account.id}") unless update
true true
end end
@ -114,10 +118,10 @@ class FeedManager
# @param [List] list # @param [List] list
# @param [Status] status # @param [Status] status
# @return [Boolean] # @return [Boolean]
def unpush_from_direct(account, status) def unpush_from_direct(account, status, update: false)
return false unless remove_from_feed(:direct, account.id, status) return false unless remove_from_feed(:direct, account.id, status)
redis.publish("timeline:direct:#{account.id}", Oj.dump(event: :delete, payload: status.id.to_s)) redis.publish("timeline:direct:#{account.id}", Oj.dump(event: :delete, payload: status.id.to_s)) unless update
true true
end end

@ -1,8 +1,12 @@
# frozen_string_literal: true # frozen_string_literal: true
class StatusReachFinder class StatusReachFinder
def initialize(status) # @param [Status] status
# @param [Hash] options
# @option options [Boolean] :unsafe
def initialize(status, options = {})
@status = status @status = status
@options = options
end end
def inboxes def inboxes
@ -38,7 +42,7 @@ class StatusReachFinder
end end
def replied_to_account_id def replied_to_account_id
@status.in_reply_to_account_id @status.in_reply_to_account_id if distributable?
end end
def reblog_of_account_id def reblog_of_account_id
@ -49,21 +53,26 @@ class StatusReachFinder
@status.mentions.pluck(:account_id) @status.mentions.pluck(:account_id)
end end
# Beware: Reblogs can be created without the author having had access to the status
def reblogs_account_ids def reblogs_account_ids
@status.reblogs.pluck(:account_id) @status.reblogs.pluck(:account_id) if distributable? || unsafe?
end end
# Beware: Favourites can be created without the author having had access to the status
def favourites_account_ids def favourites_account_ids
@status.favourites.pluck(:account_id) @status.favourites.pluck(:account_id) if distributable? || unsafe?
end end
# Beware: Replies can be created without the author having had access to the status
def replies_account_ids def replies_account_ids
@status.replies.pluck(:account_id) @status.replies.pluck(:account_id) if distributable? || unsafe?
end end
def followers_inboxes def followers_inboxes
if @status.in_reply_to_local_account? && @status.distributable? if @status.in_reply_to_local_account? && distributable?
@status.account.followers.or(@status.thread.account.followers).inboxes @status.account.followers.or(@status.thread.account.followers).inboxes
elsif @status.direct_visibility? || @status.limited_visibility?
[]
else else
@status.account.followers.inboxes @status.account.followers.inboxes
end end
@ -76,4 +85,12 @@ class StatusReachFinder
[] []
end end
end end
def distributable?
@status.public_visibility? || @status.unlisted_visibility?
end
def unsafe?
@options[:unsafe]
end
end end

@ -26,6 +26,7 @@ class Poll < ApplicationRecord
belongs_to :status belongs_to :status
has_many :votes, class_name: 'PollVote', inverse_of: :poll, dependent: :delete_all has_many :votes, class_name: 'PollVote', inverse_of: :poll, dependent: :delete_all
has_many :voters, -> { group('accounts.id') }, through: :votes, class_name: 'Account', source: :account
has_many :notifications, as: :activity, dependent: :destroy has_many :notifications, as: :activity, dependent: :destroy

@ -26,6 +26,7 @@
# poll_id :bigint(8) # poll_id :bigint(8)
# content_type :string # content_type :string
# deleted_at :datetime # deleted_at :datetime
# edited_at :datetime
# #
class Status < ApplicationRecord class Status < ApplicationRecord
@ -59,6 +60,8 @@ class Status < ApplicationRecord
belongs_to :thread, foreign_key: 'in_reply_to_id', class_name: 'Status', inverse_of: :replies, optional: true belongs_to :thread, foreign_key: 'in_reply_to_id', class_name: 'Status', inverse_of: :replies, optional: true
belongs_to :reblog, foreign_key: 'reblog_of_id', class_name: 'Status', inverse_of: :reblogs, optional: true belongs_to :reblog, foreign_key: 'reblog_of_id', class_name: 'Status', inverse_of: :reblogs, optional: true
has_many :edits, class_name: 'StatusEdit', inverse_of: :status, dependent: :destroy
has_many :favourites, inverse_of: :status, dependent: :destroy has_many :favourites, inverse_of: :status, dependent: :destroy
has_many :bookmarks, inverse_of: :status, dependent: :destroy has_many :bookmarks, inverse_of: :status, dependent: :destroy
has_many :reblogs, foreign_key: 'reblog_of_id', class_name: 'Status', inverse_of: :reblog, dependent: :destroy has_many :reblogs, foreign_key: 'reblog_of_id', class_name: 'Status', inverse_of: :reblog, dependent: :destroy
@ -215,6 +218,10 @@ class Status < ApplicationRecord
public_visibility? || unlisted_visibility? public_visibility? || unlisted_visibility?
end end
def edited?
edited_at.present?
end
alias sign? distributable? alias sign? distributable?
def with_media? def with_media?

@ -0,0 +1,23 @@
# frozen_string_literal: true
# == Schema Information
#
# Table name: status_edits
#
# id :bigint(8) not null, primary key
# status_id :bigint(8) not null
# account_id :bigint(8)
# text :text default(""), not null
# spoiler_text :text default(""), not null
# media_attachments_changed :boolean default(FALSE), not null
# created_at :datetime not null
# updated_at :datetime not null
#
class StatusEdit < ApplicationRecord
belongs_to :status
belongs_to :account, optional: true
default_scope { order(id: :asc) }
delegate :local?, to: :status
end

@ -11,6 +11,7 @@ class ActivityPub::NoteSerializer < ActivityPub::Serializer
attribute :content attribute :content
attribute :content_map, if: :language? attribute :content_map, if: :language?
attribute :updated, if: :edited?
attribute :direct_message, if: :non_public? attribute :direct_message, if: :non_public?
@ -76,6 +77,8 @@ class ActivityPub::NoteSerializer < ActivityPub::Serializer
object.language.present? object.language.present?
end end
delegate :edited?, to: :object
def in_reply_to def in_reply_to
return unless object.reply? && !object.thread.nil? return unless object.reply? && !object.thread.nil?
@ -90,6 +93,10 @@ class ActivityPub::NoteSerializer < ActivityPub::Serializer
object.created_at.iso8601 object.created_at.iso8601
end end
def updated
object.edited_at.iso8601
end
def url def url
ActivityPub::TagManager.instance.url_for(object) ActivityPub::TagManager.instance.url_for(object)
end end

@ -0,0 +1,6 @@
# frozen_string_literal: true
class REST::StatusEditSerializer < ActiveModel::Serializer
attributes :text, :spoiler_text, :media_attachments_changed,
:created_at
end

@ -4,7 +4,7 @@ class REST::StatusSerializer < ActiveModel::Serializer
attributes :id, :created_at, :in_reply_to_id, :in_reply_to_account_id, attributes :id, :created_at, :in_reply_to_id, :in_reply_to_account_id,
:sensitive, :spoiler_text, :visibility, :language, :sensitive, :spoiler_text, :visibility, :language,
:uri, :url, :replies_count, :reblogs_count, :uri, :url, :replies_count, :reblogs_count,
:favourites_count :favourites_count, :edited_at
attribute :favourited, if: :current_user? attribute :favourited, if: :current_user?
attribute :reblogged, if: :current_user? attribute :reblogged, if: :current_user?

@ -0,0 +1,9 @@
# frozen_string_literal: true
class REST::StatusSourceSerializer < ActiveModel::Serializer
attributes :id, :text, :spoiler_text
def id
object.id.to_s
end
end

@ -8,6 +8,6 @@ class ActivityPub::FetchRemotePollService < BaseService
return unless supported_context?(json) return unless supported_context?(json)
ActivityPub::ProcessPollService.new.call(poll, json) ActivityPub::ProcessStatusUpdateService.new.call(poll.status, json)
end end
end end

@ -1,64 +0,0 @@
# frozen_string_literal: true
class ActivityPub::ProcessPollService < BaseService
include JsonLdHelper
def call(poll, json)
@json = json
return unless expected_type?
previous_expires_at = poll.expires_at
expires_at = begin
if @json['closed'].is_a?(String)
@json['closed']
elsif !@json['closed'].nil? && !@json['closed'].is_a?(FalseClass)
Time.now.utc
else
@json['endTime']
end
end
items = begin
if @json['anyOf'].is_a?(Array)
@json['anyOf']
else
@json['oneOf']
end
end
voters_count = @json['votersCount']
latest_options = items.filter_map { |item| item['name'].presence || item['content'] }
# If for some reasons the options were changed, it invalidates all previous
# votes, so we need to remove them
poll.votes.delete_all if latest_options != poll.options
begin
poll.update!(
last_fetched_at: Time.now.utc,
expires_at: expires_at,
options: latest_options,
cached_tallies: items.map { |item| item.dig('replies', 'totalItems') || 0 },
voters_count: voters_count
)
rescue ActiveRecord::StaleObjectError
poll.reload
retry
end
# If the poll had no expiration date set but now has, and people have voted,
# schedule a notification.
if previous_expires_at.nil? && poll.expires_at.present? && poll.votes.exists?
PollExpirationNotifyWorker.perform_at(poll.expires_at + 5.minutes, poll.id)
end
end
private
def expected_type?
equals_or_includes_any?(@json['type'], %w(Question))
end
end

@ -0,0 +1,275 @@
# frozen_string_literal: true
class ActivityPub::ProcessStatusUpdateService < BaseService
include JsonLdHelper
def call(status, json)
@json = json
@status_parser = ActivityPub::Parser::StatusParser.new(@json)
@uri = @status_parser.uri
@status = status
@account = status.account
@media_attachments_changed = false
# Only native types can be updated at the moment
return if !expected_type? || already_updated_more_recently?
# Only allow processing one create/update per status at a time
RedisLock.acquire(lock_options) do |lock|
if lock.acquired?
Status.transaction do
create_previous_edit!
update_media_attachments!
update_poll!
update_immediate_attributes!
update_metadata!
create_edit!
end
queue_poll_notifications!
reset_preview_card!
broadcast_updates!
else
raise Mastodon::RaceConditionError
end
end
end
private
def update_media_attachments!
previous_media_attachments = @status.media_attachments.to_a
next_media_attachments = []
as_array(@json['attachment']).each do |attachment|
media_attachment_parser = ActivityPub::Parser::MediaAttachmentParser.new(attachment)
next if media_attachment_parser.remote_url.blank? || next_media_attachments.size > 4
begin
media_attachment = previous_media_attachments.find { |previous_media_attachment| previous_media_attachment.remote_url == media_attachment_parser.remote_url }
media_attachment ||= MediaAttachment.new(account: @account, remote_url: media_attachment_parser.remote_url)
# If a previously existing media attachment was significantly updated, mark
# media attachments as changed even if none were added or removed
if media_attachment_parser.significantly_changes?(media_attachment)
@media_attachments_changed = true
end
media_attachment.description = media_attachment_parser.description
media_attachment.focus = media_attachment_parser.focus
media_attachment.thumbnail_remote_url = media_attachment_parser.thumbnail_remote_url
media_attachment.blurhash = media_attachment_parser.blurhash
media_attachment.save!
next_media_attachments << media_attachment
next if unsupported_media_type?(media_attachment_parser.file_content_type) || skip_download?
RedownloadMediaWorker.perform_async(media_attachment.id) if media_attachment.remote_url_previously_changed? || media_attachment.thumbnail_remote_url_previously_changed?
rescue Addressable::URI::InvalidURIError => e
Rails.logger.debug "Invalid URL in attachment: #{e}"
end
end
removed_media_attachments = previous_media_attachments - next_media_attachments
added_media_attachments = next_media_attachments - previous_media_attachments
MediaAttachment.where(id: removed_media_attachments.map(&:id)).update_all(status_id: nil)
MediaAttachment.where(id: added_media_attachments.map(&:id)).update_all(status_id: @status.id)
@media_attachments_changed = true if removed_media_attachments.any? || added_media_attachments.any?
end
def update_poll!
previous_poll = @status.preloadable_poll
@previous_expires_at = previous_poll&.expires_at
poll_parser = ActivityPub::Parser::PollParser.new(@json)
if poll_parser.valid?
poll = previous_poll || @account.polls.new(status: @status)
# If for some reasons the options were changed, it invalidates all previous
# votes, so we need to remove them
if poll_parser.significantly_changes?(poll)
@media_attachments_changed = true
poll.votes.delete_all unless poll.new_record?
end
poll.last_fetched_at = Time.now.utc
poll.options = poll_parser.options
poll.multiple = poll_parser.multiple
poll.expires_at = poll_parser.expires_at
poll.voters_count = poll_parser.voters_count
poll.cached_tallies = poll_parser.cached_tallies
poll.save!
@status.poll_id = poll.id
elsif previous_poll.present?
previous_poll.destroy!
@media_attachments_changed = true
@status.poll_id = nil
end
end
def update_immediate_attributes!
@status.text = @status_parser.text || ''
@status.spoiler_text = @status_parser.spoiler_text || ''
@status.sensitive = @account.sensitized? || @status_parser.sensitive || false
@status.language = @status_parser.language || detected_language
@status.edited_at = @status_parser.edited_at || Time.now.utc
@status.save!
end
def update_metadata!
@raw_tags = []
@raw_mentions = []
@raw_emojis = []
as_array(@json['tag']).each do |tag|
if equals_or_includes?(tag['type'], 'Hashtag')
@raw_tags << tag['name']
elsif equals_or_includes?(tag['type'], 'Mention')
@raw_mentions << tag['href']
elsif equals_or_includes?(tag['type'], 'Emoji')
@raw_emojis << tag
end
end
update_tags!
update_mentions!
update_emojis!
end
def update_tags!
@status.tags = Tag.find_or_create_by_names(@raw_tags)
end
def update_mentions!
previous_mentions = @status.active_mentions.includes(:account).to_a
current_mentions = []
@raw_mentions.each do |href|
next if href.blank?
account = ActivityPub::TagManager.instance.uri_to_resource(href, Account)
account ||= ActivityPub::FetchRemoteAccountService.new.call(href)
next if account.nil?
mention = previous_mentions.find { |x| x.account_id == account.id }
mention ||= account.mentions.new(status: @status)
current_mentions << mention
end
current_mentions.each do |mention|
mention.save if mention.new_record?
end
# If previous mentions are no longer contained in the text, convert them
# to silent mentions, since withdrawing access from someone who already
# received a notification might be more confusing
removed_mentions = previous_mentions - current_mentions
Mention.where(id: removed_mentions.map(&:id)).update_all(silent: true) unless removed_mentions.empty?
end
def update_emojis!
return if skip_download?
@raw_emojis.each do |raw_emoji|
custom_emoji_parser = ActivityPub::Parser::CustomEmojiParser.new(raw_emoji)
next if custom_emoji_parser.shortcode.blank? || custom_emoji_parser.image_remote_url.blank?
emoji = CustomEmoji.find_by(shortcode: custom_emoji_parser.shortcode, domain: @account.domain)
next unless emoji.nil? || custom_emoji_parser.image_remote_url != emoji.image_remote_url || (custom_emoji_parser.updated_at && custom_emoji_parser.updated_at >= emoji.updated_at)
begin
emoji ||= CustomEmoji.new(domain: @account.domain, shortcode: custom_emoji_parser.shortcode, uri: custom_emoji_parser.uri)
emoji.image_remote_url = custom_emoji_parser.image_remote_url
emoji.save
rescue Seahorse::Client::NetworkingError => e
Rails.logger.warn "Error storing emoji: #{e}"
end
end
end
def expected_type?
equals_or_includes_any?(@json['type'], %w(Note Question))
end
def lock_options
{ redis: Redis.current, key: "create:#{@uri}", autorelease: 15.minutes.seconds }
end
def detected_language
LanguageDetector.instance.detect(@status_parser.text, @account)
end
def create_previous_edit!
# We only need to create a previous edit when no previous edits exist, e.g.
# when the status has never been edited. For other cases, we always create
# an edit, so the step can be skipped
return if @status.edits.any?
@status.edits.create(
text: @status.text,
spoiler_text: @status.spoiler_text,
media_attachments_changed: false,
account_id: @account.id,
created_at: @status.created_at
)
end
def create_edit!
return unless @status.text_previously_changed? || @status.spoiler_text_previously_changed? || @media_attachments_changed
@status_edit = @status.edits.create(
text: @status.text,
spoiler_text: @status.spoiler_text,
media_attachments_changed: @media_attachments_changed,
account_id: @account.id,
created_at: @status.edited_at
)
end
def skip_download?
return @skip_download if defined?(@skip_download)
@skip_download ||= DomainBlock.reject_media?(@account.domain)
end
def unsupported_media_type?(mime_type)
mime_type.present? && !MediaAttachment.supported_mime_types.include?(mime_type)
end
def already_updated_more_recently?
@status.edited_at.present? && @status_parser.edited_at.present? && @status.edited_at > @status_parser.edited_at
end
def reset_preview_card!
@status.preview_cards.clear if @status.text_previously_changed? || @status.spoiler_text.present?
LinkCrawlWorker.perform_in(rand(1..59).seconds, @status.id) if @status.spoiler_text.blank?
end
def broadcast_updates!
::DistributionWorker.perform_async(@status.id, update: true)
end
def queue_poll_notifications!
poll = @status.preloadable_poll
# If the poll had no expiration date set but now has, or now has a sooner
# expiration date, and people have voted, schedule a notification
return unless poll.present? && poll.expires_at.present? && poll.votes.exists?
PollExpirationNotifyWorker.remove_from_scheduled(poll.id) if @previous_expires_at.present? && @previous_expires_at > poll.expires_at
PollExpirationNotifyWorker.perform_at(poll.expires_at + 5.minutes, poll.id)
end
end

@ -3,118 +3,134 @@
class FanOutOnWriteService < BaseService class FanOutOnWriteService < BaseService
# Push a status into home and mentions feeds # Push a status into home and mentions feeds
# @param [Status] status # @param [Status] status
def call(status) # @param [Hash] options
raise Mastodon::RaceConditionError if status.visibility.nil? # @option options [Boolean] update
# @option options [Array<Integer>] silenced_account_ids
def call(status, options = {})
@status = status
@account = status.account
@options = options
deliver_to_self(status) if status.account.local? check_race_condition!
if status.direct_visibility? fan_out_to_local_recipients!
deliver_to_mentioned_followers(status) fan_out_to_public_streams! if broadcastable?
deliver_to_direct_timelines(status)
deliver_to_own_conversation(status)
elsif status.limited_visibility?
deliver_to_mentioned_followers(status)
else
deliver_to_followers(status)
deliver_to_lists(status)
end end
return if status.account.silenced? || !status.public_visibility? private
return if status.reblog? && !Setting.show_reblogs_in_public_timelines
render_anonymous_payload(status) def check_race_condition!
# I don't know why but at some point we had an issue where
# this service was being executed with status objects
# that had a null visibility - which should not be possible
# since the column in the database is not nullable.
#
# This check re-queues the service to be run at a later time
# with the full object, if something like it occurs
deliver_to_hashtags(status) raise Mastodon::RaceConditionError if @status.visibility.nil?
end
return if status.reply? && status.in_reply_to_account_id != status.account_id && !Setting.show_replies_in_public_timelines def fan_out_to_local_recipients!
deliver_to_self!
notify_mentioned_accounts!
deliver_to_public(status) case @status.visibility.to_sym
deliver_to_media(status) if status.media_attachments.any? when :public, :unlisted, :private
deliver_to_all_followers!
deliver_to_lists!
when :limited
deliver_to_mentioned_followers!
else
deliver_to_mentioned_followers!
deliver_to_conversation!
deliver_to_direct_timelines!
end
end end
private def fan_out_to_public_streams!
broadcast_to_hashtag_streams!
broadcast_to_public_streams!
end
def deliver_to_self(status) def deliver_to_self!
Rails.logger.debug "Delivering status #{status.id} to author" FeedManager.instance.push_to_home(@account, @status, update: update?) if @account.local?
FeedManager.instance.push_to_home(status.account, status) FeedManager.instance.push_to_direct(@account, @status, update: update?) if @account.local? && @status.direct_visibility?
FeedManager.instance.push_to_direct(status.account, status) if status.direct_visibility?
end end
def deliver_to_followers(status) def notify_mentioned_accounts!
Rails.logger.debug "Delivering status #{status.id} to followers" @status.active_mentions.where.not(id: @options[:silenced_account_ids] || []).joins(:account).merge(Account.local).select(:id, :account_id).reorder(nil).find_in_batches do |mentions|
LocalNotificationWorker.push_bulk(mentions) do |mention|
[mention.account_id, mention.id, 'Mention', :mention]
end
end
end
status.account.followers_for_local_distribution.select(:id).reorder(nil).find_in_batches do |followers| def deliver_to_all_followers!
@account.followers_for_local_distribution.select(:id).reorder(nil).find_in_batches do |followers|
FeedInsertWorker.push_bulk(followers) do |follower| FeedInsertWorker.push_bulk(followers) do |follower|
[status.id, follower.id, :home] [@status.id, follower.id, :home, update: update?]
end end
end end
end end
def deliver_to_lists(status) def deliver_to_lists!
Rails.logger.debug "Delivering status #{status.id} to lists" @account.lists_for_local_distribution.select(:id).reorder(nil).find_in_batches do |lists|
status.account.lists_for_local_distribution.select(:id).reorder(nil).find_in_batches do |lists|
FeedInsertWorker.push_bulk(lists) do |list| FeedInsertWorker.push_bulk(lists) do |list|
[status.id, list.id, :list] [@status.id, list.id, :list, update: update?]
end end
end end
end end
def deliver_to_mentioned_followers(status) def deliver_to_mentioned_followers!
Rails.logger.debug "Delivering status #{status.id} to limited followers" @status.mentions.joins(:account).merge(@account.followers_for_local_distribution).select(:id, :account_id).reorder(nil).find_in_batches do |mentions|
status.mentions.joins(:account).merge(status.account.followers_for_local_distribution).select(:id, :account_id).reorder(nil).find_in_batches do |mentions|
FeedInsertWorker.push_bulk(mentions) do |mention| FeedInsertWorker.push_bulk(mentions) do |mention|
[status.id, mention.account_id, :home] [@status.id, mention.account_id, :home, update: update?]
end end
end end
end end
def render_anonymous_payload(status) def deliver_to_direct_timelines!
@payload = InlineRenderer.render(status, nil, :status) FeedInsertWorker.push_bulk(@status.mentions.includes(:account).map(&:account).select { |mentioned_account| mentioned_account.local? }) do |account|
@payload = Oj.dump(event: :update, payload: @payload) [@status.id, account.id, :direct, update: update?]
end end
def deliver_to_hashtags(status)
Rails.logger.debug "Delivering status #{status.id} to hashtags"
status.tags.pluck(:name).each do |hashtag|
Redis.current.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", @payload)
Redis.current.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", @payload) if status.local?
end end
end
def deliver_to_public(status)
Rails.logger.debug "Delivering status #{status.id} to public timeline"
Redis.current.publish('timeline:public', @payload) def broadcast_to_hashtag_streams!
if status.local? @status.tags.pluck(:name).each do |hashtag|
Redis.current.publish('timeline:public:local', @payload) Redis.current.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}", anonymous_payload)
else Redis.current.publish("timeline:hashtag:#{hashtag.mb_chars.downcase}:local", anonymous_payload) if @status.local?
Redis.current.publish('timeline:public:remote', @payload)
end end
end end
def deliver_to_media(status) def broadcast_to_public_streams!
Rails.logger.debug "Delivering status #{status.id} to media timeline" return if @status.reply? && @status.in_reply_to_account_id != @account.id && !Setting.show_replies_in_public_timelines
Redis.current.publish('timeline:public:media', @payload) Redis.current.publish('timeline:public', anonymous_payload)
if status.local? Redis.current.publish(@status.local? ? 'timeline:public:local' : 'timeline:public:remote', anonymous_payload)
Redis.current.publish('timeline:public:local:media', @payload)
else if @status.media_attachments.any?
Redis.current.publish('timeline:public:remote:media', @payload) Redis.current.publish('timeline:public:media', anonymous_payload)
Redis.current.publish(@status.local? ? 'timeline:public:local:media' : 'timeline:public:remote:media', anonymous_payload)
end end
end end
def deliver_to_direct_timelines(status) def deliver_to_conversation!
Rails.logger.debug "Delivering status #{status.id} to direct timelines" AccountConversation.add_status(@account, @status) unless update?
end
FeedInsertWorker.push_bulk(status.mentions.includes(:account).map(&:account).select { |mentioned_account| mentioned_account.local? }) do |account| def anonymous_payload
[status.id, account.id, :direct] @anonymous_payload ||= Oj.dump(
event: update? ? :'status.update' : :update,
payload: InlineRenderer.render(@status, nil, :status)
)
end end
def update?
@is_update
end end
def deliver_to_own_conversation(status) def broadcastable?
AccountConversation.add_status(status.account, status) @status.public_visibility? && !@account.silenced? && (!@status.reblog? || Setting.show_reblogs_in_public_timelines)
end end
end end

@ -8,12 +8,23 @@ class ProcessMentionsService < BaseService
# remote users # remote users
# @param [Status] status # @param [Status] status
def call(status) def call(status)
return unless status.local?
@status = status @status = status
mentions = []
status.text = status.text.gsub(Account::MENTION_RE) do |match| return unless @status.local?
@previous_mentions = @status.active_mentions.includes(:account).to_a
@current_mentions = []
Status.transaction do
scan_text!
assign_mentions!
end
end
private
def scan_text!
@status.text = @status.text.gsub(Account::MENTION_RE) do |match|
username, domain = Regexp.last_match(1).split('@') username, domain = Regexp.last_match(1).split('@')
domain = begin domain = begin
@ -26,49 +37,45 @@ class ProcessMentionsService < BaseService
mentioned_account = Account.find_remote(username, domain) mentioned_account = Account.find_remote(username, domain)
# If the account cannot be found or isn't the right protocol,
# first try to resolve it
if mention_undeliverable?(mentioned_account) if mention_undeliverable?(mentioned_account)
begin begin
mentioned_account = resolve_account_service.call(Regexp.last_match(1)) mentioned_account = ResolveAccountService.new.call(Regexp.last_match(1))
rescue Webfinger::Error, HTTP::Error, OpenSSL::SSL::SSLError, Mastodon::UnexpectedResponseError rescue Webfinger::Error, HTTP::Error, OpenSSL::SSL::SSLError, Mastodon::UnexpectedResponseError
mentioned_account = nil mentioned_account = nil
end end
end end
# If after resolving it still isn't found or isn't the right
# protocol, then give up
next match if mention_undeliverable?(mentioned_account) || mentioned_account&.suspended? next match if mention_undeliverable?(mentioned_account) || mentioned_account&.suspended?
mention = mentioned_account.mentions.new(status: status) mention = @previous_mentions.find { |x| x.account_id == mentioned_account.id }
mentions << mention if mention.save mention ||= mentioned_account.mentions.new(status: @status)
@current_mentions << mention
"@#{mentioned_account.acct}" "@#{mentioned_account.acct}"
end end
status.save! @status.save!
mentions.each { |mention| create_notification(mention) }
end end
private def assign_mentions!
@current_mentions.each do |mention|
def mention_undeliverable?(mentioned_account) mention.save if mention.new_record?
mentioned_account.nil? || (!mentioned_account.local? && mentioned_account.ostatus?)
end end
def create_notification(mention) # If previous mentions are no longer contained in the text, convert them
mentioned_account = mention.account # to silent mentions, since withdrawing access from someone who already
# received a notification might be more confusing
removed_mentions = @previous_mentions - @current_mentions
if mentioned_account.local? Mention.where(id: removed_mentions.map(&:id)).update_all(silent: true) unless removed_mentions.empty?
LocalNotificationWorker.perform_async(mentioned_account.id, mention.id, mention.class.name, :mention)
elsif mentioned_account.activitypub? && !@status.local_only?
ActivityPub::DeliveryWorker.perform_async(activitypub_json, mention.status.account_id, mentioned_account.inbox_url, { synchronize_followers: !mention.status.distributable? })
end
end
def activitypub_json
return @activitypub_json if defined?(@activitypub_json)
@activitypub_json = Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @status.account))
end end
def resolve_account_service def mention_undeliverable?(mentioned_account)
ResolveAccountService.new mentioned_account.nil? || (!mentioned_account.local? && !mentioned_account.activitypub?)
end end
end end

@ -89,7 +89,7 @@ class RemoveStatusService < BaseService
# the author and wouldn't normally receive the delete # the author and wouldn't normally receive the delete
# notification - so here, we explicitly send it to them # notification - so here, we explicitly send it to them
status_reach_finder = StatusReachFinder.new(@status) status_reach_finder = StatusReachFinder.new(@status, unsafe: true)
ActivityPub::DeliveryWorker.push_bulk(status_reach_finder.inboxes) do |inbox_url| ActivityPub::DeliveryWorker.push_bulk(status_reach_finder.inboxes) do |inbox_url|
[signed_activity_json, @account.id, inbox_url] [signed_activity_json, @account.id, inbox_url]

@ -1,54 +1,32 @@
# frozen_string_literal: true # frozen_string_literal: true
class ActivityPub::DistributionWorker class ActivityPub::DistributionWorker < ActivityPub::RawDistributionWorker
include Sidekiq::Worker # Distribute a new status or an edit of a status to all the places
include Payloadable # where the status is supposed to go or where it was interacted with
sidekiq_options queue: 'push'
def perform(status_id) def perform(status_id)
@status = Status.find(status_id) @status = Status.find(status_id)
@account = @status.account @account = @status.account
return if skip_distribution? distribute!
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, @account.id, inbox_url, { synchronize_followers: !@status.distributable? }]
end
relay! if relayable?
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
true true
end end
private protected
def skip_distribution?
@status.direct_visibility? || @status.limited_visibility?
end
def relayable?
@status.public_visibility?
end
def inboxes def inboxes
# Deliver the status to all followers. @inboxes ||= StatusReachFinder.new(@status).inboxes
# If the status is a reply to another local status, also forward it to that
# status' authors' followers.
@inboxes ||= if @status.in_reply_to_local_account? && @status.distributable?
@account.followers.or(@status.thread.account.followers).inboxes
else
@account.followers.inboxes
end
end end
def payload def payload
@payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @account)) @payload ||= Oj.dump(serialize_payload(activity, ActivityPub::ActivitySerializer, signer: @account))
end end
def relay! def activity
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url| ActivityPub::ActivityPresenter.from_status(@status)
[payload, @account.id, inbox_url]
end end
def options
{ synchronize_followers: @status.private_visibility? }
end end
end end

@ -2,22 +2,47 @@
class ActivityPub::RawDistributionWorker class ActivityPub::RawDistributionWorker
include Sidekiq::Worker include Sidekiq::Worker
include Payloadable
sidekiq_options queue: 'push' sidekiq_options queue: 'push'
# Base worker for when you want to queue up a bunch of deliveries of
# some payload. In this case, we have already generated JSON and
# we are going to distribute it to the account's followers minus
# the explicitly provided inboxes
def perform(json, source_account_id, exclude_inboxes = []) def perform(json, source_account_id, exclude_inboxes = [])
@account = Account.find(source_account_id) @account = Account.find(source_account_id)
@json = json
@exclude_inboxes = exclude_inboxes
ActivityPub::DeliveryWorker.push_bulk(inboxes - exclude_inboxes) do |inbox_url| distribute!
[json, @account.id, inbox_url]
end
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
true true
end end
private protected
def distribute!
return if inboxes.empty?
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, source_account_id, inbox_url, options]
end
end
def payload
@json
end
def source_account_id
@account.id
end
def inboxes def inboxes
@inboxes ||= @account.followers.inboxes @inboxes ||= @account.followers.inboxes - @exclude_inboxes
end
def options
nil
end end
end end

@ -1,34 +0,0 @@
# frozen_string_literal: true
# Obsolete but kept around to make sure existing jobs do not fail after upgrade.
# Should be removed in a subsequent release.
class ActivityPub::ReplyDistributionWorker
include Sidekiq::Worker
include Payloadable
sidekiq_options queue: 'push'
def perform(status_id)
@status = Status.find(status_id)
@account = @status.thread&.account
return unless @account.present? && @status.distributable?
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url|
[payload, @status.account_id, inbox_url]
end
rescue ActiveRecord::RecordNotFound
true
end
private
def inboxes
@inboxes ||= @account.followers.inboxes
end
def payload
@payload ||= Oj.dump(serialize_payload(ActivityPub::ActivityPresenter.from_status(@status), ActivityPub::ActivitySerializer, signer: @status.account))
end
end

@ -1,33 +1,24 @@
# frozen_string_literal: true # frozen_string_literal: true
class ActivityPub::UpdateDistributionWorker class ActivityPub::UpdateDistributionWorker < ActivityPub::RawDistributionWorker
include Sidekiq::Worker # Distribute an profile update to servers that might have a copy
include Payloadable # of the account in question
sidekiq_options queue: 'push'
def perform(account_id, options = {}) def perform(account_id, options = {})
@options = options.with_indifferent_access @options = options.with_indifferent_access
@account = Account.find(account_id) @account = Account.find(account_id)
ActivityPub::DeliveryWorker.push_bulk(inboxes) do |inbox_url| distribute!
[signed_payload, @account.id, inbox_url]
end
ActivityPub::DeliveryWorker.push_bulk(Relay.enabled.pluck(:inbox_url)) do |inbox_url|
[signed_payload, @account.id, inbox_url]
end
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
true true
end end
private protected
def inboxes def inboxes
@inboxes ||= @account.followers.inboxes @inboxes ||= AccountReachFinder.new(@account).inboxes
end end
def signed_payload def payload
@signed_payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with])) @payload ||= Oj.dump(serialize_payload(@account, ActivityPub::UpdateSerializer, signer: @account, sign_with: @options[:sign_with]))
end end
end end

@ -3,10 +3,10 @@
class DistributionWorker class DistributionWorker
include Sidekiq::Worker include Sidekiq::Worker
def perform(status_id) def perform(status_id, options = {})
RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock| RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}", autorelease: 5.minutes.seconds) do |lock|
if lock.acquired? if lock.acquired?
FanOutOnWriteService.new.call(Status.find(status_id)) FanOutOnWriteService.new.call(Status.find(status_id), **options.symbolize_keys)
else else
raise Mastodon::RaceConditionError raise Mastodon::RaceConditionError
end end

@ -3,9 +3,10 @@
class FeedInsertWorker class FeedInsertWorker
include Sidekiq::Worker include Sidekiq::Worker
def perform(status_id, id, type = :home) def perform(status_id, id, type = :home, options = {})
@type = type.to_sym @type = type.to_sym
@status = Status.find(status_id) @status = Status.find(status_id)
@options = options.symbolize_keys
case @type case @type
when :home when :home
@ -25,11 +26,13 @@ class FeedInsertWorker
private private
def check_and_insert def check_and_insert
return if feed_filtered? if feed_filtered?
perform_unpush if update?
else
perform_push perform_push
perform_notify if notify? perform_notify if notify?
end end
end
def feed_filtered? def feed_filtered?
case @type case @type
@ -51,15 +54,30 @@ class FeedInsertWorker
def perform_push def perform_push
case @type case @type
when :home when :home
FeedManager.instance.push_to_home(@follower, @status) FeedManager.instance.push_to_home(@follower, @status, update: update?)
when :list when :list
FeedManager.instance.push_to_list(@list, @status) FeedManager.instance.push_to_list(@list, @status, update: update?)
when :direct when :direct
FeedManager.instance.push_to_direct(@account, @status) FeedManager.instance.push_to_direct(@account, @status, update: update?)
end
end
def perform_unpush
case @type
when :home
FeedManager.instance.unpush_from_home(@follower, @status, update: true)
when :list
FeedManager.instance.unpush_from_list(@list, @status, update: true)
when :direct
FeedManager.instance.unpush_from_direct(@account, @status, update: true)
end end
end end
def perform_notify def perform_notify
NotifyService.new.call(@follower, :status, @status) NotifyService.new.call(@follower, :status, @status)
end end
def update?
@options[:update]
end
end end

@ -12,6 +12,8 @@ class LocalNotificationWorker
activity = activity_class_name.constantize.find(activity_id) activity = activity_class_name.constantize.find(activity_id)
end end
return if Notification.where(account: receiver, activity: activity).any?
NotifyService.new.call(receiver, type || activity_class_name.underscore, activity) NotifyService.new.call(receiver, type || activity_class_name.underscore, activity)
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
true true

@ -6,19 +6,44 @@ class PollExpirationNotifyWorker
sidekiq_options lock: :until_executed sidekiq_options lock: :until_executed
def perform(poll_id) def perform(poll_id)
poll = Poll.find(poll_id) @poll = Poll.find(poll_id)
# Notify poll owner and remote voters return if does_not_expire?
if poll.local? requeue! && return if not_due_yet?
ActivityPub::DistributePollUpdateWorker.perform_async(poll.status.id)
NotifyService.new.call(poll.account, :poll, poll)
end
# Notify local voters notify_remote_voters_and_owner! if @poll.local?
poll.votes.includes(:account).group(:account_id).select(:account_id).map(&:account).select(&:local?).each do |account| notify_local_voters!
NotifyService.new.call(account, :poll, poll)
end
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
true true
end end
def self.remove_from_scheduled(poll_id)
queue = Sidekiq::ScheduledSet.new
queue.select { |scheduled| scheduled.klass == name && scheduled.args[0] == poll_id }.map(&:delete)
end
private
def does_not_expire?
@poll.expires_at.nil?
end
def not_due_yet?
@poll.expires_at.present? && !@poll.expired?
end
def requeue!
PollExpirationNotifyWorker.perform_at(@poll.expires_at + 5.minutes, @poll.id)
end
def notify_remote_voters_and_owner!
ActivityPub::DistributePollUpdateWorker.perform_async(@poll.status.id)
NotifyService.new.call(@poll.account, :poll, @poll)
end
def notify_local_voters!
@poll.voters.merge(Account.local).find_each do |account|
NotifyService.new.call(account, :poll, @poll)
end
end
end end

@ -2,15 +2,38 @@
class PushUpdateWorker class PushUpdateWorker
include Sidekiq::Worker include Sidekiq::Worker
include Redisable
def perform(account_id, status_id, timeline_id = nil) def perform(account_id, status_id, timeline_id = nil, options = {})
account = Account.find(account_id) @account = Account.find(account_id)
status = Status.find(status_id) @status = Status.find(status_id)
message = InlineRenderer.render(status, account, :status) @timeline_id = timeline_id || "timeline:#{account.id}"
timeline_id = "timeline:#{account.id}" if timeline_id.nil? @options = options.symbolize_keys
Redis.current.publish(timeline_id, Oj.dump(event: :update, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i)) publish!
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
true true
end end
private
def payload
InlineRenderer.render(@status, @account, :status)
end
def message
Oj.dump(
event: update? ? :'status.update' : :update,
payload: payload,
queued_at: (Time.now.to_f * 1000.0).to_i
)
end
def publish!
redis.publish(@timeline_id, message)
end
def update?
@options[:update]
end
end end

@ -352,6 +352,9 @@ Rails.application.routes.draw do
resource :pin, only: :create resource :pin, only: :create
post :unpin, to: 'pins#destroy' post :unpin, to: 'pins#destroy'
resource :history, only: :show
resource :source, only: :show
end end
member do member do

@ -0,0 +1,5 @@
class AddEditedAtToStatuses < ActiveRecord::Migration[6.1]
def change
add_column :statuses, :edited_at, :datetime
end
end

@ -0,0 +1,13 @@
class CreateStatusEdits < ActiveRecord::Migration[6.1]
def change
create_table :status_edits do |t|
t.belongs_to :status, null: false, foreign_key: { on_delete: :cascade }
t.belongs_to :account, null: true, foreign_key: { on_delete: :nullify }
t.text :text, null: false, default: ''
t.text :spoiler_text, null: false, default: ''
t.boolean :media_attachments_changed, null: false, default: false
t.timestamps
end
end
end

@ -816,6 +816,18 @@ ActiveRecord::Schema.define(version: 2022_01_16_202951) do
t.index ["var"], name: "index_site_uploads_on_var", unique: true t.index ["var"], name: "index_site_uploads_on_var", unique: true
end end
create_table "status_edits", force: :cascade do |t|
t.bigint "status_id", null: false
t.bigint "account_id"
t.text "text", default: "", null: false
t.text "spoiler_text", default: "", null: false
t.boolean "media_attachments_changed", default: false, null: false
t.datetime "created_at", precision: 6, null: false
t.datetime "updated_at", precision: 6, null: false
t.index ["account_id"], name: "index_status_edits_on_account_id"
t.index ["status_id"], name: "index_status_edits_on_status_id"
end
create_table "status_pins", force: :cascade do |t| create_table "status_pins", force: :cascade do |t|
t.bigint "account_id", null: false t.bigint "account_id", null: false
t.bigint "status_id", null: false t.bigint "status_id", null: false
@ -856,6 +868,7 @@ ActiveRecord::Schema.define(version: 2022_01_16_202951) do
t.bigint "poll_id" t.bigint "poll_id"
t.string "content_type" t.string "content_type"
t.datetime "deleted_at" t.datetime "deleted_at"
t.datetime "edited_at"
t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20190820", order: { id: :desc }, where: "(deleted_at IS NULL)" t.index ["account_id", "id", "visibility", "updated_at"], name: "index_statuses_20190820", order: { id: :desc }, where: "(deleted_at IS NULL)"
t.index ["deleted_at"], name: "index_statuses_on_deleted_at", where: "(deleted_at IS NOT NULL)" t.index ["deleted_at"], name: "index_statuses_on_deleted_at", where: "(deleted_at IS NOT NULL)"
t.index ["id", "account_id"], name: "index_statuses_local_20190824", order: { id: :desc }, where: "((local OR (uri IS NULL)) AND (deleted_at IS NULL) AND (visibility = 0) AND (reblog_of_id IS NULL) AND ((NOT reply) OR (in_reply_to_account_id = account_id)))" t.index ["id", "account_id"], name: "index_statuses_local_20190824", order: { id: :desc }, where: "((local OR (uri IS NULL)) AND (deleted_at IS NULL) AND (visibility = 0) AND (reblog_of_id IS NULL) AND ((NOT reply) OR (in_reply_to_account_id = account_id)))"
@ -1083,6 +1096,8 @@ ActiveRecord::Schema.define(version: 2022_01_16_202951) do
add_foreign_key "scheduled_statuses", "accounts", on_delete: :cascade add_foreign_key "scheduled_statuses", "accounts", on_delete: :cascade
add_foreign_key "session_activations", "oauth_access_tokens", column: "access_token_id", name: "fk_957e5bda89", on_delete: :cascade add_foreign_key "session_activations", "oauth_access_tokens", column: "access_token_id", name: "fk_957e5bda89", on_delete: :cascade
add_foreign_key "session_activations", "users", name: "fk_e5fda67334", on_delete: :cascade add_foreign_key "session_activations", "users", name: "fk_e5fda67334", on_delete: :cascade
add_foreign_key "status_edits", "accounts", on_delete: :nullify
add_foreign_key "status_edits", "statuses", on_delete: :cascade
add_foreign_key "status_pins", "accounts", name: "fk_d4cb435b62", on_delete: :cascade add_foreign_key "status_pins", "accounts", name: "fk_d4cb435b62", on_delete: :cascade
add_foreign_key "status_pins", "statuses", on_delete: :cascade add_foreign_key "status_pins", "statuses", on_delete: :cascade
add_foreign_key "status_stats", "statuses", on_delete: :cascade add_foreign_key "status_stats", "statuses", on_delete: :cascade

@ -0,0 +1,29 @@
# frozen_string_literal: true
require 'rails_helper'
describe Api::V1::Statuses::HistoriesController do
render_views
let(:user) { Fabricate(:user, account: Fabricate(:account, username: 'alice')) }
let(:app) { Fabricate(:application, name: 'Test app', website: 'http://testapp.com') }
let(:token) { Fabricate(:accessible_access_token, resource_owner_id: user.id, scopes: 'read:statuses', application: app) }
context 'with an oauth token' do
before do
allow(controller).to receive(:doorkeeper_token) { token }
end
describe 'GET #show' do
let(:status) { Fabricate(:status, account: user.account) }
before do
get :show, params: { status_id: status.id }
end
it 'returns http success' do
expect(response).to have_http_status(200)
end
end
end
end

@ -0,0 +1,29 @@
# frozen_string_literal: true
require 'rails_helper'
describe Api::V1::Statuses::SourcesController do
render_views
let(:user) { Fabricate(:user, account: Fabricate(:account, username: 'alice')) }
let(:app) { Fabricate(:application, name: 'Test app', website: 'http://testapp.com') }
let(:token) { Fabricate(:accessible_access_token, resource_owner_id: user.id, scopes: 'read:statuses', application: app) }
context 'with an oauth token' do
before do
allow(controller).to receive(:doorkeeper_token) { token }
end
describe 'GET #show' do
let(:status) { Fabricate(:status, account: user.account) }
before do
get :show, params: { status_id: status.id }
end
it 'returns http success' do
expect(response).to have_http_status(200)
end
end
end
end

@ -0,0 +1,6 @@
Fabricator(:preview_card) do
url { Faker::Internet.url }
title { Faker::Lorem.sentence }
description { Faker::Lorem.paragraph }
type 'link'
end

@ -0,0 +1,7 @@
Fabricator(:status_edit) do
status nil
account nil
text "MyText"
spoiler_text "MyText"
media_attachments_changed false
end

@ -0,0 +1,109 @@
# frozen_string_literal: true
require 'rails_helper'
describe StatusReachFinder do
describe '#inboxes' do
context 'for a local status' do
let(:parent_status) { nil }
let(:visibility) { :public }
let(:alice) { Fabricate(:account, username: 'alice') }
let(:status) { Fabricate(:status, account: alice, thread: parent_status, visibility: visibility) }
subject { described_class.new(status) }
context 'when it contains mentions of remote accounts' do
let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') }
before do
status.mentions.create!(account: bob)
end
it 'includes the inbox of the mentioned account' do
expect(subject.inboxes).to include 'https://foo.bar/inbox'
end
end
context 'when it has been reblogged by a remote account' do
let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') }
before do
bob.statuses.create!(reblog: status)
end
it 'includes the inbox of the reblogger' do
expect(subject.inboxes).to include 'https://foo.bar/inbox'
end
context 'when status is not public' do
let(:visibility) { :private }
it 'does not include the inbox of the reblogger' do
expect(subject.inboxes).to_not include 'https://foo.bar/inbox'
end
end
end
context 'when it has been favourited by a remote account' do
let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') }
before do
bob.favourites.create!(status: status)
end
it 'includes the inbox of the favouriter' do
expect(subject.inboxes).to include 'https://foo.bar/inbox'
end
context 'when status is not public' do
let(:visibility) { :private }
it 'does not include the inbox of the favouriter' do
expect(subject.inboxes).to_not include 'https://foo.bar/inbox'
end
end
end
context 'when it has been replied to by a remote account' do
let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') }
before do
bob.statuses.create!(thread: status, text: 'Hoge')
end
context do
it 'includes the inbox of the replier' do
expect(subject.inboxes).to include 'https://foo.bar/inbox'
end
end
context 'when status is not public' do
let(:visibility) { :private }
it 'does not include the inbox of the replier' do
expect(subject.inboxes).to_not include 'https://foo.bar/inbox'
end
end
end
context 'when it is a reply to a remote account' do
let(:bob) { Fabricate(:account, username: 'bob', domain: 'foo.bar', protocol: :activitypub, inbox_url: 'https://foo.bar/inbox') }
let(:parent_status) { Fabricate(:status, account: bob) }
context do
it 'includes the inbox of the replied-to account' do
expect(subject.inboxes).to include 'https://foo.bar/inbox'
end
end
context 'when status is not public and replied-to account is not mentioned' do
let(:visibility) { :private }
it 'does not include the inbox of the replied-to account' do
expect(subject.inboxes).to_not include 'https://foo.bar/inbox'
end
end
end
end
end
end

@ -0,0 +1,5 @@
require 'rails_helper'
RSpec.describe StatusEdit, type: :model do
pending "add some examples to (or delete) #{__FILE__}"
end

@ -1,37 +1,112 @@
require 'rails_helper' require 'rails_helper'
RSpec.describe FanOutOnWriteService, type: :service do RSpec.describe FanOutOnWriteService, type: :service do
let(:author) { Fabricate(:account, username: 'tom') } let(:last_active_at) { Time.now.utc }
let(:status) { Fabricate(:status, text: 'Hello @alice #test', account: author) }
let(:alice) { Fabricate(:user, account: Fabricate(:account, username: 'alice')).account }
let(:follower) { Fabricate(:account, username: 'bob') }
subject { FanOutOnWriteService.new } let!(:alice) { Fabricate(:user, current_sign_in_at: last_active_at, account: Fabricate(:account, username: 'alice')).account }
let!(:bob) { Fabricate(:user, current_sign_in_at: last_active_at, account: Fabricate(:account, username: 'bob')).account }
let!(:tom) { Fabricate(:user, current_sign_in_at: last_active_at, account: Fabricate(:account, username: 'tom')).account }
subject { described_class.new }
let(:status) { Fabricate(:status, account: alice, visibility: visibility, text: 'Hello @bob #hoge') }
before do before do
alice bob.follow!(alice)
follower.follow!(author) tom.follow!(alice)
ProcessMentionsService.new.call(status) ProcessMentionsService.new.call(status)
ProcessHashtagsService.new.call(status) ProcessHashtagsService.new.call(status)
allow(Redis.current).to receive(:publish)
subject.call(status) subject.call(status)
end end
it 'delivers status to home timeline' do def home_feed_of(account)
expect(HomeFeed.new(author).get(10).map(&:id)).to include status.id HomeFeed.new(account).get(10).map(&:id)
end
context 'when status is public' do
let(:visibility) { 'public' }
it 'is added to the home feed of its author' do
expect(home_feed_of(alice)).to include status.id
end
it 'is added to the home feed of a follower' do
expect(home_feed_of(bob)).to include status.id
expect(home_feed_of(tom)).to include status.id
end
it 'is broadcast to the hashtag stream' do
expect(Redis.current).to have_received(:publish).with('timeline:hashtag:hoge', anything)
expect(Redis.current).to have_received(:publish).with('timeline:hashtag:hoge:local', anything)
end
it 'is broadcast to the public stream' do
expect(Redis.current).to have_received(:publish).with('timeline:public', anything)
expect(Redis.current).to have_received(:publish).with('timeline:public:local', anything)
end end
end
context 'when status is limited' do
let(:visibility) { 'limited' }
it 'delivers status to local followers' do it 'is added to the home feed of its author' do
pending 'some sort of problem in test environment causes this to sometimes fail' expect(home_feed_of(alice)).to include status.id
expect(HomeFeed.new(follower).get(10).map(&:id)).to include status.id
end end
it 'delivers status to hashtag' do it 'is added to the home feed of the mentioned follower' do
expect(TagFeed.new(Tag.find_by(name: 'test'), alice).get(20).map(&:id)).to include status.id expect(home_feed_of(bob)).to include status.id
end end
it 'delivers status to public timeline' do it 'is not added to the home feed of the other follower' do
expect(PublicFeed.new(alice).get(20).map(&:id)).to include status.id expect(home_feed_of(tom)).to_not include status.id
end
it 'is not broadcast publicly' do
expect(Redis.current).to_not have_received(:publish).with('timeline:hashtag:hoge', anything)
expect(Redis.current).to_not have_received(:publish).with('timeline:public', anything)
end
end
context 'when status is private' do
let(:visibility) { 'private' }
it 'is added to the home feed of its author' do
expect(home_feed_of(alice)).to include status.id
end
it 'is added to the home feed of a follower' do
expect(home_feed_of(bob)).to include status.id
expect(home_feed_of(tom)).to include status.id
end
it 'is not broadcast publicly' do
expect(Redis.current).to_not have_received(:publish).with('timeline:hashtag:hoge', anything)
expect(Redis.current).to_not have_received(:publish).with('timeline:public', anything)
end
end
context 'when status is direct' do
let(:visibility) { 'direct' }
it 'is added to the home feed of its author' do
expect(home_feed_of(alice)).to include status.id
end
it 'is added to the home feed of the mentioned follower' do
expect(home_feed_of(bob)).to include status.id
end
it 'is not added to the home feed of the other follower' do
expect(home_feed_of(tom)).to_not include status.id
end
it 'is not broadcast publicly' do
expect(Redis.current).to_not have_received(:publish).with('timeline:hashtag:hoge', anything)
expect(Redis.current).to_not have_received(:publish).with('timeline:public', anything)
end
end end
end end

@ -9,75 +9,55 @@ RSpec.describe ProcessMentionsService, type: :service do
context 'ActivityPub' do context 'ActivityPub' do
context do context do
let(:remote_user) { Fabricate(:account, username: 'remote_user', protocol: :activitypub, domain: 'example.com', inbox_url: 'http://example.com/inbox') } let!(:remote_user) { Fabricate(:account, username: 'remote_user', protocol: :activitypub, domain: 'example.com', inbox_url: 'http://example.com/inbox') }
before do before do
stub_request(:post, remote_user.inbox_url)
subject.call(status) subject.call(status)
end end
it 'creates a mention' do it 'creates a mention' do
expect(remote_user.mentions.where(status: status).count).to eq 1 expect(remote_user.mentions.where(status: status).count).to eq 1
end end
it 'sends activity to the inbox' do
expect(a_request(:post, remote_user.inbox_url)).to have_been_made.once
end
end end
context 'with an IDN domain' do context 'with an IDN domain' do
let(:remote_user) { Fabricate(:account, username: 'sneak', protocol: :activitypub, domain: 'xn--hresiar-mxa.ch', inbox_url: 'http://example.com/inbox') } let!(:remote_user) { Fabricate(:account, username: 'sneak', protocol: :activitypub, domain: 'xn--hresiar-mxa.ch', inbox_url: 'http://example.com/inbox') }
let(:status) { Fabricate(:status, account: account, text: "Hello @sneak@hæresiar.ch") } let!(:status) { Fabricate(:status, account: account, text: "Hello @sneak@hæresiar.ch") }
before do before do
stub_request(:post, remote_user.inbox_url)
subject.call(status) subject.call(status)
end end
it 'creates a mention' do it 'creates a mention' do
expect(remote_user.mentions.where(status: status).count).to eq 1 expect(remote_user.mentions.where(status: status).count).to eq 1
end end
it 'sends activity to the inbox' do
expect(a_request(:post, remote_user.inbox_url)).to have_been_made.once
end
end end
context 'with an IDN TLD' do context 'with an IDN TLD' do
let(:remote_user) { Fabricate(:account, username: 'foo', protocol: :activitypub, domain: 'xn--y9a3aq.xn--y9a3aq', inbox_url: 'http://example.com/inbox') } let!(:remote_user) { Fabricate(:account, username: 'foo', protocol: :activitypub, domain: 'xn--y9a3aq.xn--y9a3aq', inbox_url: 'http://example.com/inbox') }
let(:status) { Fabricate(:status, account: account, text: "Hello @foo@հայ.հայ") } let!(:status) { Fabricate(:status, account: account, text: "Hello @foo@հայ.հայ") }
before do before do
stub_request(:post, remote_user.inbox_url)
subject.call(status) subject.call(status)
end end
it 'creates a mention' do it 'creates a mention' do
expect(remote_user.mentions.where(status: status).count).to eq 1 expect(remote_user.mentions.where(status: status).count).to eq 1
end end
it 'sends activity to the inbox' do
expect(a_request(:post, remote_user.inbox_url)).to have_been_made.once
end
end end
end end
context 'Temporarily-unreachable ActivityPub user' do context 'Temporarily-unreachable ActivityPub user' do
let(:remote_user) { Fabricate(:account, username: 'remote_user', protocol: :activitypub, domain: 'example.com', inbox_url: 'http://example.com/inbox', last_webfingered_at: nil) } let!(:remote_user) { Fabricate(:account, username: 'remote_user', protocol: :activitypub, domain: 'example.com', inbox_url: 'http://example.com/inbox', last_webfingered_at: nil) }
before do before do
stub_request(:get, "https://example.com/.well-known/host-meta").to_return(status: 404) stub_request(:get, "https://example.com/.well-known/host-meta").to_return(status: 404)
stub_request(:get, "https://example.com/.well-known/webfinger?resource=acct:remote_user@example.com").to_return(status: 500) stub_request(:get, "https://example.com/.well-known/webfinger?resource=acct:remote_user@example.com").to_return(status: 500)
stub_request(:post, remote_user.inbox_url)
subject.call(status) subject.call(status)
end end
it 'creates a mention' do it 'creates a mention' do
expect(remote_user.mentions.where(status: status).count).to eq 1 expect(remote_user.mentions.where(status: status).count).to eq 1
end end
it 'sends activity to the inbox' do
expect(a_request(:post, remote_user.inbox_url)).to have_been_made.once
end
end end
end end

@ -35,13 +35,16 @@ describe ActivityPub::DistributionWorker do
end end
context 'with direct status' do context 'with direct status' do
let(:mentioned_account) { Fabricate(:account, protocol: :activitypub, inbox_url: 'https://foo.bar/inbox')}
before do before do
status.update(visibility: :direct) status.update(visibility: :direct)
status.mentions.create!(account: mentioned_account)
end end
it 'does nothing' do it 'delivers to mentioned accounts' do
subject.perform(status.id) subject.perform(status.id)
expect(ActivityPub::DeliveryWorker).to_not have_received(:push_bulk) expect(ActivityPub::DeliveryWorker).to have_received(:push_bulk).with(['https://foo.bar/inbox'])
end end
end end
end end

@ -45,7 +45,7 @@ describe FeedInsertWorker do
result = subject.perform(status.id, follower.id) result = subject.perform(status.id, follower.id)
expect(result).to be_nil expect(result).to be_nil
expect(instance).to have_received(:push_to_home).with(follower, status) expect(instance).to have_received(:push_to_home).with(follower, status, update: nil)
end end
end end
end end

Loading…
Cancel
Save