Completed
Push — master ( ed965e...44e1f8 )
by Michael
04:01
created

SortStatementStep.initialize()   A

Complexity

Conditions 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 4
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
# frozen_string_literal: true
2
3 1
module NoSE
4
  # Communication with backends for index creation and statement execution
5 1
  module Backend
6
    # Superclass of all database backends
7 1
    class Backend
8 1
      include Listing
9 1
      include Supertype
10
11 1
      # Store the model, indexes and plans this backend works with
      # @param model kept in @model (passed to Plans::QueryState by
      #   prepare_query)
      # @param indexes kept in @indexes
      # @param plans kept in @plans and searched by find_query_plan
      # @param update_plans kept in @update_plans and searched by
      #   find_update_plans
      # @param _config accepted but ignored by this superclass
      def initialize(model, indexes, plans, update_plans, _config)
        @model = model
        @indexes = indexes
        @plans = plans
        @update_plans = update_plans
      end
17
18
      # By default, do not use ID graphs
      # @return [Boolean] always false in this superclass
      def by_id_graph
        false
      end
23
24
      # @abstract Subclasses implement to check if an index is empty
      # @param _index ignored by this default implementation
      # @return [Boolean] always true in this superclass
      def index_empty?(_index)
        true
      end
29
30
      # @abstract Subclasses implement to check if an index already exists
      # @param _index ignored by this default implementation
      # @return [Boolean] always false in this superclass
      def index_exists?(_index)
        false
      end
35
36
      # @abstract Subclasses implement to remove existing indexes
      # The default implementation does nothing
      # @return [void]
      def drop_index
      end
40
41
      # @abstract Subclasses implement to allow inserting
      #           data into the backend database
      # :nocov:
      # @param _index ignored by this default implementation
      # @param _chunk ignored by this default implementation
      # @raise [NotImplementedError] always, in this superclass
      # @return [void]
      def index_insert_chunk(_index, _chunk)
        fail NotImplementedError
      end
      # :nocov:
49
50
      # @abstract Subclasses implement to generate a new random ID
      # :nocov:
      # @raise [NotImplementedError] always, in this superclass
      # @return [Object]
      def generate_id
        fail NotImplementedError
      end
      # :nocov:
57
58
      # @abstract Subclasses should create indexes
      # :nocov:
      # @param _execute [Boolean] ignored by this default implementation
      # @param _skip_existing [Boolean] ignored by this default implementation
      # @param _drop_existing [Boolean] ignored by this default implementation
      # @raise [NotImplementedError] always, in this superclass
      # @return [Enumerable]
      def indexes_ddl(_execute = false, _skip_existing = false,
                      _drop_existing = false)
        fail NotImplementedError
      end
      # :nocov:
66
67
      # @abstract Subclasses should return sample values from the index
      # :nocov:
      # @param _index ignored by this default implementation
      # @param _count ignored by this default implementation
      # @raise [NotImplementedError] always, in this superclass
      # @return [Array<Hash>]
      def indexes_sample(_index, _count)
        fail NotImplementedError
      end
      # :nocov:
74
75
      # Prepare a query to be executed with the given plans
      # @param query the query to prepare; may be nil, in which case a plan
      #   must be supplied in +plans+ (find_query_plan fails on nil)
      # @param fields passed through to each statement step
      # @param conditions passed through to each statement step
      # @param plans [Array] if non-empty, the first element is used as the
      #   plan instead of searching the stored plans
      # @return [PreparedQuery]
      def prepare_query(query, fields, conditions, plans = [])
        plan = plans.empty? ? find_query_plan(query) : plans.first

        # No query state can be built without a query, so the root step
        # is then created with a nil state
        state = Plans::QueryState.new(query, @model) unless query.nil?
        first_step = Plans::RootPlanStep.new state

        # Pad with the root step and a trailing nil so that each plan step
        # is seen together with its predecessor and successor
        steps = [first_step] + plan.to_a + [nil]
        PreparedQuery.new query, prepare_query_steps(steps, fields, conditions)
      end
85
86
      # Prepare a statement to be executed with the given plans
      # @param statement the statement to prepare
      # @param plans [Array] forwarded unchanged to prepare_query or
      #   prepare_update
      # @return the result of prepare_query for a Query, otherwise the
      #   result of prepare_update
      def prepare(statement, plans = [])
        # Every statement which is not a Query is prepared as an update
        return prepare_update(statement, plans) unless statement.is_a?(Query)

        prepare_query statement, statement.all_fields,
                      statement.conditions, plans
      end
