1 | # frozen_string_literal: true |
||
2 | |||
3 | 1 | module NoSE |
|
4 | # Communication with backends for index creation and statement execution |
||
5 | 1 | module Backend |
|
# Superclass of all database backends
class Backend
  include Listing
  include Supertype

  # @param model [Object] model handed to Plans::QueryState when preparing queries
  # @param indexes [Array] indexes for this backend (stored; not read in this class)
  # @param plans [Array] query plans searched when none are passed to {#prepare_query}
  # @param update_plans [Array] update plans searched when none are passed to {#prepare_update}
  # @param _config [Object] ignored here; presumably consumed by subclasses
  def initialize(model, indexes, plans, update_plans, _config)
    @model = model
    @indexes = indexes
    @plans = plans
    @update_plans = update_plans
  end

  # By default, do not use ID graphs
  # @return [Boolean]
  def by_id_graph
    false
  end

  # @abstract Subclasses implement to check if an index is empty
  # @return [Boolean] always true in this base implementation
  def index_empty?(_index)
    true
  end

  # @abstract Subclasses implement to check if an index already exists
  # @return [Boolean] always false in this base implementation
  def index_exists?(_index)
    false
  end

  # @abstract Subclasses implement to remove existing indexes
  # @return [void]
  def drop_index
  end

  # :nocov:

  # @abstract Subclasses implement to allow inserting
  #   data into the backend database
  # @return [void]
  def index_insert_chunk(_index, _chunk)
    fail NotImplementedError
  end

  # @abstract Subclasses implement to generate a new random ID
  # @return [Object]
  def generate_id
    fail NotImplementedError
  end

  # @abstract Subclasses should create indexes
  # @return [Enumerable]
  def indexes_ddl(_execute = false, _skip_existing = false,
                  _drop_existing = false)
    fail NotImplementedError
  end

  # @abstract Subclasses should return sample values from the index
  # @return [Array<Hash>]
  def indexes_sample(_index, _count)
    fail NotImplementedError
  end

  # :nocov:

  # Prepare a query to be executed with the given plans
  # @param query [Query, nil] the query; nil for manually-defined plans
  # @param fields [Array] fields handed to every statement step
  # @param conditions [Hash] conditions handed to every statement step
  # @param plans [Array] when non-empty its first entry is used as the plan,
  #   otherwise a stored plan for the query is looked up
  # @raise [PlanNotFound] if no plan is given and none matches the query
  # @return [PreparedQuery]
  def prepare_query(query, fields, conditions, plans = [])
    plan = plans.empty? ? find_query_plan(query) : plans.first

    state = Plans::QueryState.new(query, @model) unless query.nil?
    first_step = Plans::RootPlanStep.new state

    # Pad with a root step in front and nil at the end so that
    # each_cons(3) yields every plan step together with its neighbours
    steps = [first_step] + plan.to_a + [nil]
    PreparedQuery.new query, prepare_query_steps(steps, fields, conditions)
  end

  # Prepare a statement to be executed with the given plans
  # Queries go through {#prepare_query}; every other statement type
  # is prepared identically via {#prepare_update}
  # @return [PreparedQuery, Array<PreparedUpdate>]
  def prepare(statement, plans = [])
    if statement.is_a?(Query)
      prepare_query statement, statement.all_fields,
                    statement.conditions, plans
    else
      prepare_update statement, plans
    end
  end

  # Execute a query with the stored plans
  # @return [Array<Hash>]
  def query(query, plans = [])
    prepared = prepare query, plans
    prepared.execute query.conditions
  end

  # Prepare an update for execution
  # @param plans [Array] update plans; searched for when empty
  # @raise [PlanNotFound] if no update plan is available
  # @return [Array<PreparedUpdate>] one prepared update per plan
  def prepare_update(update, plans)
    # Search for plans if they were not given
    plans = find_update_plans(update) if plans.empty?
    fail PlanNotFound if plans.empty?

    plans.map do |plan|
      delete = plan.update_steps.any? { |step| step.is_a?(Plans::DeletePlanStep) }
      insert = plan.update_steps.any? { |step| step.is_a?(Plans::InsertPlanStep) }

      steps = []
      add_delete_step(plan, steps) if delete
      add_insert_step(plan, steps, plan.update_fields) if insert

      PreparedUpdate.new update, prepare_support_plans(plan), steps
    end
  end

  # Execute an update with the stored plans
  # @return [void]
  def update(update, plans = [])
    prepared = prepare_update update, plans
    prepared.each { |p| p.execute update.settings, update.conditions }
  end

  # Superclass for all statement execution steps
  class StatementStep
    include Supertype
    attr_reader :index
  end

  # Look up data on an index in the backend
  class IndexLookupStatementStep < StatementStep
    def initialize(client, _select, _conditions,
                   step, next_step, prev_step)
      @client = client
      @step = step
      @index = step.index
      @prev_step = prev_step
      @next_step = next_step

      @eq_fields = step.eq_filter
      @range_field = step.range_filter
    end

    protected

    # Get lookup values from the query for the first step
    # @return [Array<Hash>] a single hash mapping field IDs to condition values
    # @raise [RuntimeError] if any condition has a nil value
    def initial_results(conditions)
      [Hash[conditions.map do |field_id, condition|
        fail if condition.value.nil?
        [field_id, condition.value]
      end]]
    end

    # Construct a list of conditions from the results
    # @return [Array<Array<Condition>>] one list of conditions per result row
    def result_conditions(conditions, results)
      results.map do |result|
        result_condition = @eq_fields.map do |field|
          Condition.new field, :'=', result[field.id]
        end

        unless @range_field.nil?
          # Reuse the operator of the query's range condition
          operator = conditions.each_value.find(&:range?).operator
          result_condition << Condition.new(@range_field, operator,
                                            result[@range_field.id])
        end

        result_condition
      end
    end

    # Decide which fields should be selected
    # @return [Array] the requested fields (plus the next lookup's hash
    #   fields, if any) restricted to fields stored in this step's index
    def expand_selected_fields(select)
      # We just pick whatever is contained in the index that is either
      # mentioned in the query or required for the next lookup
      # TODO: Potentially try query.all_fields for those not required
      # It should be sufficient to check what is needed for future
      # filtering and sorting and use only those + query.select
      select += @next_step.index.hash_fields \
        if @next_step.is_a?(Plans::IndexLookupPlanStep)
      select &= @step.index.all_fields

      select
    end
  end

  # Insert data into an index on the backend
  class InsertStatementStep < StatementStep
    def initialize(client, index, _fields)
      @client = client
      @index = index
    end
  end

  # Delete data from an index on the backend
  class DeleteStatementStep < StatementStep
    def initialize(client, index)
      @client = client
      @index = index
    end
  end

  # Perform filtering external to the backend
  class FilterStatementStep < StatementStep
    def initialize(_client, _fields, _conditions,
                   step, _next_step, _prev_step)
      @step = step
    end

    # Filter results by a list of fields given in the step
    # @return [Array<Hash>] the same array, filtered in place
    def process(conditions, results)
      # Extract the equality conditions
      eq_conditions = conditions.values.select do |condition|
        !condition.range? && @step.eq.include?(condition.field)
      end

      # XXX: This assumes that the range filter step is the same as
      # the one in the query, which is always true for now
      range = @step.range && conditions.each_value.find(&:range?)

      results.select! { |row| include_row?(row, eq_conditions, range) }

      results
    end

    private

    # Check if the row should be included in the result
    # @return [Boolean]
    def include_row?(row, eq_conditions, range)
      select = eq_conditions.all? do |condition|
        row[condition.field.id] == condition.value
      end

      if range
        # Dispatch on the condition's operator symbol against the row value
        range_check = row[range.field.id].method(range.operator)
        select &&= range_check.call range.value
      end

      select
    end
  end

  # Perform sorting external to the backend
  class SortStatementStep < StatementStep
    def initialize(_client, _fields, _conditions,
                   step, _next_step, _prev_step)
      @step = step
    end

    # Sort results by a list of fields given in the step
    # @return [Array<Hash>] the same array, sorted in place
    def process(_conditions, results)
      results.sort_by! do |row|
        @step.sort_fields.map do |field|
          row[field.id]
        end
      end
    end
  end

  # Perform a client-side limit of the result set size
  class LimitStatementStep < StatementStep
    def initialize(_client, _fields, _conditions,
                   step, _next_step, _prev_step)
      @limit = step.limit
    end

    # Remove results past the limit
    # first(n) is used rather than results[0..n - 1] because the range
    # form returns every row when the limit is 0 (0..-1 is the whole array)
    # @return [Array<Hash>]
    def process(_conditions, results)
      results.first(@limit)
    end
  end

  private

  # Find plans for a given query
  # @raise [PlanNotFound] if the query is nil or no stored plan matches it
  # @return [Plans::QueryPlan]
  def find_query_plan(query)
    plan = @plans.find do |possible_plan|
      possible_plan.query == query
    end unless query.nil?
    fail PlanNotFound if plan.nil?

    plan
  end

  # Prepare all the steps for executing a given query
  # @return [Array<StatementStep>]
  def prepare_query_steps(steps, fields, conditions)
    steps.each_cons(3).map do |prev_step, step, next_step|
      step_class = backend_step_class(
        StatementStep.subtype_class(step.subtype_name)
      )
      step_class.new client, fields, conditions,
                     step, next_step, prev_step
    end
  end

  # Find plans for a given update
  # @return [Array<Plans::UpdatePlan>]
  def find_update_plans(update)
    @update_plans.select do |possible_plan|
      possible_plan.statement == update
    end
  end

  # Add a delete step to a prepared update plan
  # @return [void]
  def add_delete_step(plan, steps)
    step_class = backend_step_class DeleteStatementStep
    steps << step_class.new(client, plan.index)
  end

  # Add an insert step to a prepared update plan
  # @return [void]
  def add_insert_step(plan, steps, fields)
    step_class = backend_step_class InsertStatementStep
    steps << step_class.new(client, plan.index, fields)
  end

  # Prepare plans for each support query
  # @return [Array<PreparedQuery>]
  def prepare_support_plans(plan)
    plan.query_plans.map do |query_plan|
      # NOTE(review): read via instance_variable_get, presumably because
      # the plan exposes no suitable reader for its query — confirm
      query = query_plan.instance_variable_get(:@query)
      prepare_query query, query_plan.select_fields, query_plan.params,
                    [query_plan.steps]
    end
  end

  # Resolve the statement step class to use for this backend: the
  # 'NoSE::Backend::Backend' prefix of the generic class name is replaced
  # with this backend's class name, so a subclass-defined step of the same
  # name is picked up
  # @return [Class]
  def backend_step_class(step_class)
    subclass_step_name = step_class.name.sub 'NoSE::Backend::Backend',
                                             self.class.name
    Object.const_get subclass_step_name
  end
