Completed
Push — master ( e56411...e397a9 )
by Michael
03:06
created

CsvLoader.initialize_progress()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.512

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 7
ccs 1
cts 5
cp 0.2
crap 1.512
rs 9.4285
c 0
b 0
f 0
1
# frozen_string_literal: true
2
3 1
require 'formatador'
4 1
require 'smarter_csv'
5 1
require 'zlib'
6
7 1
module NoSE
8 1
  module Loader
9
    # Load data into an index from a set of CSV files
10 1
    class CsvLoader < LoaderBase
11 1
      # Create a new CSV loader
      #
      # Both arguments are forwarded unchanged to the superclass constructor
      # by the bare +super+ call.
      # NOTE(review): @backend is read by other methods in this class but is
      # never assigned here; presumably LoaderBase stores it - confirm.
      # @param workload [Object, nil] passed through to LoaderBase
      # @param backend [Object, nil] passed through to LoaderBase
      def initialize(workload = nil, backend = nil)
        super

        @logger = Logging.logger['nose::loader::csvloader']
      end
16
17
      # Load data for all the indexes
      #
      # For every entity returned by find_simple_indexes, the file
      # "<entity name>.csv" inside config[:directory] is loaded into that
      # entity's simple indexes.
      # @param indexes [Enumerable<Index>] candidate indexes to populate
      # @param config [Hash] must provide the CSV directory under :directory
      # @param show_progress [Boolean] build and update a progress bar
      # @param limit [Integer, nil] only affects the progress bar total here
      # @param skip_existing [Boolean] passed on to find_simple_indexes
      # @return [Hash] the entity => indexes grouping that was iterated
      def load(indexes, config, show_progress = false, limit = nil,
               skip_existing = true)
        simple_indexes = find_simple_indexes indexes, skip_existing
        simple_indexes.each do |entity, simple_index_list|
          filename = File.join config[:directory], "#{entity.name}.csv"

          # Count every line of the file, minus one to account for the
          # header row
          # NOTE(review): limit is *added* to the total and is not used to
          # restrict how many rows get loaded - confirm this is intended
          total_rows = (limit || 0) - 1 + File.foreach(filename).count

          progress = if show_progress
                       initialize_progress entity, simple_index_list,
                                           total_rows
                     end
          load_file_indexes filename, entity, simple_index_list, progress
        end
      end
31
32 1
      private
33
34
      # Find the simple indexes we should populate, grouped by entity
      #
      # An index is kept when its graph has a size of one and, if
      # skip_existing is set, the backend reports the index as empty.
      # Indexes are grouped by the parent of their first hash field.
      # @param indexes [Enumerable<Index>] candidate indexes
      # @param skip_existing [Boolean] leave out indexes which already
      #   hold data according to the backend
      # @return [Hash<Entity, Array<Index>>]
      def find_simple_indexes(indexes, skip_existing)
        simple_indexes = indexes.select do |index|
          # The backend is only consulted when skip_existing is set
          index.graph.size == 1 &&
            (!skip_existing || @backend.index_empty?(index))
        end

        simple_indexes.group_by do |index|
          index.hash_fields.first.parent
        end
      end
46
47
      # Initialize a progress bar to report loading results
      #
      # Logs the entity name and the keys of the indexes being loaded
      # before the bar is created.
      # @param entity [Entity] entity whose name is logged
      # @param simple_index_list [Array<Index>] indexes whose keys are logged
      # @param total_rows [Integer] total the progress bar counts up to
      # @return [Formatador::ProgressBar]
      def initialize_progress(entity, simple_index_list, total_rows)
        @logger.info "Loading simple indexes for #{entity.name}"
        @logger.info simple_index_list.map(&:key).join(', ')

        # Presumably draws the bar at zero before any rows are loaded
        Formatador.new.redisplay_progressbar 0, total_rows
        Formatador::ProgressBar.new total_rows, started_at: Time.now.utc
      end
56
57
      # Load all indexes for a given file
      #
      # The CSV is processed in chunks of 1000 rows and each chunk is handed
      # to Parallel in slices of 100 rows.
      # @param filename [String] path of the CSV file to read
      # @param entity [Entity] entity the rows of the file belong to
      # @param simple_index_list [Array<Index>] indexes filled from each row
      # @param progress [Formatador::ProgressBar, nil] bar to advance, or nil
      #   when no progress is being shown
      # @return [void]
      def load_file_indexes(filename, entity, simple_index_list, progress)
        # Advance the bar once per finished slice; capping at the remaining
        # amount keeps a short final slice from pushing past the total
        advance = lambda do |_, _, _|
          next if progress.nil?

          remaining = progress.total - progress.current
          progress.increment [remaining, 100].min
        end

        SmarterCSV.process(filename,
                           downcase_header: false,
                           chunk_size: 1000,
                           convert_values_to_numeric: false) do |chunk|
          Parallel.each(chunk.each_slice(100),
                        finish: advance) do |minichunk|
            load_simple_chunk minichunk, entity, simple_index_list
          end
        end
      end
74
75
      # Load a chunk of data from a simple entity index
      #
      # Each row is rewritten so its keys read "<entity name>_<column>" and
      # its values are converted by the class of the matching entity field.
      # The converted rows are then inserted into every given index.
      # @param chunk [Array<Hash>] rows keyed by column name
      # @param entity [Entity] looked up by column name to find each field
      # @param indexes [Array<Index>] indexes receiving the converted rows
      # @return [void]
      def load_simple_chunk(chunk, entity, indexes)
        # Prefix all hash keys with the entity name and convert values;
        # a new array is built so the caller's chunk is left untouched
        rows = chunk.map do |row|
          row.each_with_object({}) do |(key, raw), index_row|
            field_class = entity[key.to_s].class
            index_row["#{entity.name}_#{key}"] =
              field_class.value_from_string raw
          end
        end

        # Insert the batch into the index
        indexes.each do |index|
          @backend.index_insert_chunk index, rows
        end
      end
95
    end
96
  end
97
end
98