# frozen_string_literal: true

require 'mongo'

module NoSE
  module Backend
    # A backend which communicates with MongoDB
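    # A minimal usage sketch (the connection values here are hypothetical,
    # not taken from this repository):
    #
    # @example Constructing a backend against a local MongoDB server
    #   config = { uri: 'mongodb://localhost:27017', database: 'nose' }
    #   backend = MongoBackend.new(model, indexes, plans, update_plans, config)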
    class MongoBackend < BackendBase
      def initialize(model, indexes, plans, update_plans, config)
        super

        @uri = config[:uri]
        @database = config[:database]

        # The driver's default logging is verbose, so silence it
        Mongo::Logger.logger.level = ::Logger::FATAL
      end

      # MongoDB uses ID graphs for column families
      # @return [Boolean]
      def by_id_graph
        true
      end

      # Produce a new ObjectId
      # @return [BSON::ObjectId]
      def generate_id
        BSON::ObjectId.new
      end

      # Create new MongoDB collections for each index
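      # The DDL is returned as strings so it can be previewed before
      # execution; the index keys shown below are hypothetical:
      #
      # @example Preview the DDL without modifying the database
      #   backend.indexes_ddl.each { |ddl| puts ddl }
      #   # Create i2195134700
      #   # Add index {"users.city"=>1} to i2195134700 (i1241029339)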
      def indexes_ddl(execute = false, skip_existing = false,
                      drop_existing = false)
        ddl = []

        # Create the ID graphs for all indexes
        id_graphs = @indexes.map(&:to_id_graph).uniq
        id_graphs.each do |id_graph|
          ddl << "Create #{id_graph.key}"
          next unless execute

          collection = client.collections.find { |c| c.name == id_graph.key }
          collection.drop if drop_existing && !collection.nil?
          client[id_graph.key].create unless skip_existing
        end

        # Create any necessary indexes on the ID graphs
        index_keys = []
        @indexes.sort_by do |index|
          -(index.hash_fields.to_a + index.order_fields).length
        end.each do |index|
          # Check if we already have a prefix of this index created
          keys = index.hash_fields.to_a + index.order_fields
          next if index_keys.any? { |i| i[0..keys.length - 1] == keys }
          index_keys << keys

          id_graph = index.to_id_graph
          next if id_graph == index

          # Combine the key paths for all fields to create a compound index
          index_spec = Hash[keys.map do |key|
            [self.class.field_path(index, key).join('.'), 1]
          end]

          ddl << "Add index #{index_spec} to #{id_graph.key} (#{index.key})"
          next unless execute

          client[id_graph.key].indexes.create_one index_spec
        end

        ddl
      end

      # Insert a chunk of rows into an index
      # @return [Array<BSON::ObjectId>]
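      # Each flat row keyed by field ID is reshaped into a nested document
      # following the key paths from field_path. A hypothetical illustration:
      #
      # @example A flat row becomes a nested document
      #   # { 'users_city' => 'Kitchener' } might be stored as
      #   # { 'users' => { 'city' => 'Kitchener' } }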
      def index_insert_chunk(index, chunk)
        # We only need to insert into indexes which are ID graphs
        fail 'Can only insert into ID graphs' unless index == index.to_id_graph

        chunk.map! do |row|
          # Build a hash which creates nested hashes on demand
          row_hash = Hash.new { |h, k| h[k] = Hash.new(&h.default_proc) }
          index.all_fields.each do |field|
            field_path = self.class.field_path(index, field)
            entity_hash = field_path[0..-2].reduce(row_hash) { |h, k| h[k] }

            if field_path.last == '_id'
              entity_hash[field_path.last] = BSON::ObjectId.new
            else
              entity_hash[field_path.last] = row[field.id]
            end
          end

          row_hash
        end

        client[index.key].insert_many(chunk).inserted_ids
      end

      # Sample a number of values from the given index
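      # This relies on the server-side $sample aggregation stage, so the
      # rows are chosen randomly by MongoDB itself:
      #
      # @example Fetch ten random rows from an index
      #   backend.index_sample(index, 10)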
      def index_sample(index, count)
        rows = client[index.to_id_graph.key].aggregate(
          [
            { '$sample' => { 'size' => count } }
          ]
        ).to_a

        MongoBackend.rows_from_mongo rows, index
      end

      # Convert documents returned from MongoDB into the format we understand
      # @return [Array<Hash>]
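      # This inverts the nesting performed on insert; a hypothetical example:
      #
      # @example A nested document becomes a flat row keyed by field ID
      #   # { 'users' => { 'city' => 'Kitchener' } } might come back as
      #   # { 'users_city' => 'Kitchener' }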
      def self.rows_from_mongo(rows, index, fields = nil)
        fields = index.all_fields if fields.nil?

        rows.map! do |row|
          Hash[fields.map do |field|
            field_path = MongoBackend.field_path(index, field)
            [field.id, field_path.reduce(row) { |h, p| h[p] }]
          end]
        end
      end

      # Find the path to a given field
      # @return [Array<String>]
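      # The path walks from the entity of the hash fields to the entity
      # containing the target field; the names below are hypothetical:
      #
      # @example Primary keys are rewritten to MongoDB's _id
      #   MongoBackend.field_path(index, city_field) #=> ['users', 'city']
      #   MongoBackend.field_path(index, id_field)   #=> ['users', '_id']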
      def self.field_path(index, field)
        # Find the path from the hash entity to the given key
        field_path = index.graph.path_between index.hash_fields.first.parent,
                                              field.parent
        field_path = field_path.path_for_field(field)

        # Use _id for any primary keys
        field_path[-1] = '_id' if field.is_a? Fields::IDField

        field_path
      end

      # Insert data into an index on the backend
      class InsertStatementStep < BackendBase::InsertStatementStep
        def initialize(client, index, fields)
          super

          @fields = fields.map(&:id) & index.all_fields.map(&:id)
        end

        # Insert each row into the index
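        # Each row becomes an upsert keyed on _id, so repeated inserts of the
        # same entity merge into one document. A sketch of the resulting
        # driver call, with hypothetical values:
        #
        # @example The update issued for a single row
        #   update_one({ '_id' => id }, { '$set' => values }, upsert: true)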
        def process(results)
          results.each do |result|
            values = Hash[@index.all_fields.map do |field|
              next unless result.key? field.id
              value = result[field.id]

              # If this is an ID, generate or construct an ObjectId
              if field.is_a?(Fields::IDField)
                value = if value.nil?
                          BSON::ObjectId.new
                        else
                          BSON::ObjectId.from_string(value)
                        end
              end

              [MongoBackend.field_path(@index, field).join('.'), value]
            end.compact]

            @client[@index.to_id_graph.key].update_one(
              { '_id' => values['_id'] },
              { '$set' => values },
              upsert: true
            )
          end
        end
      end

      # A query step to look up data from a particular collection
      class IndexLookupStatementStep < BackendBase::IndexLookupStatementStep
        # rubocop:disable Metrics/ParameterLists
        def initialize(client, select, conditions, step, next_step, prev_step)
          super

          @logger = Logging.logger['nose::backend::mongo::indexlookupstep']

          # Build a single sort document; MongoDB preserves key order,
          # so later fields act as tiebreakers for earlier ones
          @order = @step.order_by.map do |field|
            [MongoBackend.field_path(@index, field).join('.'), 1]
          end.to_h
        end
        # rubocop:enable Metrics/ParameterLists

        # Perform a column family lookup in MongoDB
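        # Each entry in the condition list becomes one find() against the ID
        # graph collection and the per-lookup results are concatenated before
        # applying the limit. A hypothetical query document:
        #
        # @example The find() issued for a single set of conditions
        #   @client[@index.to_id_graph.key].find('users._id' => object_id)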
        def process(conditions, results)
          results = initial_results(conditions) if results.nil?
          condition_list = result_conditions conditions, results

          new_result = condition_list.flat_map do |result_conditions|
            query_doc = query_doc_for_conditions result_conditions
            result = @client[@index.to_id_graph.key].find(query_doc)
            result = result.sort(@order) unless @order.empty?

            result.to_a
          end

          # Limit the size of the results in case we fetched multiple keys
          new_result = new_result.first(@step.limit) unless @step.limit.nil?
          MongoBackend.rows_from_mongo new_result, @index, @step.fields
        end

        private

        # Produce the document used to issue the query to MongoDB
        # @return [Hash]
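        # Equality conditions match directly on the dotted field path while
        # range conditions nest a comparison operator; values here are
        # hypothetical:
        #
        # @example An equality and a range condition
        #   # users.age = 30  produces { 'users.age' => 30 }
        #   # users.age > 30  produces { 'users.age' => { '$gt' => 30 } }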
        def query_doc_for_conditions(conditions)
          conditions.map do |c|
            match = c.value
            match = BSON::ObjectId(match) if c.field.is_a? Fields::IDField

            # For range operators, find the corresponding MongoDB operator
            if c.operator != :'='
              match = { mongo_operator(c.operator) => match }
            end

            { MongoBackend.field_path(@index, c.field).join('.') => match }
          end.reduce(&:merge)
        end

        # Produce the comparison operator used in MongoDB
        # @return [String]
        def mongo_operator(operator)
          case operator
          when :>
            '$gt'
          when :>=
            '$gte'
          when :<
            '$lt'
          when :<=
            '$lte'
          end
        end
      end

      private

      # Create a Mongo client from the saved config
      def client
        @client ||= Mongo::Client.new @uri, database: @database
      end
    end
  end
end