Change `tootctl search deploy` algorithm (#14300)

main
Eugen Rochko 4 years ago committed by GitHub
parent 98b3b80d6b
commit 4abe3be321
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -31,7 +31,7 @@ class StatusesIndex < Chewy::Index
}, },
} }
define_type ::Status.unscoped.kept.without_reblogs.includes(:media_attachments), delete_if: ->(status) { status.searchable_by.empty? } do define_type ::Status.unscoped.kept.without_reblogs.includes(:media_attachments, :preloadable_poll) do
crutch :mentions do |collection| crutch :mentions do |collection|
data = ::Mention.where(status_id: collection.map(&:id)).where(account: Account.local, silent: false).pluck(:status_id, :account_id) data = ::Mention.where(status_id: collection.map(&:id)).where(account: Account.local, silent: false).pluck(:status_id, :account_id)
data.each.with_object({}) { |(id, name), result| (result[id] ||= []).push(name) } data.each.with_object({}) { |(id, name), result| (result[id] ||= []).push(name) }

@ -7,6 +7,7 @@ ActiveRecord::Base.logger = dev_null
ActiveJob::Base.logger = dev_null ActiveJob::Base.logger = dev_null
HttpLog.configuration.logger = dev_null HttpLog.configuration.logger = dev_null
Paperclip.options[:log] = false Paperclip.options[:log] = false
Chewy.logger = dev_null
module Mastodon module Mastodon
module CLIHelper module CLIHelper

@ -6,8 +6,19 @@ require_relative 'cli_helper'
module Mastodon module Mastodon
class SearchCLI < Thor class SearchCLI < Thor
option :processes, default: 2, aliases: [:p] include CLIHelper
desc 'deploy', 'Create or update an ElasticSearch index and populate it'
# Indices are sorted by amount of data to be expected in each, so that
# smaller indices can go online sooner
INDICES = [
AccountsIndex,
TagsIndex,
StatusesIndex,
].freeze
option :concurrency, type: :numeric, default: 2, aliases: [:c], desc: 'Workload will be split between this number of threads'
option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
desc 'deploy', 'Create or upgrade ElasticSearch indices and populate them'
long_desc <<~LONG_DESC long_desc <<~LONG_DESC
If ElasticSearch is empty, this command will create the necessary indices If ElasticSearch is empty, this command will create the necessary indices
and then import data from the database into those indices. and then import data from the database into those indices.
@ -15,27 +26,126 @@ module Mastodon
This command will also upgrade indices if the underlying schema has been This command will also upgrade indices if the underlying schema has been
changed since the last run. changed since the last run.
With the --processes option, parallelize execution of the command. The Even if creating or upgrading indices is not necessary, data from the
default is 2. If "auto" is specified, the number is automatically database will be imported into the indices.
derived from available CPUs.
LONG_DESC LONG_DESC
def deploy def deploy
processed = Chewy::RakeHelper.upgrade(parallel: processes) if options[:concurrency] < 1
Chewy::RakeHelper.sync(except: processed, parallel: processes) say('Cannot run with this concurrency setting, must be at least 1', :red)
end exit(1)
end
indices = begin
if options[:only]
options[:only].map { |str| "#{str.camelize}Index".constantize }
else
INDICES
end
end
progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
# First, ensure all indices are created and have the correct
# structure, so that live data can already be written
indices.select { |index| index.specification.changed? }.each do |index|
progress.title = "Upgrading #{index} "
index.purge
index.specification.lock!
end
ActiveRecord::Base.configurations[Rails.env]['pool'] = options[:concurrency] + 1
pool = Concurrent::FixedThreadPool.new(options[:concurrency])
added = Concurrent::AtomicFixnum.new(0)
removed = Concurrent::AtomicFixnum.new(0)
progress.title = 'Estimating workload '
# Estimate the amount of data that has to be imported first
indices.each do |index|
index.types.each do |type|
progress.total = (progress.total || 0) + type.adapter.default_scope.count
end
end
# Now import all the actual data. Mind that unlike chewy:sync, we don't
# fetch and compare all record IDs from the database and the index to
# find out which to add and which to remove from the index. Because with
# potentially millions of rows, the memory footprint of such a calculation
# is uneconomical. So we only ever add.
indices.each do |index|
progress.title = "Importing #{index} "
batch_size = 1_000
slice_size = (batch_size / options[:concurrency]).ceil
index.types.each do |type|
type.adapter.default_scope.reorder(nil).find_in_batches(batch_size: batch_size) do |batch|
futures = []
batch.each_slice(slice_size) do |records|
futures << Concurrent::Future.execute(executor: pool) do
begin
if !progress.total.nil? && progress.progress + records.size > progress.total
# The number of items has changed between start and now,
# since there is no good way to predict the final count from
# here, just change the progress bar to an indeterminate one
progress.total = nil
end
grouped_records = nil
bulk_body = nil
index_count = 0
delete_count = 0
ActiveRecord::Base.connection_pool.with_connection do
grouped_records = type.adapter.send(:grouped_objects, records)
bulk_body = Chewy::Type::Import::BulkBuilder.new(type, grouped_records).bulk_body
end
private index_count = grouped_records[:index].size if grouped_records.key?(:index)
delete_count = grouped_records[:delete].size if grouped_records.key?(:delete)
def processes # The following is an optimization for statuses specifically, since
return true if options[:processes] == 'auto' # we want to de-index statuses that cannot be searched by anybody,
# but can't use Chewy's delete_if logic because it doesn't use
# crutches and our searchable_by logic depends on them
if type == StatusesIndex::Status
bulk_body.map! do |entry|
if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank?
index_count -= 1
delete_count += 1
num = options[:processes].to_i { delete: entry[:index].except(:data) }
else
entry
end
end
end
if num < 2 Chewy::Type::Import::BulkRequest.new(type).perform(bulk_body)
nil
else progress.progress += records.size
num
added.increment(index_count)
removed.increment(delete_count)
sleep 1
rescue => e
progress.log pastel.red("Error importing #{index}: #{e}")
end
end
end
futures.map(&:value)
end
end
end end
progress.title = ''
progress.stop
say("Indexed #{added.value} records, de-indexed #{removed.value}", :green, true)
end end
end end
end end

Loading…
Cancel
Save