Change importers to avoid a few inefficiencies (#26721)
This commit is contained in:
		
							parent
							
								
									703178c117
								
							
						
					
					
						commit
						dcd8d25ac1
					
				
					 6 changed files with 34 additions and 33 deletions
				
			
		|  | @ -4,10 +4,10 @@ class Importer::AccountsIndexImporter < Importer::BaseImporter | |||
|   def import! | ||||
|     scope.includes(:account_stat).find_in_batches(batch_size: @batch_size) do |tmp| | ||||
|       in_work_unit(tmp) do |accounts| | ||||
|         bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: accounts).bulk_body | ||||
|         bulk = build_bulk_body(accounts) | ||||
| 
 | ||||
|         indexed = bulk.count { |entry| entry[:index] } | ||||
|         deleted = bulk.count { |entry| entry[:delete] } | ||||
|         indexed = bulk.size | ||||
|         deleted = 0 | ||||
| 
 | ||||
|         Chewy::Index::Import::BulkRequest.new(index).perform(bulk) | ||||
| 
 | ||||
|  |  | |||
|  | @ -68,6 +68,14 @@ class Importer::BaseImporter | |||
| 
 | ||||
|   protected | ||||
| 
 | ||||
|   def build_bulk_body(to_import) | ||||
|     # Specialize `Chewy::Index::Import::BulkBuilder#bulk_body` to avoid a few | ||||
|     # inefficiencies, as none of our fields are join fields and we do not need | ||||
|     # `BulkBuilder`'s versatility. | ||||
|     crutches = Chewy::Index::Crutch::Crutches.new index, to_import | ||||
|     to_import.map { |object| { index: { _id: object.id, data: index.compose(object, crutches, fields: []) } } } | ||||
|   end | ||||
| 
 | ||||
|   def in_work_unit(...) | ||||
|     work_unit = Concurrent::Promises.future_on(@executor, ...) | ||||
| 
 | ||||
|  |  | |||
|  | @ -4,10 +4,10 @@ class Importer::InstancesIndexImporter < Importer::BaseImporter | |||
|   def import! | ||||
|     index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp| | ||||
|       in_work_unit(tmp) do |instances| | ||||
|         bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: instances).bulk_body | ||||
|         bulk = build_bulk_body(instances) | ||||
| 
 | ||||
|         indexed = bulk.count { |entry| entry[:index] } | ||||
|         deleted = bulk.count { |entry| entry[:delete] } | ||||
|         indexed = bulk.size | ||||
|         deleted = 0 | ||||
| 
 | ||||
|         Chewy::Index::Import::BulkRequest.new(index).perform(bulk) | ||||
| 
 | ||||
|  |  | |||
|  | @ -5,11 +5,11 @@ class Importer::PublicStatusesIndexImporter < Importer::BaseImporter | |||
|     scope.select(:id).find_in_batches(batch_size: @batch_size) do |batch| | ||||
|       in_work_unit(batch.pluck(:id)) do |status_ids| | ||||
|         bulk = ActiveRecord::Base.connection_pool.with_connection do | ||||
|           Chewy::Index::Import::BulkBuilder.new(index, to_index: Status.includes(:media_attachments, :preloadable_poll, :preview_cards).where(id: status_ids)).bulk_body | ||||
|           build_bulk_body(index.adapter.default_scope.where(id: status_ids)) | ||||
|         end | ||||
| 
 | ||||
|         indexed = bulk.count { |entry| entry[:index] } | ||||
|         deleted = bulk.count { |entry| entry[:delete] } | ||||
|         indexed = bulk.size | ||||
|         deleted = 0 | ||||
| 
 | ||||
|         Chewy::Index::Import::BulkRequest.new(index).perform(bulk) | ||||
| 
 | ||||
|  |  | |||
|  | @ -13,31 +13,24 @@ class Importer::StatusesIndexImporter < Importer::BaseImporter | |||
| 
 | ||||
|       scope.find_in_batches(batch_size: @batch_size) do |tmp| | ||||
|         in_work_unit(tmp.map(&:status_id)) do |status_ids| | ||||
|           bulk = ActiveRecord::Base.connection_pool.with_connection do | ||||
|             Chewy::Index::Import::BulkBuilder.new(index, to_index: index.adapter.default_scope.where(id: status_ids)).bulk_body | ||||
|           end | ||||
| 
 | ||||
|           indexed = 0 | ||||
|           deleted = 0 | ||||
| 
 | ||||
|           # We can't use the delete_if proc to do the filtering because delete_if | ||||
|           # is called before rendering the data and we need to filter based | ||||
|           # on the results of the filter, so this filtering happens here instead | ||||
|           bulk.map! do |entry| | ||||
|             new_entry = if entry[:index] && entry.dig(:index, :data, 'searchable_by').blank? | ||||
|                           { delete: entry[:index].except(:data) } | ||||
|                         else | ||||
|                           entry | ||||
|                         end | ||||
| 
 | ||||
|             if new_entry[:index] | ||||
|               indexed += 1 | ||||
|             else | ||||
|           bulk = ActiveRecord::Base.connection_pool.with_connection do | ||||
|             to_index = index.adapter.default_scope.where(id: status_ids) | ||||
|             crutches = Chewy::Index::Crutch::Crutches.new index, to_index | ||||
|             to_index.map do |object| | ||||
|               # This is unlikely to happen, but the post may have been | ||||
|               # un-interacted with since it was queued for indexing | ||||
|               if object.searchable_by.empty? | ||||
|                 deleted += 1 | ||||
|                 { delete: { _id: object.id } } | ||||
|               else | ||||
|                 { index: { _id: object.id, data: index.compose(object, crutches, fields: []) } } | ||||
|               end | ||||
|             end | ||||
|           end | ||||
| 
 | ||||
|             new_entry | ||||
|           end | ||||
|           indexed = bulk.size - deleted | ||||
| 
 | ||||
|           Chewy::Index::Import::BulkRequest.new(index).perform(bulk) | ||||
| 
 | ||||
|  |  | |||
|  | @ -4,10 +4,10 @@ class Importer::TagsIndexImporter < Importer::BaseImporter | |||
|   def import! | ||||
|     index.adapter.default_scope.find_in_batches(batch_size: @batch_size) do |tmp| | ||||
|       in_work_unit(tmp) do |tags| | ||||
|         bulk = Chewy::Index::Import::BulkBuilder.new(index, to_index: tags).bulk_body | ||||
|         bulk = build_bulk_body(tags) | ||||
| 
 | ||||
|         indexed = bulk.count { |entry| entry[:index] } | ||||
|         deleted = bulk.count { |entry| entry[:delete] } | ||||
|         indexed = bulk.size | ||||
|         deleted = 0 | ||||
| 
 | ||||
|         Chewy::Index::Import::BulkRequest.new(index).perform(bulk) | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in a new issue