Completed
Push — mongo-graph ( e69734...bcf034 )
by Michael
03:47
created

MongoBackend.generate_id()   A

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
c 1
b 0
f 0
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
1
# frozen_string_literal: true
2
3 1
require 'mongo'
4
5 1
module NoSE
6 1
  module Backend
7
    # A backend which communicates with MongoDB
8 1
    class MongoBackend < BackendBase
9 1
      # Save the connection settings from the configuration; the MongoDB
      # client itself is only built on first use
      # @param config settings read for the :uri and :database keys
      def initialize(model, indexes, plans, update_plans, config)
        super

        # Where to connect later on
        @uri = config[:uri]
        @database = config[:database]

        # Keep the driver quiet except for fatal errors
        Mongo::Logger.logger.level = ::Logger::FATAL
      end
16
17
      # Column families in this backend are stored as ID graphs, so the
      # flag is unconditionally set
      # @return [Boolean] always true
      def by_id_graph
        true
      end
22
23
      # Create a freshly generated BSON identifier for a new document
      # @return [BSON::ObjectId] the new identifier
      def generate_id
        BSON::ObjectId.new
      end
28
29
      # Create new MongoDB collections for each index
      #
      # One collection is created for every distinct ID graph. A compound
      # secondary index is then added to that collection for each index which
      # is not itself an ID graph, unless an index already added to the same
      # collection has these key fields as a prefix.
      #
      # @param execute [Boolean] run the commands against MongoDB instead of
      #   only describing them
      # @param skip_existing [Boolean] leave collections which already exist
      #   alone rather than trying to create them again
      # @param drop_existing [Boolean] drop collections which already exist
      #   before creating them again
      # @return [Array<String>] a description of each DDL step
      def indexes_ddl(execute = false, skip_existing = false,
                      drop_existing = false)
        ddl = []
        create_id_graph_collections ddl, execute, skip_existing, drop_existing
        create_secondary_indexes ddl, execute

        ddl
      end

      # Record (and optionally execute) creation of one collection per
      # distinct ID graph
      def create_id_graph_collections(ddl, execute, skip_existing,
                                      drop_existing)
        @indexes.map(&:to_id_graph).uniq.each do |id_graph|
          ddl << "Create #{id_graph.key}"
          next unless execute

          collection = client.collections.find { |c| c.name == id_graph.key }
          if drop_existing && !collection.nil?
            collection.drop
            collection = nil
          end

          # Only collections still present after any drop count as existing
          next if skip_existing && !collection.nil?

          client[id_graph.key].create
        end
      end

      # Record (and optionally execute) the compound indexes needed on the
      # ID graph collections. Widest key lists are visited first so that a
      # narrower index which is a prefix of one already created is skipped.
      def create_secondary_indexes(ddl, execute)
        # Key field lists already indexed, grouped by collection name
        created = Hash.new { |h, k| h[k] = [] }

        sorted = @indexes.sort_by { |index| -index_key_fields(index).length }
        sorted.each do |index|
          # Indexes which are their own ID graph get no secondary index
          id_graph = index.to_id_graph
          next if id_graph == index

          # Check if we already have a prefix of this index created
          keys = index_key_fields index
          next if index_prefix_exists? created[id_graph.key], keys
          created[id_graph.key] << keys

          index_spec = index_spec_for index, keys
          ddl << "Add index #{index_spec} to #{id_graph.key} (#{index.key})"
          next unless execute

          client[id_graph.key].indexes.create_one index_spec
        end
      end

      # All key fields of an index: the hash fields followed by order fields
      # @return [Array]
      def index_key_fields(index)
        index.hash_fields.to_a + index.order_fields
      end

      # Check whether any already-indexed key list starts with `keys`
      # @return [Boolean]
      def index_prefix_exists?(created, keys)
        created.any? { |existing| existing.first(keys.length) == keys }
      end

      # Combine the key paths for all fields to create a compound index spec
      # @return [Hash{String => Integer}]
      def index_spec_for(index, keys)
        keys.each_with_object({}) do |key, spec|
          spec[self.class.field_path(index, key).join('.')] = 1
        end
      end

      private :create_id_graph_collections, :create_secondary_indexes,
              :index_key_fields, :index_prefix_exists?, :index_spec_for
