# frozen_string_literal: true

require 'mongo'

module NoSE
  module Backend
    # A backend which communicates with MongoDB
    class MongoBackend < Backend
      include Subtype

      def initialize(model, indexes, plans, update_plans, config)
        super

        @uri = config[:uri]
        @database = config[:database]
        Mongo::Logger.logger.level = ::Logger::FATAL
      end

      # MongoDB uses ID graphs for column families
      # @return [Boolean]
      def by_id_graph
        true
      end

      # Produce a new ObjectId
      # @return [BSON::ObjectId]
      def generate_id
        BSON::ObjectId.new
      end

      # Create new MongoDB collections for each index
      def indexes_ddl(execute = false, skip_existing = false,
                      drop_existing = false)
        ddl = []

        # Create the ID graphs for all indexes
        id_graphs = @indexes.map(&:to_id_graph).uniq
        id_graphs.map do |id_graph|
          ddl << "Create #{id_graph.key}"
          next unless execute

          collection = client.collections.find { |c| c.name == id_graph.key }
          collection.drop if drop_existing && !collection.nil?
          client[id_graph.key].create unless skip_existing
        end

        # Create any necessary indexes on the ID graphs
        index_keys = []
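        # Build indexes for the longest key lists first so that any index
        # whose keys are a prefix of one already created can be skipped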
        @indexes.sort_by do |index|
          -(index.hash_fields.to_a + index.order_fields).length
        end.each do |index|
          # Check if we already have a prefix of this index created
          keys = index.hash_fields.to_a + index.order_fields
          next if index_keys.any? { |i| i[0...keys.length] == keys }
          index_keys << keys

          id_graph = index.to_id_graph
          next if id_graph == index

          # Combine the key paths for all fields to create a compound index
          index_spec = Hash[keys.map do |key|
            [self.class.field_path(index, key).join('.'), 1]
          end]

          ddl << "Add index #{index_spec} to #{id_graph.key} (#{index.key})"
          next unless execute

          client[id_graph.key].indexes.create_one index_spec
        end

        ddl
      end

      # Insert a chunk of rows into an index
      # @return [Array<BSON::ObjectId>]
      def index_insert_chunk(index, chunk)
        # We only need to insert into indexes which are ID graphs
        fail unless index == index.to_id_graph

        chunk.map! do |row|
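          # Build a nested hash which automatically creates intermediate
          # hashes so values can be assigned along each field path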
          row_hash = Hash.new { |h, k| h[k] = Hash.new(&h.default_proc) }
          index.all_fields.each do |field|
            field_path = self.class.field_path(index, field)
            entity_hash = field_path[0..-2].reduce(row_hash) { |h, k| h[k] }

            if field_path.last == '_id'
              entity_hash[field_path.last] = BSON::ObjectId.new
            else
              entity_hash[field_path.last] = row[field.id]
            end
          end

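          # Drop the default proc so later lookups of missing keys
          # no longer create entries in the document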
          row_hash.default_proc = nil
          row_hash
        end

        client[index.key].insert_many(chunk, ordered: false).inserted_ids
      end

      # Sample a number of values from the given index
      def index_sample(index, count)
        rows = client[index.to_id_graph.key].aggregate(
          [
            { '$sample' => { 'size' => count } }
          ]
        ).to_a

        MongoBackend.rows_from_mongo rows, index
      end

      # Convert documents returned from MongoDB into the format we understand
      # @return [Array<Hash>]
      def self.rows_from_mongo(rows, index, fields = nil)
        fields = index.all_fields if fields.nil?

        rows.map! do |row|
          Hash[fields.map do |field|
            field_path = MongoBackend.field_path(index, field)
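            # Walk the nested document along the field path to get the value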
            [field.id, field_path.reduce(row) { |h, p| h[p] }]
          end]
        end
      end

      # Find the path to a given field
      # @return [Array<String>]
      def self.field_path(index, field)
        # Find the path from the hash entity to the given key
        field_path = index.graph.path_between index.hash_fields.first.parent,
                                              field.parent
        field_path = field_path.path_for_field(field)

        # Use _id for any primary keys
        field_path[-1] = '_id' if field.is_a? Fields::IDField

        field_path
      end

      # Insert data into an index on the backend
      class InsertStatementStep < Backend::InsertStatementStep
        def initialize(client, index, fields)
          super

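          # Only keep fields which are actually part of the index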
          @fields = fields.map(&:id) & index.all_fields.map(&:id)
        end

        # Insert each row into the index
        def process(results)
          results.each do |result|
            values = Hash[@index.all_fields.map do |field|
              next unless result.key? field.id
              value = result[field.id]

              # If this is an ID, generate or construct an ObjectId
              if field.is_a?(Fields::IDField)
                value = if value.nil?
                          BSON::ObjectId.new
                        else
                          BSON::ObjectId.from_string(value)
                        end
              end

              [MongoBackend.field_path(@index, field).join('.'), value]
            end.compact]

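            # Upsert using dotted key paths so nested fields are set in place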
            @client[@index.to_id_graph.key].update_one(
              { '_id' => values['_id'] },
              { '$set' => values },
              upsert: true
            )
          end
        end
      end

      # A query step to look up data from a particular collection
      class IndexLookupStatementStep < Backend::IndexLookupStatementStep
        # rubocop:disable Metrics/ParameterLists
        def initialize(client, select, conditions, step, next_step, prev_step)
          super

          @logger = Logging.logger['nose::backend::mongo::indexlookupstep']
          @order = @step.order_by.map do |field|
            { MongoBackend.field_path(@index, field).join('.') => 1 }
          end
        end
        # rubocop:enable Metrics/ParameterLists

        # Perform a column family lookup in MongoDB
        def process(conditions, results)
          results = initial_results(conditions) if results.nil?
          condition_list = result_conditions conditions, results

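          # Issue one query per set of conditions and concatenate the results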
          new_result = condition_list.flat_map do |result_conditions|
            query_doc = query_doc_for_conditions result_conditions
            result = @client[@index.to_id_graph.key].find(query_doc)
            result = result.sort(*@order) unless @order.empty?

            result.to_a
          end

          # Limit the size of the results in case we fetched multiple keys
          new_result = new_result[0...@step.limit] unless @step.limit.nil?
          MongoBackend.rows_from_mongo new_result, @index, @step.fields
        end

        private

        # Produce the document used to issue the query to MongoDB
        # @return [Hash]
        def query_doc_for_conditions(conditions)
          conditions.map do |c|
            match = c.value
            match = BSON::ObjectId(match) if c.field.is_a? Fields::IDField

            # For range operators, find the corresponding MongoDB operator
            match = { mongo_operator(c.operator) => match } if c.operator != :'='

            { MongoBackend.field_path(@index, c.field).join('.') => match }
          end.reduce(&:merge)
        end

        # Produce the comparison operator used in MongoDB
        # @return [String]
        def mongo_operator(operator)
          case operator
          when :>
            '$gt'
          when :>=
            '$gte'
          when :<
            '$lt'
          when :<=
            '$lte'
          end
        end
      end

      private

      # Create a Mongo client from the saved config
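      # The client is built lazily on first use and then memoized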
      def client
        @client ||= Mongo::Client.new @uri, database: @database
      end
    end
  end
end