1
|
|
|
# frozen_string_literal: true |
2
|
|
|
|
3
|
1 |
|
require 'formatador' |
4
|
1 |
|
require 'smarter_csv' |
5
|
1 |
|
require 'zlib' |
6
|
|
|
|
7
|
1 |
|
module NoSE |
8
|
1 |
|
module Loader |
9
|
|
|
# Load data into an index from a set of CSV files |
10
|
1 |
|
class CsvLoader < LoaderBase |
11
|
1 |
|
# Build a CSV loader, optionally bound to a workload and a storage backend.
#
# @param workload [Workload, nil] the workload whose entities will be loaded
# @param backend [Backend, nil] the backend data will be inserted into
def initialize(workload = nil, backend = nil)
  # Delegate workload/backend setup to LoaderBase
  super(workload, backend)

  @logger = Logging.logger['nose::loader::csvloader']
end
16
|
|
|
|
17
|
|
|
# Load data for all the indexes from per-entity CSV files.
#
# @param indexes [Enumerable<Index>] indexes to populate
# @param config [Hash] loader configuration; +:directory+ holds the CSV files
# @param show_progress [Boolean] whether to display a progress bar
# @param limit [Integer, nil] cap on the number of rows reported as the
#   progress total (NOTE(review): row loading itself is not truncated here —
#   confirm the limit is applied downstream)
# @param skip_existing [Boolean] skip indexes which already contain data
# @return [void]
def load(indexes, config, show_progress = false, limit = nil,
         skip_existing = true)
  simple_indexes = find_simple_indexes indexes, skip_existing
  simple_indexes.each do |entity, simple_index_list|
    filename = File.join config[:directory], "#{entity.name}.csv"

    # Count data rows, subtracting one for the header row.
    # BUG FIX: the previous code seeded the counter with (limit || 0) - 1
    # and then counted every line, so a non-nil limit produced an inflated
    # total (limit - 1 + line_count); the total is now capped at the limit.
    total_rows = File.foreach(filename).count - 1
    total_rows = [total_rows, limit].min unless limit.nil?

    progress = initialize_progress entity, simple_index_list,
                                   total_rows if show_progress
    load_file_indexes filename, entity, simple_index_list, progress
  end
end
31
|
|
|
|
32
|
1 |
|
private |
33
|
|
|
|
34
|
|
|
# Find the simple (single-entity) indexes we should populate, grouped by
# the entity whose CSV file feeds them.
#
# @param indexes [Enumerable<Index>] candidate indexes
# @param skip_existing [Boolean] when true, only keep indexes the backend
#   reports as empty
# @return [Hash<Entity, Array<Index>>]
def find_simple_indexes(indexes, skip_existing)
  candidates = indexes.select do |index|
    # Only indexes spanning a single entity can be fed from one CSV file
    next false unless index.graph.size == 1

    # Keep everything unless we were asked to skip already-populated indexes
    next true unless skip_existing

    @backend.index_empty? index
  end

  candidates.group_by { |index| index.hash_fields.first.parent }
end
46
|
|
|
|
47
|
|
|
# Initialize a progress bar for reporting loading results.
#
# @param entity [Entity] the entity whose rows are being loaded
# @param simple_index_list [Array<Index>] indexes being populated
# @param total_rows [Integer] number of rows expected
# @return [Formatador::ProgressBar]
def initialize_progress(entity, simple_index_list, total_rows)
  @logger.info("Loading simple indexes for #{entity.name}")
  key_list = simple_index_list.map(&:key).join(', ')
  @logger.info(key_list)

  # Draw the empty bar immediately so the user sees progress start at zero
  Formatador.new.redisplay_progressbar(0, total_rows)
  Formatador::ProgressBar.new(total_rows, started_at: Time.now.utc)
end
56
|
|
|
|
57
|
|
|
# Load all indexes fed by a given CSV file, streaming it in chunks and
# inserting slices in parallel.
#
# @param filename [String] path to the entity's CSV file
# @param entity [Entity] the entity the file describes
# @param simple_index_list [Array<Index>] indexes to populate
# @param progress [Formatador::ProgressBar, nil] progress bar to advance
# @return [void]
def load_file_indexes(filename, entity, simple_index_list, progress)
  # Advance the bar after each parallel slice completes; never overshoot
  # the total (the final slice may hold fewer than 100 rows)
  advance_progress = lambda do |_, _, _|
    next if progress.nil?

    step = [progress.total - progress.current, 100].min
    progress.increment step
  end

  csv_options = {
    downcase_header: false,
    chunk_size: 1000,
    convert_values_to_numeric: false
  }
  SmarterCSV.process(filename, csv_options) do |chunk|
    Parallel.each(chunk.each_slice(100),
                  finish: advance_progress) do |slice|
      load_simple_chunk slice, entity, simple_index_list
    end
  end
end
74
|
|
|
|
75
|
|
|
# Load a chunk of rows from a CSV file into each simple index for an entity.
#
# @param chunk [Array<Hash>] parsed CSV rows (mutated in place)
# @param entity [Entity] the entity the rows belong to
# @param indexes [Array<Index>] indexes to insert the rows into
# @return [void]
def load_simple_chunk(chunk, entity, indexes)
  # Prefix all hash keys with the entity name and convert values using the
  # field's own string-conversion logic
  chunk.map! do |row|
    row.each_with_object({}) do |(key, raw_value), index_row|
      field_class = entity[key.to_s].class
      index_row["#{entity.name}_#{key}"] =
        field_class.value_from_string(raw_value)
    end
  end

  # Insert the converted batch into every target index
  indexes.each { |index| @backend.index_insert_chunk index, chunk }
end
95
|
|
|
end |
96
|
|
|
end |
97
|
|
|
end |
98
|
|
|
|