end
||
359 | |||
# A prepared query which can be executed against the backend
class PreparedQuery
  attr_reader :query, :steps

  # @param query [Query, nil] the query, or nil for a manually-defined plan
  # @param steps [Array] statement steps run in order by {#execute}
  def initialize(query, steps)
    @query = query
    @steps = steps
  end

  # Execute the query for the given set of conditions
  # Each step receives the previous step's results; execution stops
  # early as soon as a step produces no rows.
  # @param conditions [Hash] conditions keyed by field ID
  # @return [Array<Hash>] result rows, restricted to the query's selected
  #   fields when a query is present
  def execute(conditions)
    results = nil

    @steps.each do |step|
      results = step.process conditions_for(step, conditions), results

      # The query can't return any results at this point, so we're done
      break if results.empty?
    end

    # Only return fields selected by the query if one is given
    # (we have no query to refer to for manually-defined plans)
    unless @query.nil?
      select_ids = @query.select.map(&:id).to_set
      # Rows are filtered in place; keep_if always returns the row
      # (select! would return nil when nothing is removed)
      results.each do |row|
        row.keep_if { |field_id, _| select_ids.include? field_id }
      end
    end

    results
  end

  private

  # Index lookups only receive the conditions on fields stored in their
  # index; every other step sees all of the conditions
  # @return [Hash]
  def conditions_for(step, conditions)
    return conditions unless step.is_a?(Backend::IndexLookupStatementStep)

    field_ids = step.index.all_fields.map(&:id)
    conditions.select { |field_id, _| field_ids.include? field_id }
  end