101
102
      # Execute a query with the stored plans
      # @param query the query to run; its +conditions+ are used for execution
      # @param plans [Array] forwarded to prepare
      # @return [Array<Hash>]
      def query(query, plans = [])
        prepared = prepare query, plans
        prepared.execute query.conditions
      end
108
109
      # Prepare an update for execution
      # @param update the statement being prepared
      # @param plans [Array] update plans to use; when empty, the stored
      #   update plans are searched for ones matching the statement
      # @raise [PlanNotFound] if no plans were given and none were found
      # @return [Array<PreparedUpdate>] one prepared update per plan
      def prepare_update(update, plans)
        # Search for plans if they were not given
        plans = find_update_plans(update) if plans.empty?
        fail PlanNotFound if plans.empty?

        # Prepare each plan
        plans.map do |plan|
          update_steps = plan.update_steps
          delete = update_steps.any? { |s| s.is_a?(Plans::DeletePlanStep) }
          insert = update_steps.any? { |s| s.is_a?(Plans::InsertPlanStep) }

          # A delete step, when present, is always added before the insert
          steps = []
          add_delete_step(plan, steps) if delete
          add_insert_step(plan, steps, plan.update_fields) if insert

          PreparedUpdate.new update, prepare_support_plans(plan), steps
        end
      end
132
133
      # Execute an update with the stored plans
      # @param update the statement to run; its +settings+ and +conditions+
      #   are passed to every prepared update
      # @param plans [Array] forwarded to prepare_update
      # @return [void]
      def update(update, plans = [])
        prepared = prepare_update update, plans
        prepared.each { |p| p.execute update.settings, update.conditions }
      end
139
140
      # Superclass for all statement execution steps
      class StatementStep
        include Supertype

        # The index assigned to @index by subclasses which set one
        attr_reader :index
      end
145
146
      # Look up data on an index in the backend
      class IndexLookupStatementStep < StatementStep
        # Record the client, the plan step and its neighbours, and cache
        # the step's index and filters
        # @param client stored in @client
        # @param _select ignored by this class
        # @param _conditions ignored by this class
        # @param step plan step providing index, eq_filter and range_filter
        # @param next_step the following plan step, or nil
        # @param prev_step the preceding plan step
        def initialize(client, _select, _conditions,
                       step, next_step, prev_step)
          @client = client
          @step = step
          @index = step.index
          @prev_step = prev_step
          @next_step = next_step

          @eq_fields = step.eq_filter
          @range_field = step.range_filter
        end

        protected

        # Get lookup values from the query for the first step
        # @param conditions [Hash] conditions keyed by field ID
        # @raise [RuntimeError] if any condition has a nil value
        # @return [Array<Hash>] a single hash of field ID to condition value
        def initial_results(conditions)
          values = conditions.map do |field_id, condition|
            fail if condition.value.nil?
            [field_id, condition.value]
          end

          [Hash[values]]
        end

        # Construct a list of conditions from the results
        # @param conditions [Hash] the original conditions; only consulted
        #   for the operator of its range condition
        # @param results [Array<Hash>] rows keyed by field ID
        # @return [Array<Array<Condition>>] one list of conditions per row
        def result_conditions(conditions, results)
          results.map do |result|
            result_condition = @eq_fields.map do |field|
              Condition.new field, :'=', result[field.id]
            end

            unless @range_field.nil?
              # Reuse the operator of the range condition in the query
              operator = conditions.each_value.find(&:range?).operator
              result_condition << Condition.new(@range_field, operator,
                                                result[@range_field.id])
            end

            result_condition
          end
        end

        # Decide which fields should be selected
        # @param select the fields requested so far
        # @return the requested fields, plus the hash fields of the next
        #   index lookup (if any), restricted to fields in this step's index
        def expand_selected_fields(select)
          # We just pick whatever is contained in the index that is either
          # mentioned in the query or required for the next lookup
          # TODO: Potentially try query.all_fields for those not required
          #       It should be sufficient to check what is needed for future
          #       filtering and sorting and use only those + query.select
          select += @next_step.index.hash_fields \
            if @next_step.is_a?(Plans::IndexLookupPlanStep)
          select &= @step.index.all_fields

          select
        end
      end
