Completed
Push — master ( 1de452...64907a )
by Michael
01:25
created

MongoBackend.rows_from_mongo()   A

Complexity

Conditions 2

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
dl 0
loc 10
ccs 6
cts 6
cp 1
crap 2
rs 9.4285
c 1
b 0
f 0
1
# frozen_string_literal: true
2
3 1
require 'mongo'
4
5 1
module NoSE
6 1
  module Backend
7
    # A backend which communicates with MongoDB
8 1
    class MongoBackend < BackendBase
9 1
      # Create a new backend for the given model, storing the connection
      # configuration and silencing the driver's verbose default logging
      def initialize(model, indexes, plans, update_plans, config)
        super

        @uri, @database = config.values_at(:uri, :database)

        # The Mongo driver logs a lot at its default level; quiet it down
        Mongo::Logger.logger.level = ::Logger::FATAL
      end
16
17
      # Column families in MongoDB are always materialized over ID graphs
      # @return [Boolean] always true for this backend
      def by_id_graph
        true
      end
22
23
      # Generate a fresh BSON ObjectId to identify a new record
      # @return [BSON::ObjectId]
      def generate_id
        BSON::ObjectId.new
      end
28
29
      # Create new MongoDB collections for each index and add any compound
      # indexes needed on those collections
      # @param execute [Boolean] actually apply the DDL against the server
      # @param skip_existing [Boolean] do not recreate existing collections
      # @param drop_existing [Boolean] drop collections before recreating
      # @return [Array<String>] a description of each DDL action taken
      def indexes_ddl(execute = false, skip_existing = false,
                      drop_existing = false)
        ddl = []

        # Each unique ID graph becomes one MongoDB collection
        id_graphs = @indexes.map(&:to_id_graph).uniq
        id_graphs.each do |id_graph|
          ddl << "Create #{id_graph.key}"
          next unless execute

          collection = client.collections.find { |c| c.name == id_graph.key }
          collection.drop if drop_existing && !collection.nil?
          client[id_graph.key].create unless skip_existing
        end

        # Create any necessary indexes on the ID graphs, visiting the
        # longest key lists first so shorter prefixes can be skipped
        index_keys = []
        sorted_indexes = @indexes.sort_by do |index|
          -(index.hash_fields.to_a + index.order_fields).length
        end
        sorted_indexes.each do |index|
          keys = index.hash_fields.to_a + index.order_fields

          # Skip this index if its keys are a prefix of one already created
          # (the original compared the single element i[keys.length - 1]
          # against the whole keys array, so nothing was ever skipped)
          next if index_keys.any? { |i| i[0...keys.length] == keys }
          index_keys << keys

          id_graph = index.to_id_graph
          next if id_graph == index

          # Combine the key paths for all fields to create a compound index
          index_spec = Hash[keys.map do |key|
            [self.class.field_path(index, key).join('.'), 1]
          end]

          ddl << "Add index #{index_spec} to #{id_graph.key} (#{index.key})"
          next unless execute

          client[id_graph.key].indexes.create_one index_spec
        end

        ddl
      end
71
72
      # Insert a chunk of rows into an index
      # @return [Array<BSON::ObjectId>]
      def index_insert_chunk(index, chunk)
        # Only ID graph indexes are materialized as collections
        fail unless index == index.to_id_graph

        chunk.map! do |row|
          # Build a nested document, autovivifying intermediate hashes
          doc = Hash.new { |hash, key| hash[key] = Hash.new(&hash.default_proc) }
          index.all_fields.each do |field|
            path = self.class.field_path(index, field)
            parent = path[0..-2].inject(doc) { |nested, step| nested[step] }

            parent[path.last] = if path.last == '_id'
                                  BSON::ObjectId.new
                                else
                                  row[field.id]
                                end
          end

          # Drop the default proc so the document serializes cleanly
          doc.default_proc = nil
          doc
        end

        client[index.key].insert_many(chunk, ordered: false).inserted_ids
      end
97
98
      # Sample a number of values from the given index using $sample
      # @return [Array<Hash>]
      def index_sample(index, count)
        pipeline = [{ '$sample' => { 'size' => count } }]
        documents = client[index.to_id_graph.key].aggregate(pipeline).to_a

        MongoBackend.rows_from_mongo documents, index
      end