end
||
397 | |||
# An update prepared with a backend which is ready to execute
class PreparedUpdate
  attr_reader :statement, :steps

  # @param statement [Object, nil] the update statement; nil for
  #   manually-defined plans
  # @param support_plans [Array<PreparedQuery>] queries run before the update
  # @param steps [Array] statement steps; the first delete step and the
  #   first insert step found are the ones executed
  def initialize(statement, support_plans, steps)
    @statement = statement
    @support_plans = support_plans
    # Previously never assigned, so the `steps` reader always returned nil
    @steps = steps
    @delete_step = steps.find { |step| step.is_a? Backend::DeleteStatementStep }
    @insert_step = steps.find { |step| step.is_a? Backend::InsertStatementStep }
  end

  # Execute the statement for the given set of conditions
  # @param update_settings [Array] settings whose field IDs/values seed an insert
  # @param update_conditions [Hash] the statement's conditions keyed by field ID
  # @return [void]
  def execute(update_settings, update_conditions)
    settings = initial_update_settings update_settings, update_conditions

    # Execute the support queries for this update
    support = support_results update_conditions

    # Perform the deletion
    @delete_step.process support unless support.empty? || @delete_step.nil?
    return if @insert_step.nil?

    # Get the fields which should be used from the original statement
    # If we didn't delete old entries, then we just need the primary key
    # attributes of the index, otherwise we need everything
    index = @insert_step.index
    include_fields = if @delete_step.nil?
                       index.hash_fields + index.order_fields
                     else
                       index.all_fields
                     end

    # Add fields from the original statement
    update_conditions.each_value do |condition|
      next unless include_fields.include? condition.field
      settings.merge! condition.field.id => condition.value
    end

    if support.empty?
      support = [settings]
    else
      # Values already present in a support row win over the settings
      support.each { |row| row.merge!(settings) { |_, value, _| value } }
    end

    # support is never empty here, so there is always something to insert
    @insert_step.process support
  end

  private

  # Get the initial values which will be used in the first plan step
  # @return [Hash] field ID => value
  def initial_update_settings(update_settings, update_conditions)
    if !@insert_step.nil? && @delete_step.nil?
      # Populate the data to insert for Insert statements
      update_settings.map { |setting| [setting.field.id, setting.value] }.to_h
    else
      # Get values for updates and deletes
      update_conditions.map { |field_id, condition| [field_id, condition.value] }.to_h
    end
  end

  # Execute all the support queries
  # @return [Array<Hash>] one merged row per combination of support results
  def support_results(settings)
    return [] if @support_plans.empty?

    # Get a hash of values used in settings, first
    # resolving any settings which specify foreign keys
    settings = settings.map do |_, condition|
      new_condition = condition.resolve_foreign_key
      [new_condition.field.id, new_condition]
    end.to_h
    setting_values = settings.map { |field_id, condition| [field_id, condition.value] }.to_h

    # If we have no query for IDs on the first entity, we must
    # have the fields we need to execute the other support queries
    if !@statement.nil? &&
       @support_plans.first.query.entity != @statement.entity
      combined_support_results settings, setting_values
    else
      per_id_support_results settings, setting_values
    end
  end

  # Run every support query with the same settings and combine their
  # results as a cartesian product, one merged row per combination
  # @return [Array<Hash>]
  def combined_support_results(settings, setting_values)
    support = @support_plans.map { |plan| plan.execute settings }
    support.first.product(*support[1..-1]).map do |results|
      merge_rows(results).merge!(setting_values)
    end
  end

  # Use the first support query to find IDs of the modified entity when it
  # selects them, then run the remaining support queries once per ID
  # @return [Array<Hash>]
  def per_id_support_results(settings, setting_values)
    first_query = @support_plans.first.query

    # We may not have a statement if this is manually defined
    if @statement.nil?
      select_key = false
      entity_fields = nil
    else
      id = @statement.entity.id_field
      select_key = first_query.select.include? id

      # Select any fields from the entity being modified if required
      if first_query.graph.size == 1 &&
         first_query.graph.entities.first == @statement.entity
        entity_fields = @support_plans.first.execute settings
      end
    end

    conditions = if select_key
                   # Pull the IDs from the first support query
                   entity_fields.map do |row|
                     { id.id => Condition.new(id, :'=', row[id.id]) }
                   end
                 else
                   # Use the ID specified in the statement conditions
                   [settings]
                 end

    # Execute the support queries for each ID
    remaining_plans = @support_plans[(select_key ? 1 : 0)..-1]
    conditions.each_with_index.flat_map do |condition, i|
      results = remaining_plans.map { |plan| plan.execute condition }

      # Combine the results of the different support queries
      results[0].product(*results[1..-1]).map do |result|
        row = merge_rows(result)
        row.merge!(entity_fields[i]) unless entity_fields.nil?
        row.merge!(setting_values)

        row
      end
    end
  end

  # Merge result rows into a NEW hash (later rows win on key conflicts).
  # Array#product reuses the same row objects across combinations, so
  # merging into the first row in place (reduce(&:merge!)) would make
  # every combination share and overwrite one hash.
  # @return [Hash]
  def merge_rows(rows)
    rows.each_with_object({}) { |row, merged| merged.merge!(row) }
  end
end
||
551 | |||
# Error raised when asked to execute a statement for which no
# execution plan is available
PlanNotFound = Class.new(StandardError)
||
555 | |||
# Error raised by a backend that is asked to create an index
# which is already present
IndexAlreadyExists = Class.new(StandardError)
||
559 | end |
||
560 | end |
||
561 | |||
562 | 1 | require_relative 'backend/cassandra' |
|
563 | 1 | require_relative 'backend/file' |
|
564 | require_relative 'backend/mongo' |
||
565 |