202
203
      # Insert data into an index on the backend
      class InsertStatementStep < StatementStep
        # @param client stored in @client
        # @param index stored in @index and exposed through the index reader
        # @param _fields ignored by this class
        def initialize(client, index, _fields)
          @client = client
          @index = index
        end
      end
210
211
      # Delete data from an index on the backend
      class DeleteStatementStep < StatementStep
        # @param client stored in @client
        # @param index stored in @index and exposed through the index reader
        def initialize(client, index)
          @client = client
          @index = index
        end
      end
218
219
      # Perform filtering external to the backend
      class FilterStatementStep < StatementStep
        # Only the plan step is retained; the other arguments are accepted
        # so the signature matches the other query statement steps
        # @param step plan step providing eq and range
        def initialize(_client, _fields, _conditions,
                       step, _next_step, _prev_step)
          @step = step
        end

        # Filter results by a list of fields given in the step
        # @param conditions [Hash] conditions keyed by field ID
        # @param results [Array<Hash>] rows to filter; modified in place
        # @return [Array<Hash>] the same array with non-matching rows removed
        def process(conditions, results)
          # Extract the equality conditions
          eq_conditions = conditions.values.select do |condition|
            !condition.range? && @step.eq.include?(condition.field)
          end

          # XXX: This assumes that the range filter step is the same as
          #      the one in the query, which is always true for now
          range = @step.range && conditions.each_value.find(&:range?)

          # select! returns nil when nothing is removed, so the array
          # itself is returned explicitly
          results.select! { |row| include_row?(row, eq_conditions, range) }

          results
        end

        private

        # Check if the row should be included in the result
        # @param row [Hash] a result row keyed by field ID
        # @param eq_conditions [Array] equality conditions to satisfy
        # @param range the range condition to satisfy, or a falsy value
        # @return [Boolean]
        def include_row?(row, eq_conditions, range)
          select = eq_conditions.all? do |condition|
            row[condition.field.id] == condition.value
          end

          if range
            # The operator names a comparison method on the row's value
            range_check = row[range.field.id].method(range.operator)
            select &&= range_check.call range.value
          end

          select
        end
      end
260
261
      # Perform sorting external to the backend
      class SortStatementStep < StatementStep
        # Only the plan step is retained; the other arguments are accepted
        # so the signature matches the other query statement steps
        # @param step plan step providing sort_fields
        def initialize(_client, _fields, _conditions,
                       step, _next_step, _prev_step)
          @step = step
        end

        # Sort results by a list of fields given in the step
        # @param _conditions ignored
        # @param results [Array<Hash>] rows to sort; sorted in place
        # @return [Array<Hash>]
        def process(_conditions, results)
          results.sort_by! do |row|
            @step.sort_fields.map { |field| row[field.id] }
          end
        end
      end
278
279
      # Perform a client-side limit of the result set size
      class LimitStatementStep < StatementStep
        # Only the step's limit is retained; the other arguments are accepted
        # so the signature matches the other query statement steps
        # @param step plan step providing limit
        def initialize(_client, _fields, _conditions,
                       step, _next_step, _prev_step)
          @limit = step.limit
        end

        # Remove results past the limit
        # @param _conditions ignored
        # @param results [Array<Hash>] rows to truncate; not modified
        # @return [Array<Hash>] at most @limit rows from the front
        def process(_conditions, results)
          # An exclusive range keeps a limit of zero from becoming 0..-1,
          # which would select every row instead of none
          results[0...@limit]
        end
      end
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
292
293 1
      private
294
295
      # Find plans for a given query
      # @param query the query to look up among the stored plans
      # @raise [PlanNotFound] if the query is nil or no stored plan matches
      # @return [Plans::QueryPlan]
      def find_query_plan(query)
        plan = nil
        plan = @plans.find { |candidate| candidate.query == query } \
          unless query.nil?
        fail PlanNotFound if plan.nil?

        plan
      end
305
306
      # Prepare all the steps for executing a given query
      # @param steps [Array] plan steps padded with a leading and a trailing
      #   element so that every real step has a predecessor and successor
      # @param fields passed to each statement step
      # @param conditions passed to each statement step
      # @return [Array<StatementStep>]
      def prepare_query_steps(steps, fields, conditions)
        steps.each_cons(3).map do |prev_step, step, next_step|
          step_class = StatementStep.subtype_class step.subtype_name

          # Check if the subclass has overridden this step
          subclass_step_name = step_class.name.sub \
            'NoSE::Backend::Backend', self.class.name
          step_class = Object.const_get subclass_step_name
          step_class.new client, fields, conditions,
                         step, next_step, prev_step
        end
      end
