# frozen_string_literal: true

require 'mongo'

module NoSE
  module Backend
    # A backend which communicates with MongoDB
    class MongoBackend < BackendBase
      def initialize(model, indexes, plans, update_plans, config)
        super

        @uri = config[:uri]
        @database = config[:database]
        Mongo::Logger.logger.level = ::Logger::FATAL
      end
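
      # For illustration, a config hash as read above might look like
      # (hypothetical values; only :uri and :database are used here):
      #
      #   { uri: 'mongodb://localhost:27017/', database: 'nose' }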

      # MongoDB uses ID graphs for column families
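      # (several indexes can then share one collection, with their sort
      # orders realized as MongoDB indexes; see #indexes_ddl)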
      # @return [Boolean]
      def by_id_graph
        true
      end

      # Produce a new ObjectId
      # @return [BSON::ObjectId]
      def generate_id
        BSON::ObjectId.new
      end

      # Create new MongoDB collections for each index
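      # and return the DDL as human-readable strings, e.g. (hypothetical
      # index keys):
      #
      #   ['Create i123456', 'Add index {"tweets.body"=>1} to i123456 (i654321)']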
      def indexes_ddl(execute = false, skip_existing = false,
                      drop_existing = false)
        ddl = []

        # Create the ID graphs for all indexes
        id_graphs = @indexes.map(&:to_id_graph).uniq
        id_graphs.map do |id_graph|
          ddl << "Create #{id_graph.key}"
          next unless execute

          collection = client.collections.find { |c| c.name == id_graph.key }
          collection.drop if drop_existing && !collection.nil?
          client[id_graph.key].create unless skip_existing
        end

        # Create any necessary indexes on the ID graphs
        index_keys = []
        @indexes.sort_by do |index|
          -(index.hash_fields.to_a + index.order_fields).length
        end.each do |index|
          # Check if we already have a prefix of this index created
          keys = index.hash_fields.to_a + index.order_fields
          next if index_keys.any? { |i| i[0..keys.length - 1] == keys }
          index_keys << keys

          id_graph = index.to_id_graph
          next if id_graph == index

          # Combine the key paths for all fields to create a compound index
          index_spec = Hash[keys.map do |key|
            [self.class.field_path(index, key).join('.'), 1]
          end]

          ddl << "Add index #{index_spec} to #{id_graph.key} (#{index.key})"
          next unless execute

          client[id_graph.key].indexes.create_one index_spec
        end

        ddl
      end

      # Insert a chunk of rows into an index
      # @return [Array<BSON::ObjectId>]
      def index_insert_chunk(index, chunk)
        # We only need to insert into indexes which are ID graphs
        fail unless index == index.to_id_graph

        chunk.map! do |row|
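          # Nested hash whose default proc builds further nested hashes, so
          # entity sub-documents spring into existence as fields are assigned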
          row_hash = Hash.new { |h, k| h[k] = Hash.new(&h.default_proc) }
          index.all_fields.each do |field|
            field_path = self.class.field_path(index, field)
            entity_hash = field_path[0..-2].reduce(row_hash) { |h, k| h[k] }

            if field_path.last == '_id'
              entity_hash[field_path.last] = BSON::ObjectId.new
            else
              entity_hash[field_path.last] = row[field.id]
            end
          end

          row_hash
        end

        client[index.key].insert_many(chunk).inserted_ids
      end

      # Sample a number of values from the given index
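      # using MongoDB's $sample aggregation stage, which returns a random
      # sample of the requested size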
      def index_sample(index, count)
        rows = client[index.to_id_graph.key].aggregate(
          [
            { '$sample' => { 'size' => count } }
          ]
        ).to_a

        MongoBackend.rows_from_mongo rows, index
      end

      # Convert documents returned from MongoDB into the format we understand
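      # by flattening each nested document into a field-id => value hash,
      # e.g. (hypothetical field) { 'tweets' => { 'body' => 'hi' } } becomes
      # { 'tweets_body' => 'hi' }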
      # @return [Array<Hash>]
      def self.rows_from_mongo(rows, index, fields = nil)
        fields = index.all_fields if fields.nil?

        rows.map! do |row|
          Hash[fields.map do |field|
            field_path = MongoBackend.field_path(index, field)
            [field.id, field_path.reduce(row) { |h, p| h[p] }]
          end]
        end
      end

      # Find the path to a given field
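      # as a list of document keys, e.g. (hypothetical schema) a tweet body
      # reached from a user might yield %w(tweets body), which callers join
      # into the dotted key 'tweets.body'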
      # @return [Array<String>]
      def self.field_path(index, field)
        # Find the path from the hash entity to the given key
        field_path = index.graph.path_between index.hash_fields.first.parent,
                                              field.parent
        field_path = field_path.path_for_field(field)

        # Use _id for any primary keys
        field_path[-1] = '_id' if field.is_a? Fields::IDField

        field_path
      end

      # Insert data into an index on the backend
      class InsertStatementStep < BackendBase::InsertStatementStep
        def initialize(client, index, fields)
          super

          @fields = fields.map(&:id) & index.all_fields.map(&:id)
        end

        # Insert each row into the index
        def process(results)
          results.each do |result|
            values = Hash[@index.all_fields.map do |field|
              next unless result.key? field.id
              value = result[field.id]

              # If this is an ID, generate or construct an ObjectId
              if field.is_a?(Fields::IDField)
                value = if value.nil?
                          BSON::ObjectId.new
                        else
                          BSON::ObjectId.from_string(value)
                        end
              end
              [MongoBackend.field_path(@index, field).join('.'), value]
            end.compact]
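
            # Upsert so repeated writes for the same entity merge their
            # fields: with upsert: true, update_one inserts a new document
            # when nothing matches the _id filter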
            @client[@index.to_id_graph.key].update_one(
              { '_id' => values['_id'] },
              { '$set' => values },
              upsert: true
            )
          end
        end
      end

      # A query step to look up data from a particular collection
      class IndexLookupStatementStep < BackendBase::IndexLookupStatementStep
        # rubocop:disable Metrics/ParameterLists
        def initialize(client, select, conditions, step, next_step, prev_step)
          super

          @logger = Logging.logger['nose::backend::mongo::indexlookupstep']
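          # Build one ascending sort entry per ORDER BY field; these are
          # merged into a single sort specification in #process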
          @order = @step.order_by.map do |field|
            { MongoBackend.field_path(@index, field).join('.') => 1 }
          end
        end
        # rubocop:enable Metrics/ParameterLists

        # Perform a column family lookup in MongoDB
        def process(conditions, results)
          results = initial_results(conditions) if results.nil?
          condition_list = result_conditions conditions, results

          new_result = condition_list.flat_map do |result_conditions|
            query_doc = query_doc_for_conditions result_conditions
            result = @client[@index.to_id_graph.key].find(query_doc)
            result = result.sort(@order.reduce(&:merge)) unless @order.empty?

            result.to_a
          end

          # Limit the size of the results in case we fetched multiple keys
          new_result = new_result[0...@step.limit] unless @step.limit.nil?
          MongoBackend.rows_from_mongo new_result, @index, @step.fields
        end

        private

        # Produce the document used to issue the query to MongoDB
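        # e.g. (hypothetical fields) the conditions timestamp > 42 and
        # user_id = '...' would produce
        #   { 'tweets.timestamp' => { '$gt' => 42 },
        #     'users._id' => BSON::ObjectId('...') }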
        # @return [Hash]
        def query_doc_for_conditions(conditions)
          conditions.map do |c|
            match = c.value
            match = BSON::ObjectId(match) if c.field.is_a? Fields::IDField

            # For range operators, find the corresponding MongoDB operator
            match = { mongo_operator(c.operator) => match } if c.operator != :'='

            { MongoBackend.field_path(@index, c.field).join('.') => match }
          end.reduce(&:merge)
        end

        # Produce the comparison operator used in MongoDB
        # @return [String]
        def mongo_operator(operator)
          case operator
          when :>
            '$gt'
          when :>=
            '$gte'
          when :<
            '$lt'
          when :<=
            '$lte'
          end
        end
      end

      private

      # Create a Mongo client from the saved config
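      # (memoized, so repeated calls share a single connection)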
      def client
        @client ||= Mongo::Client.new @uri, database: @database
      end
    end
  end
end