@ -64,11 +64,7 @@ module Mastodon
progress . title = 'Estimating workload '
# Estimate the amount of data that has to be imported first
indices . each do | index |
index . types . each do | type |
progress . total = ( progress . total || 0 ) + type . adapter . default_scope . count
end
end
progress . total = indices . sum { | index | index . adapter . default_scope . count }
# Now import all the actual data. Mind that unlike chewy:sync, we don't
# fetch and compare all record IDs from the database and the index to
@ -80,8 +76,7 @@ module Mastodon
batch_size = 1_000
slice_size = ( batch_size / options [ :concurrency ] ) . ceil
index . types . each do | type |
type . adapter . default_scope . reorder ( nil ) . find_in_batches ( batch_size : batch_size ) do | batch |
index . adapter . default_scope . reorder ( nil ) . find_in_batches ( batch_size : batch_size ) do | batch |
futures = [ ]
batch . each_slice ( slice_size ) do | records |
@ -101,31 +96,34 @@ module Mastodon
delete_count = 0
ActiveRecord :: Base . connection_pool . with_connection do
grouped_records = type . adapter . send ( :grouped_objects , records )
bulk_body = Chewy :: Type :: Import :: BulkBuilder . new ( type , ** grouped_records ) . bulk_body
grouped_records = records . to_a . group_by do | record |
index. adapter . send ( :delete_from_index? , record ) ? :delete : :to_index
end
index_count = grouped_records [ :index ] . size if grouped_records . key? ( :index )
bulk_body = Chewy :: Index :: Import :: BulkBuilder . new ( index , ** grouped_records ) . bulk_body
end
index_count = grouped_records [ :to_index ] . size if grouped_records . key? ( :to_index )
delete_count = grouped_records [ :delete ] . size if grouped_records . key? ( :delete )
# The following is an optimization for statuses specifically, since
# we want to de-index statuses that cannot be searched by anybody,
# but can't use Chewy's delete_if logic because it doesn't use
# crutches and our searchable_by logic depends on them
if type == StatusesIndex :: Status
if index == StatusesIndex
bulk_body . map! do | entry |
if entry [ : index] && entry . dig ( : index, :data , 'searchable_by' ) . blank?
if entry [ : to_ index] && entry . dig ( : to_ index, :data , 'searchable_by' ) . blank?
index_count -= 1
delete_count += 1
{ delete : entry [ : index] . except ( :data ) }
{ delete : entry [ : to_ index] . except ( :data ) }
else
entry
end
end
end
Chewy :: Type :: Import :: BulkRequest . new ( type ) . perform ( bulk_body )
Chewy :: Index :: Import :: BulkRequest . new ( index ) . perform ( bulk_body )
progress . progress += records . size
@ -142,7 +140,6 @@ module Mastodon
futures . map ( & :value )
end
end
end
progress . title = ''
progress . stop