320
321
      # Find plans for a given update
      # @param update the statement to look up among the stored update plans
      # @return [Array<Plans::UpdatePlan>] every stored plan whose statement
      #   equals the update; empty if there is none
      def find_update_plans(update)
        @update_plans.select { |candidate| candidate.statement == update }
      end
328 View Code Duplication
329
      # Add a delete step to a prepared update plan
      # @param plan the update plan; its index is given to the new step
      # @param steps [Array] list the new step is appended to
      # @return [void]
      def add_delete_step(plan, steps)
        # Use the concrete backend's own step class by swapping this
        # superclass's name for the name of the running backend class
        subclass_step_name = DeleteStatementStep.name.sub \
          'NoSE::Backend::Backend', self.class.name
        step_class = Object.const_get subclass_step_name
        steps << step_class.new(client, plan.index)
      end
338 View Code Duplication
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
339
      # Add an insert step to a prepared update plan
      # @param plan the update plan; its index is given to the new step
      # @param steps [Array] list the new step is appended to
      # @param fields passed to the new step
      # @return [void]
      def add_insert_step(plan, steps, fields)
        # Use the concrete backend's own step class by swapping this
        # superclass's name for the name of the running backend class
        subclass_step_name = InsertStatementStep.name.sub \
          'NoSE::Backend::Backend', self.class.name
        step_class = Object.const_get subclass_step_name
        steps << step_class.new(client, plan.index, fields)
      end
348
349
      # Prepare plans for each support query
      # @param plan update plan whose query_plans are prepared
      # @return [Array<PreparedQuery>]
      def prepare_support_plans(plan)
        plan.query_plans.map do |query_plan|
          # The query is read straight from the instance variable rather
          # than through a reader
          # NOTE(review): presumably so a plan without a query yields nil
          # instead of failing - confirm against the query plan class
          query = query_plan.instance_variable_get(:@query)
          prepare_query query, query_plan.select_fields, query_plan.params,
                        [query_plan.steps]
        end
      end
358
    end
359
360
    # A prepared query which can be executed against the backend
    class PreparedQuery
      attr_reader :query, :steps

      # @param query the query being executed, or nil for plans which
      #   were defined without one
      # @param steps [Array] statement steps, run in order by execute
      def initialize(query, steps)
        @query = query
        @steps = steps
      end

      # Execute the query for the given set of conditions
      # @param conditions [Hash] conditions keyed by field ID
      # @return [Array<Hash>]
      def execute(conditions)
        results = nil

        @steps.each do |step|
          if step.is_a?(Backend::IndexLookupStatementStep)
            # An index lookup only sees conditions on fields of its index
            field_ids = step.index.all_fields.map(&:id)
            field_conds = conditions.select { |key, _| field_ids.include? key }
          else
            field_conds = conditions
          end
          results = step.process field_conds, results

          # The query can't return any results at this point, so we're done
          break if results.empty?
        end

        # Only return fields selected by the query if one is given
        # (we have no query to refer to for manually-defined plans)
        unless @query.nil?
          select_ids = @query.select.map(&:id).to_set
          results.each { |row| row.select! { |k, _| select_ids.include? k } }
        end

        results
      end
    end