71
72
      # Insert a chunk of rows into an index
      #
      # Each row is converted into a (possibly nested) document following the
      # field paths of the index. Any field whose path ends in `_id` receives
      # a newly generated ID instead of the row's value. Note that `chunk` is
      # rewritten in place with these documents.
      #
      # @param index the index to insert into; it must be its own ID graph
      # @param chunk [Array<Hash>] rows keyed by field ID
      # @raise [RuntimeError] if the index is not an ID graph
      # @return [Array<BSON::ObjectId>]
      def index_insert_chunk(index, chunk)
        # We only need to insert into indexes which are ID graphs
        unless index == index.to_id_graph
          fail "Index #{index.key} is not an ID graph"
        end

        chunk.map! { |row| mongo_document index, row }
        client[index.key].insert_many(chunk).inserted_ids
      end

      # Build the nested MongoDB document for a single row of an index
      # @return [Hash]
      def mongo_document(index, row)
        # Auto-vivify intermediate hashes for nested entity paths
        document = Hash.new { |h, k| h[k] = Hash.new(&h.default_proc) }

        index.all_fields.each do |field|
          field_path = self.class.field_path(index, field)
          entity_hash = field_path[0..-2].reduce(document) { |h, k| h[k] }
          entity_hash[field_path.last] = if field_path.last == '_id'
                                           generate_id
                                         else
                                           row[field.id]
                                         end
        end

        document
      end
      private :mongo_document
96
97
      # Draw a random sample of documents from the collection backing the
      # ID graph of an index, using MongoDB's $sample aggregation stage
      # @param index the index whose ID graph collection is sampled
      # @param count [Integer] number of documents requested
      # @return [Array<Hash>] the sampled rows keyed by field ID
      def index_sample(index, count)
        collection = client[index.to_id_graph.key]
        pipeline = [{ '$sample' => { 'size' => count } }]
        sampled = collection.aggregate(pipeline).to_a

        MongoBackend.rows_from_mongo sampled, index
      end
107
108
      # Convert documents returned from MongoDB into the format we understand
      #
      # Each document is replaced (in place, within `rows`) by a hash mapping
      # field IDs to the value found along that field's path. A field whose
      # path is missing from a document maps to nil instead of raising.
      #
      # @param rows [Array<Hash>] documents returned by MongoDB
      # @param index the index the documents were read from
      # @param fields [Array, nil] fields to extract; defaults to all fields
      #   of the index
      # @return [Array<Hash>]
      def self.rows_from_mongo(rows, index, fields = nil)
        fields ||= index.all_fields

        rows.map! do |row|
          fields.each_with_object({}) do |field, values|
            path = MongoBackend.field_path(index, field)

            # Stop descending once a nested entity is absent
            values[field.id] = path.reduce(row) { |doc, key| doc && doc[key] }
          end
        end
      end
120
121
      # Compute the key path of a field inside the documents of an index,
      # starting from the entity of the index's first hash field
      # @return [Array<String>]
      def self.field_path(index, field)
        graph = index.graph
        start = index.hash_fields.first.parent
        path = graph.path_between(start, field.parent).path_for_field(field)

        # Primary keys are always stored under _id
        path[-1] = '_id' if field.is_a? Fields::IDField
        path
      end
