1
|
|
|
import os |
2
|
|
|
import logging |
3
|
|
|
import pdb |
4
|
|
|
import time |
5
|
|
|
import random |
6
|
|
|
from multiprocessing import Process |
7
|
|
|
import numpy as np |
8
|
|
|
from client import MilvusClient |
9
|
|
|
import utils |
10
|
|
|
import parser |
11
|
|
|
from runner import Runner |
12
|
|
|
|
13
|
|
|
# Module-level logger used by the docker runner for all progress and result output.
logger = logging.getLogger("milvus_benchmark.docker")
14
|
|
|
|
15
|
|
|
|
16
|
|
|
class DockerRunner(Runner):
    """Run benchmark definitions against a Milvus server started in docker.

    For every parameter set a fresh server container is started from
    ``self.image``, the requested operation is executed through a
    ``MilvusClient`` and the container is removed again, even when the
    parameter set is skipped or the operation raises.
    """

    def __init__(self, image):
        """Remember the docker image used to start the server containers.

        Args:
            image: image reference handed to ``utils.run_server``.
        """
        super(DockerRunner, self).__init__()
        self.image = image

    def run(self, definition, run_type=None):
        """Execute every operation in ``definition`` for the given run type.

        Args:
            definition: mapping of operation type (``"insert"`` / ``"query"``)
                to a dict with a ``"params"`` list and, for query-style runs,
                a ``"run_count"`` value.
            run_type: one of ``"performance"``, ``"insert_performance"``,
                ``"search_performance"``, ``"accuracy"`` or ``"stability"``.
                Any other value only logs a warning.
        """
        if run_type == "performance":
            for op_type, op_value in definition.items():
                if op_type == "insert":
                    self._run_insert(op_value["params"])
                elif op_type == "query":
                    self._run_search(op_value["params"], op_value["run_count"])

        elif run_type == "insert_performance":
            for op_value in definition.values():
                self._run_insert(op_value["params"])

        elif run_type == "search_performance":
            for op_value in definition.values():
                self._run_search(op_value["params"], op_value["run_count"])

        elif run_type == "accuracy":
            # Example parameter set:
            # {
            #     "dataset": "random_50m_1024_512",
            #     "index.index_types": ["flat", "ivf_flat", "ivf_sq8"],
            #     "index.nlists": [16384],
            #     "nprobes": [1, 32, 128],
            #     "nqs": [100],
            #     "top_ks": [1, 64],
            #     "server.use_blas_threshold": 1100,
            #     "server.cpu_cache_capacity": 256
            # }
            for op_type, op_value in definition.items():
                if op_type != "query":
                    logger.warning("invalid operation: %s in accuracy test, only support query operation", op_type)
                    break
                self._run_accuracy(op_value["params"])

        elif run_type == "stability":
            for op_type, op_value in definition.items():
                if op_type != "query":
                    logger.warning("invalid operation: %s in stability test, only support query operation", op_type)
                    break
                self._run_stability(op_value["params"], op_value["run_count"])

        else:
            logger.warning("Run type: %s not supported", run_type)

    def _start_server(self, param, **config_kwargs):
        """Apply the ``server.*`` settings of ``param`` and start a container.

        Extra keyword arguments are forwarded to ``utils.modify_config``.
        Returns whatever ``utils.run_server`` returns; the caller is
        responsible for passing it to ``utils.remove_container``.
        """
        volume_name = param["db_path_prefix"]
        for key, value in param.items():
            if key.startswith("server."):
                # Update server config
                utils.modify_config(key, value, type="server", **config_kwargs)
        container = utils.run_server(self.image, test_type="remote", volume_name=volume_name, db_slave=None)
        # Short pause after starting the container before a client is created.
        time.sleep(2)
        return container

    def _run_insert(self, run_params):
        """Recreate each collection and time the bulk insert into it."""
        if not run_params:
            logger.debug("No run params")
            return
        for param in run_params:
            logger.info("Definition param: %s", param)
            collection_name = param["collection_name"]
            logger.debug("Collection name: %s", collection_name)
            (data_type, collection_size, index_file_size, dimension, metric_type) = \
                parser.collection_parser(collection_name)
            container = self._start_server(param, db_slave=None)
            try:
                milvus = MilvusClient(collection_name)
                # Always start from an empty collection.
                if milvus.exists_collection():
                    milvus.delete()
                    time.sleep(10)
                milvus.create_collection(collection_name, dimension, index_file_size, metric_type)
                res = self.do_insert(milvus, collection_name, data_type, dimension, collection_size, param["ni_per"])
                logger.info(res)
                # wait for file merge
                time.sleep(collection_size * dimension / 5000000)
            finally:
                # Clear up, also when the insert failed.
                utils.remove_container(container)

    def _run_search(self, run_params, run_count):
        """Run the query benchmark for each parameter set and print the results."""
        if not run_params:
            logger.debug("No run params")
            return
        for param in run_params:
            logger.info("Definition param: %s", param)
            collection_name = param["dataset"]
            (_, _, _, _, metric_type) = parser.collection_parser(collection_name)
            container = self._start_server(param)
            try:
                milvus = MilvusClient(collection_name)
                logger.debug(milvus.show_collections())
                # Check has collection or not
                if not milvus.exists_collection():
                    logger.warning("Table %s not existed, continue exec next params ...", collection_name)
                    continue
                # parse index info
                index_types = param["index.index_types"]
                nlists = param["index.nlists"]
                # parse top-k, nq, nprobe
                top_ks, nqs, nprobes = parser.search_params_parser(param)
                for index_type in index_types:
                    for nlist in nlists:
                        # NOTE(review): no index is dropped or created here, so
                        # every (index_type, nlist) pair queries whatever index
                        # the collection already has; the pair only labels the log.
                        logger.info(milvus.describe_index())
                        logger.info(milvus.count())
                        # preload index
                        milvus.preload_collection()
                        logger.info("Start warm up query")
                        # top_ks=[1], nqs=[1], nprobe=1, run_count=1
                        self.do_query(milvus, collection_name, [1], [1], 1, 1)
                        logger.info("End warm up query")
                        # Run query test
                        for nprobe in nprobes:
                            logger.info("index_type: %s, nlist: %s, metric_type: %s, nprobe: %s",
                                        index_type, nlist, metric_type, nprobe)
                            res = self.do_query(milvus, collection_name, top_ks, nqs, nprobe, run_count)
                            headers = ["Nq/Top-k"] + [str(top_k) for top_k in top_ks]
                            utils.print_collection(headers, nqs, res)
            finally:
                utils.remove_container(container)

    def _run_accuracy(self, run_params):
        """Build each requested index and log the accuracy of its query results.

        With ``sift_acc`` set in the parameter set, results are compared with
        the ground-truth ids from ``get_groundtruth_ids``; otherwise they are
        compared with the ``flat`` index run of the same query through
        ``compute_accuracy``.
        """
        if not run_params:
            logger.debug("No run params")
            return
        for param in run_params:
            logger.info("Definition param: %s", param)
            collection_name = param["dataset"]
            sift_acc = bool(param.get("sift_acc", False))
            (_, collection_size, _, _, metric_type) = parser.collection_parser(collection_name)
            container = self._start_server(param)
            try:
                milvus = MilvusClient(collection_name)
                # Check has collection or not
                if not milvus.exists_collection():
                    logger.warning("Table %s not existed, continue exec next params ...", collection_name)
                    continue

                # parse index info
                index_types = param["index.index_types"]
                nlists = param["index.nlists"]
                # parse top-k, nq, nprobe
                top_ks, nqs, nprobes = parser.search_params_parser(param)
                # preload groundtruth data
                true_ids_all = self.get_groundtruth_ids(collection_size) if sift_acc else None
                for index_type in index_types:
                    for nlist in nlists:
                        logger.info(milvus.describe_index())
                        milvus.create_index(index_type, nlist)
                        # preload index
                        milvus.preload_collection()
                        # Run query test
                        for nprobe in nprobes:
                            logger.info("index_type: %s, nlist: %s, metric_type: %s, nprobe: %s",
                                        index_type, nlist, metric_type, nprobe)
                            for top_k in top_ks:
                                for nq in nqs:
                                    id_prefix = "%s_index_%s_nlist_%s_metric_type_%s_nprobe_%s_top_k_%s_nq_%s" % \
                                        (collection_name, index_type, nlist, metric_type, nprobe, top_k, nq)
                                    if sift_acc:
                                        self._log_sift_recall(milvus, collection_name, true_ids_all,
                                                              top_k, nq, nprobe, id_prefix)
                                        continue
                                    self.do_query_acc(milvus, collection_name, top_k, nq, nprobe, id_prefix)
                                    if index_type != "flat":
                                        # Compute accuracy against the flat run of the same query
                                        base_name = "%s_index_flat_nlist_%s_metric_type_%s_nprobe_%s_top_k_%s_nq_%s" % \
                                            (collection_name, nlist, metric_type, nprobe, top_k, nq)
                                        avg_acc = self.compute_accuracy(base_name, id_prefix)
                                        logger.info("Query: <%s> accuracy: %s", id_prefix, avg_acc)
            finally:
                # remove container, and run next definition
                logger.info("remove container, and run next definition")
                utils.remove_container(container)

    def _log_sift_recall(self, milvus, collection_name, true_ids_all, top_k, nq, nprobe, id_prefix):
        """Query ids, dump a per-query comparison to a debug file and log recall."""
        result_ids, _ = self.do_query_ids(milvus, collection_name, top_k, nq, nprobe)
        # Slice the ground truth once instead of once per result row.
        true_ids = true_ids_all[:nq, :top_k].tolist()
        debug_file_ids = "0.5.3_result_ids"
        # The debug file is overwritten on every call.
        with open(debug_file_ids, "w+") as fd:
            total = 0
            for index, item in enumerate(result_ids):
                true_item = true_ids[index]
                overlap = set(item).intersection(set(true_item))
                total = total + len(overlap)
                fd.write("query: N-%d, intersection: %d, total: %d\n" % (index, len(overlap), total))
                fd.write("%s\n" % str(item))
                fd.write("%s\n" % str(true_item))
        acc_value = self.get_recall_value(true_ids, result_ids)
        logger.info("Query: <%s> accuracy: %s", id_prefix, acc_value)

    def _run_stability(self, run_params, run_count):
        """Issue random queries (plus periodic inserts) for a fixed duration."""
        if not run_params:
            logger.debug("No run params")
            return
        for param in run_params:
            logger.info("Definition param: %s", param)
            collection_name = param["dataset"]
            index_type = param["index_type"]
            (_, _, _, dimension, _) = parser.collection_parser(collection_name)

            # "during_time" is multiplied by 60 (i.e. given in minutes);
            # without it the test runs for 100 seconds.
            if "during_time" in param:
                during_time = int(param["during_time"]) * 60
            else:
                during_time = 100

            container = self._start_server(param)
            try:
                milvus = MilvusClient(collection_name)
                # Check has collection or not
                if not milvus.exists_collection():
                    logger.warning("Table %s not existed, continue exec next params ...", collection_name)
                    continue

                start_time = time.time()
                insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(10000)]
                i = 0
                while time.time() < start_time + during_time:
                    i = i + 1
                    # NOTE(review): a new client is created on every iteration and
                    # never explicitly closed - confirm this is intended.
                    milvus_instance = MilvusClient(collection_name)
                    top_ks = random.sample(range(1, 100), 3)
                    nqs = random.sample(range(1, 1000), 3)
                    nprobe = random.choice(range(1, 500))
                    self.do_query(milvus, collection_name, top_ks, nqs, nprobe, run_count)
                    # Every 10th round also insert a batch and rebuild the index.
                    if i % 10 == 0:
                        status, _ = milvus_instance.insert(insert_vectors, ids=list(range(len(insert_vectors))))
                        if not status.OK():
                            logger.error(status)
                        milvus_instance.create_index(index_type, 16384)
                        logger.info(milvus.describe_index())
            finally:
                utils.remove_container(container)
366
|
|
|
|
367
|
|
|
|