397
398
    # An update prepared with a backend which is ready to execute
    class PreparedUpdate
      attr_reader :statement, :steps

      # @param statement the statement being executed; may be nil
      # @param support_plans [Array] prepared support queries, each
      #   responding to execute and query
      # @param steps [Array] statement steps; the first delete step and
      #   the first insert step found are the ones executed
      def initialize(statement, support_plans, steps)
        @statement = statement
        @support_plans = support_plans
        # Keep the list itself so the steps reader returns it
        @steps = steps
        @delete_step = steps.find do |step|
          step.is_a? Backend::DeleteStatementStep
        end
        @insert_step = steps.find do |step|
          step.is_a? Backend::InsertStatementStep
        end
      end

      # Execute the statement for the given set of conditions
      # @param update_settings settings of the statement, each with a
      #   field and a value
      # @param update_conditions [Hash] conditions keyed by field ID
      # @return [void]
      def execute(update_settings, update_conditions)
        settings = initial_update_settings update_settings, update_conditions

        # Execute the support queries for this update
        support = support_results update_conditions

        # Perform the deletion
        @delete_step.process support unless support.empty? || @delete_step.nil?
        return if @insert_step.nil?

        # Get the fields which should be used from the original statement
        # If we didn't delete old entries, then we just need the primary key
        # attributes of the index, otherwise we need everything
        index = @insert_step.index
        include_fields = if @delete_step.nil?
                           index.hash_fields + index.order_fields
                         else
                           index.all_fields
                         end

        # Add fields from the original statement
        update_conditions.each_value do |condition|
          next unless include_fields.include? condition.field
          settings.merge! condition.field.id => condition.value
        end

        if support.empty?
          support = [settings]
        else
          # Values already present in a support row win over the settings
          support.each do |row|
            row.merge!(settings) { |_, value, _| value }
          end
        end

        # At this point there is always at least one row to insert
        @insert_step.process support
      end

      private

      # Get the initial values which will be used in the first plan step
      # @return [Hash] field ID to value
      def initial_update_settings(update_settings, update_conditions)
        if !@insert_step.nil? && @delete_step.nil?
          # Populate the data to insert for Insert statements
          Hash[update_settings.map do |setting|
            [setting.field.id, setting.value]
          end]
        else
          # Get values for updates and deletes
          Hash[update_conditions.map do |field_id, condition|
            [field_id, condition.value]
          end]
        end
      end

      # Execute all the support queries
      # @param settings [Hash] conditions of the statement keyed by field ID
      # @return [Array<Hash>]
      def support_results(settings)
        return [] if @support_plans.empty?

        # Get a hash of values used in settings, first
        # resolving any settings which specify foreign keys
        settings = Hash[settings.map do |_key, condition|
          new_condition = condition.resolve_foreign_key
          [new_condition.field.id, new_condition]
        end]
        setting_values = Hash[settings.map { |k, v| [k, v.value] }]

        # If we have no query for IDs on the first entity, we must
        # have the fields we need to execute the other support queries
        if !@statement.nil? &&
           @support_plans.first.query.entity != @statement.entity
          direct_support_results settings, setting_values
        else
          id_support_results settings, setting_values
        end
      end

      # Run every support query with the statement's own conditions and
      # combine their rows
      # @return [Array<Hash>]
      def direct_support_results(settings, setting_values)
        support = @support_plans.map { |plan| plan.execute settings }
        return support if support.empty?

        # Combine the results from multiple support queries
        support.first.product(*support[1..-1]).map do |results|
          results.reduce(&:merge!).merge!(setting_values)
        end
      end

      # Run the support queries once per set of ID conditions, optionally
      # using the first support query to discover those IDs
      # @return [Array<Hash>]
      def id_support_results(settings, setting_values)
        # Execute the first support query to get a list of IDs
        first_plan = @support_plans.first
        first_query = first_plan.query

        # We may not have a statement if this is manually defined
        if @statement.nil?
          select_key = false
          entity_fields = nil
        else
          id = @statement.entity.id_field
          select_key = first_query.select.include? id

          # Select any fields from the entity being modified if required
          entity_fields = first_plan.execute settings \
            if first_query.graph.size == 1 && \
               first_query.graph.entities.first == @statement.entity
        end

        conditions = if select_key
                       # Pull the IDs from the first support query
                       entity_fields.map do |row|
                         { id.id => Condition.new(id, :'=', row[id.id]) }
                       end
                     else
                       # Use the ID specified in the statement conditions
                       [settings]
                     end

        # The first plan was already used above when it supplied the IDs
        remaining_plans = @support_plans[(select_key ? 1 : 0)..-1]

        # Execute the support queries for each ID
        conditions.each_with_index.flat_map do |condition, i|
          results = remaining_plans.map { |plan| plan.execute condition }

          # Combine the results of the different support queries
          results[0].product(*results[1..-1]).map do |result|
            row = result.reduce(&:merge!)
            row.merge!(entity_fields[i]) unless entity_fields.nil?
            row.merge!(setting_values)

            row
          end
        end
      end
    end
551
552
    # Raised when a statement is executed that we have no plan for
    class PlanNotFound < StandardError
    end
555
556
    # Raised when a backend attempts to create an index that already exists
    class IndexAlreadyExists < StandardError
    end
559
  end
560
end
561
562 1
require_relative 'backend/cassandra'
563 1
require_relative 'backend/file'
564
require_relative 'backend/mongo'
565