2022-05-22 23:16:43 +03:00
# frozen_string_literal: true
# Base class for importers that copy database records into a Chewy index.
# Work is split into units that run concurrently on the given executor;
# subclasses must implement `#import!` and `#index`.
class Importer::BaseImporter
  # @param [Integer] batch_size
  # @param [Concurrent::ThreadPoolExecutor] executor
  def initialize(batch_size:, executor:)
    @batch_size = batch_size
    @executor   = executor
    # Work units that have been scheduled but have not resolved yet
    @wait_for   = Concurrent::Set.new
  end

  # Callback to run when a concurrent work unit completes
  # @yield registered as the fulfillment callback of every work unit, so it
  #   is called with the value returned by the work unit's block
  def on_progress(&block)
    @on_progress = block
  end

  # Callback to run when a concurrent work unit fails
  # @yield registered as the rejection callback of every work unit, so it
  #   is called with the reason the work unit failed
  def on_failure(&block)
    @on_failure = block
  end

  # Reduce resource usage during and improve speed of indexing
  def optimize_for_import!
    Chewy.client.indices.put_settings(
      index: index.index_name,
      body: { index: { refresh_interval: -1 } }
    )
  end

  # Restore original index settings
  def optimize_for_search!
    # `dig` yields nil when the index does not declare a refresh interval
    # (instead of raising on a missing intermediate key); a null value asks
    # Elasticsearch to reset the setting to its default.
    refresh_interval = index.settings_hash.dig(:settings, :index, :refresh_interval)

    Chewy.client.indices.put_settings(
      index: index.index_name,
      body: { index: { refresh_interval: refresh_interval } }
    )
  end

  # Estimate the amount of documents that would be indexed. Not exact!
  # @return [Integer]
  def estimate!
    reltuples = ActiveRecord::Base.connection_pool.with_connection do |connection|
      # Let the adapter quote the table name rather than hand-building the literal
      table_name = connection.quote(index.adapter.target.table_name)

      # `select_value` returns nil when pg_class has no matching row, and
      # `nil.to_i` is 0, so a missing table yields an estimate of 0
      connection.select_value("SELECT reltuples FROM pg_class WHERE relname = #{table_name}").to_i
    end

    # If the table has never yet been vacuumed or analyzed, reltuples contains -1
    [reltuples, 0].max
  end

  # Import data from the database into the index
  # @raise [NotImplementedError] unless overridden by a subclass
  def import!
    raise NotImplementedError
  end

  # Remove documents from the index that no longer exist in the database
  # @raise [ActiveRecord::UnknownPrimaryKey] if the indexed model has no primary key
  def clean_up!
    index.scroll_batches do |documents|
      model       = index.adapter.target
      primary_key = model.primary_key
      raise ActiveRecord::UnknownPrimaryKey, model if primary_key.nil?

      ids = documents.pluck('_id')

      # Database keys are stringified so they compare equal to the `_id`
      # values read from the index
      existing_ids = model.where(primary_key => ids).pluck(primary_key).map(&:to_s)
      missing_ids  = ids - existing_ids
      next if missing_ids.empty?

      in_work_unit(missing_ids) do |deleted_ids|
        bulk = Chewy::Index::Import::BulkBuilder.new(index, delete: deleted_ids).bulk_body
        Chewy::Index::Import::BulkRequest.new(index).perform(bulk)

        # Reported to the progress callback; presumably [indexed, deleted] counts
        [0, bulk.size]
      end
    end

    wait!
  end

  protected

  # Build the bulk request body that indexes the given records
  # @param [Enumerable] to_import records to index
  # @return [Array<Hash>]
  def build_bulk_body(to_import)
    # Specialize `Chewy::Index::Import::BulkBuilder#bulk_body` to avoid a few
    # inefficiencies, as none of our fields are join fields and we do not need
    # `BulkBuilder`'s versatility.
    crutches = Chewy::Index::Crutch::Crutches.new index, to_import
    to_import.map { |object| { index: { _id: object.id, data: index.compose(object, crutches, fields: []) } } }
  end

  # Schedule a block on the executor as a tracked work unit. All arguments
  # (and the block) are forwarded to `Concurrent::Promises.future_on`.
  # @return [void]
  def in_work_unit(...)
    work_unit = Concurrent::Promises.future_on(@executor, ...)

    # Track the work unit *before* registering the clean-up callback: a work
    # unit that resolves immediately would otherwise be deleted from the set
    # first and added afterwards, leaving a stale entry behind.
    @wait_for << work_unit

    # Callbacks are optional; only register the ones that were provided
    work_unit.on_fulfillment!(&@on_progress) if @on_progress
    work_unit.on_rejection!(&@on_failure) if @on_failure
    work_unit.on_resolution! { @wait_for.delete(work_unit) }
  rescue Concurrent::RejectedExecutionError
    # Backpressure: the executor refused the work unit, so pause briefly
    # and try to schedule it again
    sleep(0.1)
    retry
  end

  # Block until every work unit that is still pending has resolved
  def wait!
    Concurrent::Promises.zip(*@wait_for).wait
  end

  # The Chewy index this importer operates on
  # @raise [NotImplementedError] unless overridden by a subclass
  def index
    raise NotImplementedError
  end
end