# frozen_string_literal: true

# Load a MySQL driver. The mysql2 gem is tried first; if it cannot be loaded
# (the original note mentions running other things under JRuby), fall back to
# the mysql gem. The loader itself is noted as needing MRI to work.
begin
  require 'mysql2'
rescue LoadError
  require 'mysql'
end
||
10 | |||
module NoSE
  module Loader
    # Load data from a MySQL database into a backend
    #
    # Uses Mysql2 when that constant is defined and the Mysql client
    # otherwise (see {#new_client}).
    class MysqlLoader < LoaderBase
      # Number of rows handed to the backend in a single insert call
      INSERT_CHUNK_SIZE = 1000

      # @param workload [Object, nil] stored in @workload
      # @param backend [Object, nil] backend which receives the loaded rows
      def initialize(workload = nil, backend = nil)
        @logger = Logging.logger['nose::loader::mysqlloader']

        @workload = workload
        @backend = backend
      end

      # Load a generated set of indexes with data from MySQL
      #
      # Every index is loaded through {#load_index} using two threads.
      # @param indexes [Array] indexes to populate (left unmodified)
      # @param config [Hash] connection settings, see {#new_client}
      # @param show_progress [Boolean] log each index as it is handled
      # @param limit [Integer, nil] value for the SQL LIMIT clause, if any
      # @param skip_existing [Boolean] skip indexes which the backend
      #   reports as not empty
      def load(indexes, config, show_progress = false, limit = nil,
               skip_existing = true)
        # Build a new array instead of rewriting the caller's array in place
        indexes = indexes.map(&:to_id_graph).uniq if @backend.by_id_graph

        # XXX Assuming backend is thread-safe
        Parallel.each(indexes, in_threads: 2) do |index|
          load_index index, config, show_progress, limit, skip_existing
        end
      end

      # Read all tables in the database and construct a workload object
      # @param config [Hash] connection settings, see {#new_client}
      # @return [Workload]
      def workload(config)
        client = new_client config

        model = Workload.new
        query_rows(client, 'SHOW TABLES').each do |table, *|
          # TODO: Handle foreign keys
          model << entity_for_table(client, table)
        end

        model
      ensure
        # client is nil here if the connection attempt itself failed
        close_client client
      end

      private

      # Create a new client from the given configuration
      #
      # As a side effect this records the driver-specific options
      # (@query_options and @array_options) used when issuing queries.
      # @param config [Hash] read for :host, :username, :password
      #   and :database
      # @return [Object] the connected client
      def new_client(config)
        if Object.const_defined?(:Mysql2)
          @query_options = { stream: true, cache_rows: false }
          @array_options = { as: :array }
          Mysql2::Client.new host: config[:host],
                             username: config[:username],
                             password: config[:password],
                             database: config[:database]
        else
          @query_options = false
          @array_options = false
          Mysql.connect config[:host], config[:username], config[:password],
                        config[:database]
        end
      end

      # Close a client if the given object supports being closed
      # (nil and objects without a close method are ignored)
      # @return [void]
      def close_client(client)
        client.close if client.respond_to?(:close)
      end

      # Run a query and produce its rows, passing @array_options to the
      # result's each method when those options are set
      # @return [Enumerable]
      def query_rows(client, sql)
        result = client.query(sql)
        @array_options ? result.each(**@array_options) : result.each
      end

      # Load a single index into the backend
      # @return [void]
      def load_index(index, config, show_progress, limit, skip_existing)
        # Skip this index if it's not empty
        if skip_existing && !@backend.index_empty?(index)
          @logger.info "Skipping index #{index.inspect}" if show_progress
          return
        end
        @logger.info index.inspect if show_progress

        # Connect only once it is known that there is something to load
        client = new_client config

        sql, fields = index_sql index, limit
        results = if @query_options
                    client.query(sql, **@query_options)
                  else
                    client.query(sql).map { |row| hash_from_row row, fields }
                  end

        # Insert in fixed-size batches; the last batch may be smaller
        results.each_slice(INSERT_CHUNK_SIZE) do |chunk|
          @backend.index_insert_chunk index, chunk
        end
      ensure
        # client is still nil when the index was skipped
        close_client client
      end

      # Construct a hash from the given row returned by the client
      #
      # Values are taken from the row by position, in the order of +fields+,
      # and converted with each field class's value_from_string.
      # @return [Hash] converted values keyed by field ID
      def hash_from_row(row, fields)
        fields.each_with_index.each_with_object({}) do |(field, i), row_hash|
          row_hash[field.id] = field.class.value_from_string row[i]
        end
      end

      # Get all the fields selected by this index
      # @return [Array] the list of fields followed by the list of SQL
      #   select expressions, one per field in the same order
      def index_sql_select(index)
        fields = index.hash_fields.to_a + index.order_fields + index.extra.to_a

        expressions = fields.map do |field|
          "#{field.parent.name}.#{field.name} AS " \
          "#{field.parent.name}_#{field.name}"
        end

        [fields, expressions]
      end

      # Get the list of tables along with the join condition
      # for a query to fetch index data
      # @return [String]
      def index_sql_tables(index)
        # Create JOIN statements
        tables = index.graph.entities.map(&:name).join ' JOIN '
        return tables if index.graph.size == 1

        # One equality condition per consecutive pair of keys on the path
        conditions = index.path.each_cons(2).map do |_prev_key, key|
          key = key.reverse if key.relationship == :many
          "#{key.parent.name}.#{key.name}=" \
          "#{key.entity.name}.#{key.entity.id_field.name}"
        end

        "#{tables} WHERE #{conditions.join ' AND '}"
      end

      # Construct a SQL statement to fetch the data to populate this index
      # @param limit [Integer, nil] appended as a LIMIT clause unless nil
      # @return [Array] the SQL string followed by the selected fields
      def index_sql(index, limit = nil)
        # Get all the necessary fields
        fields, select = index_sql_select index

        # Construct the join condition
        tables = index_sql_tables index

        query = "SELECT #{select.join ', '} FROM #{tables}"
        query += " LIMIT #{limit}" unless limit.nil?

        @logger.debug query
        [query, fields]
      end

      # Generate an entity definition from a given table
      # @return [Entity]
      def entity_for_table(client, table)
        entity = Entity.new table

        # Unwrap the single COUNT(*) value whether the first row arrives
        # as a Hash, as an Array, or already as a bare value
        count = client.query("SELECT COUNT(*) FROM #{table}").first
        count = count.values if count.is_a?(Hash)
        entity.count = count.is_a?(Array) ? count.first : count

        query_rows(client, "DESCRIBE #{table}").each do |name, type, _, key|
          klass = key == 'PRI' ? Fields::IDField : field_class(type)
          entity << klass.new(name)
        end

        entity
      end

      # Produce the Ruby class used to represent a MySQL type
      #
      # Patterns are tried in order and match anywhere in the type string.
      # @return [Class]
      # @raise [ArgumentError] if no pattern matches the given type
      def field_class(type)
        case type
        when /datetime/
          Fields::DateField
        when /float/
          Fields::FloatField
        when /text/
          # TODO: Get length
          Fields::StringField
        when /varchar\(([0-9]+)\)/
          # TODO: Use length
          Fields::StringField
        when /(tiny)?int/
          Fields::IntegerField
        else
          raise ArgumentError, "unsupported MySQL column type: #{type}"
        end
      end
    end
  end
end
||
200 |