108
109
      # Convert documents returned from MongoDB into the format we understand
      # @return [Array<Hash>]
      def self.rows_from_mongo(rows, index, fields = nil)
        fields = index.all_fields if fields.nil?

        rows.map! do |row|
          fields.each_with_object({}) do |field, row_hash|
            # Follow the field's key path down into the nested document
            path = MongoBackend.field_path(index, field)
            row_hash[field.id] = path.inject(row) { |doc, step| doc[step] }
          end
        end
      end
121
122
      # Find the path to a given field
      # @return [Array<String>]
      def self.field_path(index, field)
        # Walk the graph from the entity holding the hash key to the
        # entity which owns the requested field
        graph_path = index.graph.path_between(index.hash_fields.first.parent,
                                              field.parent)
        field_path = graph_path.path_for_field(field)

        # Primary keys are stored under MongoDB's _id attribute
        field_path[-1] = '_id' if field.is_a? Fields::IDField

        field_path
      end
135
136
      # Insert data into an index on the backend
137 1
      class InsertStatementStep < BackendBase::InsertStatementStep
138 1
        # Remember only those fields which actually appear in the index
        def initialize(client, index, fields)
          super

          index_field_ids = index.all_fields.map(&:id)
          @fields = fields.map(&:id) & index_field_ids
        end
143
144
        # Insert each row into the index, upserting on _id so repeated
        # inserts for the same document merge their fields
        def process(results)
          results.each do |result|
            values = @index.all_fields.each_with_object({}) do |field, doc|
              next unless result.key? field.id

              value = result[field.id]

              # IDs are stored as ObjectIds: construct one from the given
              # value, or generate a fresh one when none was provided
              if field.is_a?(Fields::IDField)
                value = if value.nil?
                          BSON::ObjectId.new
                        else
                          BSON::ObjectId.from_string(value)
                        end
              end

              doc[MongoBackend.field_path(@index, field).join('.')] = value
            end

            @client[@index.to_id_graph.key].update_one(
              { '_id' => values['_id'] },
              { '$set' => values },
              upsert: true
            )
          end
        end
169
      end
170
171
      # A query step to look up data from a particular collection
172 1
      class IndexLookupStatementStep < BackendBase::IndexLookupStatementStep
173
        # rubocop:disable Metrics/ParameterLists
174 1
        def initialize(client, select, conditions, step, next_step, prev_step)
          super

          @logger = Logging.logger['nose::backend::mongo::indexlookupstep']

          # Precompute the MongoDB sort specification for this step
          @order = @step.order_by.map do |field|
            sort_key = MongoBackend.field_path(@index, field).join('.')
            { sort_key => 1 }
          end
        end
182
        # rubocop:enable Metrics/ParameterLists
183
184
        # Perform a column family lookup in MongoDB
        # @return [Array<Hash>] rows in the backend's row format
        def process(conditions, results)
          results = initial_results(conditions) if results.nil?
          condition_list = result_conditions conditions, results

          new_result = condition_list.flat_map do |result_conditions|
            query_doc = query_doc_for_conditions result_conditions
            result = @client[@index.to_id_graph.key].find(query_doc)
            result = result.sort(*@order) unless @order.empty?

            result.to_a
          end

          # Limit the size of the results in case we fetched multiple keys
          # (the original sliced with 0..limit, an inclusive range which
          # returned limit + 1 rows; take returns exactly limit rows)
          new_result = new_result.take(@step.limit) unless @step.limit.nil?
          MongoBackend.rows_from_mongo new_result, @index, @step.fields
        end
201
202 1
        private
203
204
        # Produce the document used to issue the query to MongoDB
205
        # @return [Hash]
206 1
        def query_doc_for_conditions(conditions)
207
          conditions.map do |c|
208 1
            match = c.value
209 1
            match = BSON::ObjectId(match) if c.field.is_a? Fields::IDField
210
211
            # For range operators, find the corresponding MongoDB operator
212 1
            match = { mongo_operator(op) => match } if c.operator != :'='
213
214 1
            { MongoBackend.field_path(@index, c.field).join('.') => match }
215 1
          end.reduce(&:merge)
216
        end
217
218
        # Produce the comparison operator used in MongoDB
        # @return [String] the MongoDB operator, or nil for equality
        def mongo_operator(operator)
          {
            :> => '$gt',
            :>= => '$gte',
            :< => '$lt',
            :<= => '$lte'
          }[operator]
        end
232
      end
233
234 1
      private
235
236
      # Lazily construct and memoize a Mongo client from the saved config
      def client
        @client ||= Mongo::Client.new(@uri, database: @database)
      end
240
    end
241
  end
242
end
243