134
135
      # Insert data into an index on the backend
      #
      # Each result row is upserted into the collection of the index's ID
      # graph, matched on the document's `_id` value.
      class InsertStatementStep < BackendBase::InsertStatementStep
        def initialize(client, index, fields)
          super

          # Keep only the IDs of requested fields which this index stores
          requested_ids = fields.map(&:id)
          @fields = requested_ids & index.all_fields.map(&:id)
        end

        # Insert each row into the index
        # @param results [Array<Hash>] rows keyed by field ID
        # @return the `results` collection that was iterated
        def process(results)
          results.each do |result|
            values = {}
            @index.all_fields.each do |field|
              # Skip fields this row does not provide
              next unless result.key? field.id

              value = mongo_value field, result[field.id]
              values[MongoBackend.field_path(@index, field).join('.')] = value
            end

            @client[@index.to_id_graph.key].update_one(
              { '_id' => values['_id'] },
              { '$set' => values },
              upsert: true
            )
          end
        end

        private

        # Convert a field value into the form stored in MongoDB: ID fields
        # become ObjectIds, with a new one generated when the value is nil
        # @return [Object]
        def mongo_value(field, value)
          return value unless field.is_a?(Fields::IDField)

          value.nil? ? BSON::ObjectId.new : BSON::ObjectId.from_string(value)
        end
      end
169
170
      # A query step to look up data from a particular collection
      class IndexLookupStatementStep < BackendBase::IndexLookupStatementStep
        # rubocop:disable Metrics/ParameterLists
        def initialize(client, select, conditions, step, next_step, prev_step)
          super

          @logger = Logging.logger['nose::backend::mongo::indexlookupstep']

          # Build a single sort document: the driver's `sort` takes one spec,
          # and Ruby hashes keep insertion order, so order_by precedence holds
          @order = {}
          @step.order_by.each do |field|
            @order[MongoBackend.field_path(@index, field).join('.')] = 1
          end
        end
        # rubocop:enable Metrics/ParameterLists

        # Perform a column family lookup in MongoDB
        #
        # One query is issued per set of conditions derived from the incoming
        # results; the combined documents are truncated to the step's limit
        # (when one is set) and converted back into rows.
        #
        # @param conditions the conditions supplied for this step
        # @param results [Array<Hash>, nil] rows from the previous step, or nil
        #   in which case initial results are built from `conditions`
        # @return [Array<Hash>] rows keyed by field ID
        def process(conditions, results)
          results = initial_results(conditions) if results.nil?
          condition_list = result_conditions conditions, results

          new_result = condition_list.flat_map do |query_conditions|
            query_doc = query_doc_for_conditions query_conditions
            result = @client[@index.to_id_graph.key].find(query_doc)
            result = result.sort(@order) unless @order.empty?

            result.to_a
          end

          # Limit the size of the results in case we fetched multiple keys
          new_result = new_result.first(@step.limit) unless @step.limit.nil?
          MongoBackend.rows_from_mongo new_result, @index, @step.fields
        end

        private

        # Produce the document used to issue the query to MongoDB
        #
        # Several range conditions on the same field are merged into one
        # operator document (e.g. both `$gt` and `$lt`) instead of the later
        # condition overwriting the earlier one. No conditions give {}.
        # @return [Hash]
        def query_doc_for_conditions(conditions)
          conditions.each_with_object({}) do |condition, doc|
            path = MongoBackend.field_path(@index, condition.field).join('.')
            match = condition_match condition

            doc[path] = if doc[path].is_a?(Hash) && match.is_a?(Hash)
                          doc[path].merge match
                        else
                          match
                        end
          end
        end

        # Translate one condition into the value matched in the query document
        # @return [Object] the raw value for equality, else an operator hash
        def condition_match(condition)
          match = condition.value
          match = BSON::ObjectId(match) if condition.field.is_a? Fields::IDField

          # For range operators, find the corresponding MongoDB operator
          return match if condition.operator == :'='

          { mongo_operator(condition.operator) => match }
        end

        # Produce the comparison operator used in MongoDB
        # @param operator [Symbol] one of :>, :>=, :< or :<=
        # @return [String, nil] the MongoDB operator, or nil if unmapped
        def mongo_operator(operator)
          case operator
          when :>
            '$gt'
          when :>=
            '$gte'
          when :<
            '$lt'
          when :<=
            '$lte'
          end
        end
      end
232
233 1
      private

      # Lazily build a Mongo client from the saved config and reuse it
      # for every later call
      # @return [Mongo::Client]
      def client
        return @client if @client

        @client = Mongo::Client.new(@uri, database: @database)
      end
239
    end
240
  end
241
end
242