|
1
|
|
|
# frozen_string_literal: true |
|
2
|
|
|
|
|
3
|
1 |
|
require 'formatador' |
|
4
|
1 |
|
require 'smarter_csv' |
|
5
|
1 |
|
require 'zlib' |
|
6
|
|
|
|
|
7
|
1 |
|
module NoSE |
|
8
|
1 |
|
module Loader |
|
9
|
|
|
# Load data into an index from a set of CSV files |
|
10
|
1 |
|
class CsvLoader < LoaderBase |
|
11
|
1 |
|
# Create a new CSV loader
#
# @param workload [Workload, nil] the workload being loaded
#   (presumably stored by LoaderBase via +super+ — confirm in LoaderBase)
# @param backend [Backend, nil] the backend which receives inserted rows
def initialize(workload = nil, backend = nil)
  super

  # Logger used to report which indexes are being populated
  @logger = Logging.logger['nose::loader::csvloader']
end
|
16
|
|
|
|
|
17
|
|
|
# Load data for all the indexes
#
# Finds the "simple" (single-entity) indexes, then loads each entity's
# CSV file (named <entity>.csv under config[:directory]) into every
# index keyed on that entity.
#
# @param indexes [Enumerable<Index>] candidate indexes to populate
# @param config [Hash] loader configuration; :directory holds the CSV files
# @param show_progress [Boolean] display a progress bar while loading
# @param limit [Integer, nil] row count to report for progress; when nil
#   the rows in the file are counted instead
# @param skip_existing [Boolean] skip indexes which already contain data
# @return [void]
def load(indexes, config, show_progress = false, limit = nil,
         skip_existing = true)
  simple_indexes = find_simple_indexes indexes, skip_existing
  simple_indexes.each do |entity, simple_index_list|
    filename = File.join config[:directory], "#{entity.name}.csv"

    # Start from the limit if one was given, subtracting one to account
    # for the header row; only count lines in the file when no limit was
    # provided (previously the file was always counted, so a non-nil
    # limit produced limit - 1 + line_count and inflated the total)
    total_rows = (limit || 0) - 1
    File.open(filename) { |file| file.each_line { total_rows += 1 } } \
      if limit.nil?

    progress = initialize_progress entity, simple_index_list,
                                   total_rows if show_progress
    load_file_indexes filename, entity, simple_index_list, progress
  end
end
|
31
|
|
|
|
|
32
|
1 |
|
private |
|
33
|
|
|
|
|
34
|
|
|
# Find the simple indexes we should populate
#
# An index is "simple" when its graph covers exactly one entity; it is
# skipped when skip_existing is set and the backend already holds data
# for it.
#
# @return [Hash<Entity, Array<Index>>] simple indexes grouped by the
#   entity which owns their hash fields
def find_simple_indexes(indexes, skip_existing)
  simple_indexes = indexes.select do |index|
    index.graph.size == 1 &&
      !(skip_existing && !@backend.index_empty?(index))
  end

  simple_indexes.group_by do |index|
    index.hash_fields.first.parent
  end
end
|
46
|
|
|
|
|
47
|
|
|
# Initialize a progress bar to report loading results
#
# Logs the entity and the keys of the indexes being loaded, draws an
# initial (zero-progress) bar, and returns a fresh bar for callers to
# increment.
#
# @return [Formatador::ProgressBar]
def initialize_progress(entity, simple_index_list, total_rows)
  @logger.info "Loading simple indexes for #{entity.name}"
  @logger.info simple_index_list.map(&:key).join(', ')

  # Render the empty bar immediately, then return a ProgressBar which
  # times the load starting from now (UTC)
  Formatador.new.redisplay_progressbar 0, total_rows
  Formatador::ProgressBar.new total_rows, started_at: Time.now.utc
end
|
56
|
|
|
|
|
57
|
|
|
# Load all indexes for a given file
#
# Streams the CSV in chunks of 1,000 rows, then hands 100-row slices to
# Parallel workers which insert the rows into every index for this
# entity. Numeric conversion is disabled so each field's own conversion
# (via value_from_string in load_simple_chunk) applies instead.
# @return [void]
def load_file_indexes(filename, entity, simple_index_list, progress)
  SmarterCSV.process(filename,
                     downcase_header: false,
                     chunk_size: 1000,
                     convert_values_to_numeric: false) do |chunk|
    # NOTE(review): the finish hook presumably runs in the parent
    # process after each slice completes (Parallel gem semantics) —
    # confirm against the Parallel documentation
    Parallel.each(chunk.each_slice(100),
                  finish: (lambda do |_, _, _|
                    # No-op when no progress bar was requested
                    next if progress.nil?
                    # Advance by at most the remaining rows so the final
                    # partial slice cannot push the bar past its total
                    inc = [progress.total - progress.current, 100].min
                    progress.increment inc
                  end)) do |minichunk|
      load_simple_chunk minichunk, entity, simple_index_list
    end
  end
end
|
74
|
|
|
|
|
75
|
|
|
# Load a chunk of data from a simple entity index
#
# Rewrites each row in place so every key is prefixed with the entity
# name and every value is converted by its field's value_from_string,
# then inserts the whole chunk into each of the given indexes.
# @return [void]
def load_simple_chunk(chunk, entity, indexes)
  # Prefix all hash keys with the entity name and convert values
  chunk.map! do |row|
    row.keys.each_with_object({}) do |key, index_row|
      field_class = entity[key.to_s].class
      index_row["#{entity.name}_#{key}"] = field_class.value_from_string(row[key])
    end
  end

  # Insert the batch into the index
  indexes.each { |index| @backend.index_insert_chunk index, chunk }
end
|
95
|
|
|
end |
|
96
|
|
|
end |
|
97
|
|
|
end |
|
98
|
|
|
|