Metrics for this method:
- Conditions: 63
- Total Lines: 344
- Code Lines: 252
- Duplicated Lines: 0
- Duplication Ratio: 0%
- Changes: 0
Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.
Commonly applied refactorings include Extract Method. If many parameters or temporary variables are present, Replace Method with Method Object can help, since it turns those temporaries into fields of a dedicated class.
Complex methods like milvus_benchmark.docker_runner.DockerRunner.run() often do a lot of different things. To break such a method down, we need to identify a cohesive component within it. A common approach to find such a component is to look for variables/statements that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import os |
||
22 | def run(self, definition, run_type=None): |
||
23 | if run_type == "performance": |
||
24 | for op_type, op_value in definition.items(): |
||
25 | # run docker mode |
||
26 | run_count = op_value["run_count"] |
||
27 | run_params = op_value["params"] |
||
28 | container = None |
||
29 | |||
30 | if op_type == "insert": |
||
31 | if not run_params: |
||
32 | logger.debug("No run params") |
||
33 | continue |
||
34 | for index, param in enumerate(run_params): |
||
35 | logger.info("Definition param: %s" % str(param)) |
||
36 | collection_name = param["collection_name"] |
||
37 | volume_name = param["db_path_prefix"] |
||
38 | print(collection_name) |
||
39 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
40 | for k, v in param.items(): |
||
41 | if k.startswith("server."): |
||
42 | # Update server config |
||
43 | utils.modify_config(k, v, type="server", db_slave=None) |
||
44 | container = utils.run_server(self.image, test_type="remote", volume_name=volume_name, db_slave=None) |
||
45 | time.sleep(2) |
||
46 | milvus = MilvusClient(collection_name) |
||
47 | # Check has collection or not |
||
48 | if milvus.exists_collection(): |
||
49 | milvus.delete() |
||
50 | time.sleep(10) |
||
51 | milvus.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
52 | # debug |
||
53 | # milvus.create_index("ivf_sq8", 16384) |
||
54 | res = self.do_insert(milvus, collection_name, data_type, dimension, collection_size, param["ni_per"]) |
||
55 | logger.info(res) |
||
56 | # wait for file merge |
||
57 | time.sleep(collection_size * dimension / 5000000) |
||
58 | # Clear up |
||
59 | utils.remove_container(container) |
||
60 | |||
61 | elif op_type == "query": |
||
62 | for index, param in enumerate(run_params): |
||
63 | logger.info("Definition param: %s" % str(param)) |
||
64 | collection_name = param["dataset"] |
||
65 | volume_name = param["db_path_prefix"] |
||
66 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
67 | for k, v in param.items(): |
||
68 | if k.startswith("server."): |
||
69 | utils.modify_config(k, v, type="server") |
||
70 | container = utils.run_server(self.image, test_type="remote", volume_name=volume_name, db_slave=None) |
||
71 | time.sleep(2) |
||
72 | milvus = MilvusClient(collection_name) |
||
73 | logger.debug(milvus.show_collections()) |
||
74 | # Check has collection or not |
||
75 | if not milvus.exists_collection(): |
||
76 | logger.warning("Table %s not existed, continue exec next params ..." % collection_name) |
||
77 | continue |
||
78 | # parse index info |
||
79 | index_types = param["index.index_types"] |
||
80 | nlists = param["index.nlists"] |
||
81 | # parse top-k, nq, nprobe |
||
82 | top_ks, nqs, nprobes = parser.search_params_parser(param) |
||
83 | for index_type in index_types: |
||
84 | for nlist in nlists: |
||
85 | result = milvus.describe_index() |
||
86 | logger.info(result) |
||
87 | # milvus.drop_index() |
||
88 | # milvus.create_index(index_type, nlist) |
||
89 | result = milvus.describe_index() |
||
90 | logger.info(result) |
||
91 | logger.info(milvus.count()) |
||
92 | # preload index |
||
93 | milvus.preload_collection() |
||
94 | logger.info("Start warm up query") |
||
95 | res = self.do_query(milvus, collection_name, [1], [1], 1, 1) |
||
96 | logger.info("End warm up query") |
||
97 | # Run query test |
||
98 | for nprobe in nprobes: |
||
99 | logger.info("index_type: %s, nlist: %s, metric_type: %s, nprobe: %s" % (index_type, nlist, metric_type, nprobe)) |
||
100 | res = self.do_query(milvus, collection_name, top_ks, nqs, nprobe, run_count) |
||
101 | headers = ["Nq/Top-k"] |
||
102 | headers.extend([str(top_k) for top_k in top_ks]) |
||
103 | utils.print_collection(headers, nqs, res) |
||
104 | utils.remove_container(container) |
||
105 | |||
106 | elif run_type == "insert_performance": |
||
107 | for op_type, op_value in definition.items(): |
||
108 | # run docker mode |
||
109 | run_count = op_value["run_count"] |
||
110 | run_params = op_value["params"] |
||
111 | container = None |
||
112 | if not run_params: |
||
113 | logger.debug("No run params") |
||
114 | continue |
||
115 | for index, param in enumerate(run_params): |
||
116 | logger.info("Definition param: %s" % str(param)) |
||
117 | collection_name = param["collection_name"] |
||
118 | volume_name = param["db_path_prefix"] |
||
119 | print(collection_name) |
||
120 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
121 | for k, v in param.items(): |
||
122 | if k.startswith("server."): |
||
123 | # Update server config |
||
124 | utils.modify_config(k, v, type="server", db_slave=None) |
||
125 | container = utils.run_server(self.image, test_type="remote", volume_name=volume_name, db_slave=None) |
||
126 | time.sleep(2) |
||
127 | milvus = MilvusClient(collection_name) |
||
128 | # Check has collection or not |
||
129 | if milvus.exists_collection(): |
||
130 | milvus.delete() |
||
131 | time.sleep(10) |
||
132 | milvus.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
133 | # debug |
||
134 | # milvus.create_index("ivf_sq8", 16384) |
||
135 | res = self.do_insert(milvus, collection_name, data_type, dimension, collection_size, param["ni_per"]) |
||
136 | logger.info(res) |
||
137 | # wait for file merge |
||
138 | time.sleep(collection_size * dimension / 5000000) |
||
139 | # Clear up |
||
140 | utils.remove_container(container) |
||
141 | |||
142 | elif run_type == "search_performance": |
||
143 | for op_type, op_value in definition.items(): |
||
144 | # run docker mode |
||
145 | run_count = op_value["run_count"] |
||
146 | run_params = op_value["params"] |
||
147 | container = None |
||
148 | for index, param in enumerate(run_params): |
||
149 | logger.info("Definition param: %s" % str(param)) |
||
150 | collection_name = param["dataset"] |
||
151 | volume_name = param["db_path_prefix"] |
||
152 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
153 | for k, v in param.items(): |
||
154 | if k.startswith("server."): |
||
155 | utils.modify_config(k, v, type="server") |
||
156 | container = utils.run_server(self.image, test_type="remote", volume_name=volume_name, db_slave=None) |
||
157 | time.sleep(2) |
||
158 | milvus = MilvusClient(collection_name) |
||
159 | logger.debug(milvus.show_collections()) |
||
160 | # Check has collection or not |
||
161 | if not milvus.exists_collection(): |
||
162 | logger.warning("Table %s not existed, continue exec next params ..." % collection_name) |
||
163 | continue |
||
164 | # parse index info |
||
165 | index_types = param["index.index_types"] |
||
166 | nlists = param["index.nlists"] |
||
167 | # parse top-k, nq, nprobe |
||
168 | top_ks, nqs, nprobes = parser.search_params_parser(param) |
||
169 | for index_type in index_types: |
||
170 | for nlist in nlists: |
||
171 | result = milvus.describe_index() |
||
172 | logger.info(result) |
||
173 | # milvus.drop_index() |
||
174 | # milvus.create_index(index_type, nlist) |
||
175 | result = milvus.describe_index() |
||
176 | logger.info(result) |
||
177 | logger.info(milvus.count()) |
||
178 | # preload index |
||
179 | milvus.preload_collection() |
||
180 | logger.info("Start warm up query") |
||
181 | res = self.do_query(milvus, collection_name, [1], [1], 1, 1) |
||
182 | logger.info("End warm up query") |
||
183 | # Run query test |
||
184 | for nprobe in nprobes: |
||
185 | logger.info("index_type: %s, nlist: %s, metric_type: %s, nprobe: %s" % (index_type, nlist, metric_type, nprobe)) |
||
186 | res = self.do_query(milvus, collection_name, top_ks, nqs, nprobe, run_count) |
||
187 | headers = ["Nq/Top-k"] |
||
188 | headers.extend([str(top_k) for top_k in top_ks]) |
||
189 | utils.print_collection(headers, nqs, res) |
||
190 | utils.remove_container(container) |
||
191 | |||
192 | elif run_type == "accuracy": |
||
193 | """ |
||
194 | { |
||
195 | "dataset": "random_50m_1024_512", |
||
196 | "index.index_types": ["flat", ivf_flat", "ivf_sq8"], |
||
197 | "index.nlists": [16384], |
||
198 | "nprobes": [1, 32, 128], |
||
199 | "nqs": [100], |
||
200 | "top_ks": [1, 64], |
||
201 | "server.use_blas_threshold": 1100, |
||
202 | "server.cpu_cache_capacity": 256 |
||
203 | } |
||
204 | """ |
||
205 | for op_type, op_value in definition.items(): |
||
206 | if op_type != "query": |
||
207 | logger.warning("invalid operation: %s in accuracy test, only support query operation" % op_type) |
||
208 | break |
||
209 | run_count = op_value["run_count"] |
||
210 | run_params = op_value["params"] |
||
211 | container = None |
||
212 | |||
213 | for index, param in enumerate(run_params): |
||
214 | logger.info("Definition param: %s" % str(param)) |
||
215 | collection_name = param["dataset"] |
||
216 | sift_acc = False |
||
217 | if "sift_acc" in param: |
||
218 | sift_acc = param["sift_acc"] |
||
219 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
220 | for k, v in param.items(): |
||
221 | if k.startswith("server."): |
||
222 | utils.modify_config(k, v, type="server") |
||
223 | volume_name = param["db_path_prefix"] |
||
224 | container = utils.run_server(self.image, test_type="remote", volume_name=volume_name, db_slave=None) |
||
225 | time.sleep(2) |
||
226 | milvus = MilvusClient(collection_name) |
||
227 | # Check has collection or not |
||
228 | if not milvus.exists_collection(): |
||
229 | logger.warning("Table %s not existed, continue exec next params ..." % collection_name) |
||
230 | continue |
||
231 | |||
232 | # parse index info |
||
233 | index_types = param["index.index_types"] |
||
234 | nlists = param["index.nlists"] |
||
235 | # parse top-k, nq, nprobe |
||
236 | top_ks, nqs, nprobes = parser.search_params_parser(param) |
||
237 | if sift_acc is True: |
||
238 | # preload groundtruth data |
||
239 | true_ids_all = self.get_groundtruth_ids(collection_size) |
||
240 | acc_dict = {} |
||
241 | for index_type in index_types: |
||
242 | for nlist in nlists: |
||
243 | result = milvus.describe_index() |
||
244 | logger.info(result) |
||
245 | milvus.create_index(index_type, nlist) |
||
246 | # preload index |
||
247 | milvus.preload_collection() |
||
248 | # Run query test |
||
249 | for nprobe in nprobes: |
||
250 | logger.info("index_type: %s, nlist: %s, metric_type: %s, nprobe: %s" % (index_type, nlist, metric_type, nprobe)) |
||
251 | for top_k in top_ks: |
||
252 | for nq in nqs: |
||
253 | result_ids = [] |
||
254 | id_prefix = "%s_index_%s_nlist_%s_metric_type_%s_nprobe_%s_top_k_%s_nq_%s" % \ |
||
255 | (collection_name, index_type, nlist, metric_type, nprobe, top_k, nq) |
||
256 | if sift_acc is False: |
||
257 | self.do_query_acc(milvus, collection_name, top_k, nq, nprobe, id_prefix) |
||
258 | if index_type != "flat": |
||
259 | # Compute accuracy |
||
260 | base_name = "%s_index_flat_nlist_%s_metric_type_%s_nprobe_%s_top_k_%s_nq_%s" % \ |
||
261 | (collection_name, nlist, metric_type, nprobe, top_k, nq) |
||
262 | avg_acc = self.compute_accuracy(base_name, id_prefix) |
||
263 | logger.info("Query: <%s> accuracy: %s" % (id_prefix, avg_acc)) |
||
264 | else: |
||
265 | result_ids, result_distances = self.do_query_ids(milvus, collection_name, top_k, nq, nprobe) |
||
266 | debug_file_ids = "0.5.3_result_ids" |
||
267 | debug_file_distances = "0.5.3_result_distances" |
||
268 | with open(debug_file_ids, "w+") as fd: |
||
269 | total = 0 |
||
270 | for index, item in enumerate(result_ids): |
||
271 | true_item = true_ids_all[:nq, :top_k].tolist()[index] |
||
|
|||
272 | tmp = set(item).intersection(set(true_item)) |
||
273 | total = total + len(tmp) |
||
274 | fd.write("query: N-%d, intersection: %d, total: %d\n" % (index, len(tmp), total)) |
||
275 | fd.write("%s\n" % str(item)) |
||
276 | fd.write("%s\n" % str(true_item)) |
||
277 | acc_value = self.get_recall_value(true_ids_all[:nq, :top_k].tolist(), result_ids) |
||
278 | logger.info("Query: <%s> accuracy: %s" % (id_prefix, acc_value)) |
||
279 | # # print accuracy collection |
||
280 | # headers = [collection_name] |
||
281 | # headers.extend([str(top_k) for top_k in top_ks]) |
||
282 | # utils.print_collection(headers, nqs, res) |
||
283 | |||
284 | # remove container, and run next definition |
||
285 | logger.info("remove container, and run next definition") |
||
286 | utils.remove_container(container) |
||
287 | |||
288 | elif run_type == "stability": |
||
289 | for op_type, op_value in definition.items(): |
||
290 | if op_type != "query": |
||
291 | logger.warning("invalid operation: %s in accuracy test, only support query operation" % op_type) |
||
292 | break |
||
293 | run_count = op_value["run_count"] |
||
294 | run_params = op_value["params"] |
||
295 | container = None |
||
296 | for index, param in enumerate(run_params): |
||
297 | logger.info("Definition param: %s" % str(param)) |
||
298 | collection_name = param["dataset"] |
||
299 | index_type = param["index_type"] |
||
300 | volume_name = param["db_path_prefix"] |
||
301 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
302 | |||
303 | # set default test time |
||
304 | if "during_time" not in param: |
||
305 | during_time = 100 # seconds |
||
306 | else: |
||
307 | during_time = int(param["during_time"]) * 60 |
||
308 | # set default query process num |
||
309 | if "query_process_num" not in param: |
||
310 | query_process_num = 10 |
||
311 | else: |
||
312 | query_process_num = int(param["query_process_num"]) |
||
313 | |||
314 | for k, v in param.items(): |
||
315 | if k.startswith("server."): |
||
316 | utils.modify_config(k, v, type="server") |
||
317 | |||
318 | container = utils.run_server(self.image, test_type="remote", volume_name=volume_name, db_slave=None) |
||
319 | time.sleep(2) |
||
320 | milvus = MilvusClient(collection_name) |
||
321 | # Check has collection or not |
||
322 | if not milvus.exists_collection(): |
||
323 | logger.warning("Table %s not existed, continue exec next params ..." % collection_name) |
||
324 | continue |
||
325 | |||
326 | start_time = time.time() |
||
327 | insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(10000)] |
||
328 | i = 0 |
||
329 | while time.time() < start_time + during_time: |
||
330 | i = i + 1 |
||
331 | processes = [] |
||
332 | # do query |
||
333 | # for i in range(query_process_num): |
||
334 | # milvus_instance = MilvusClient(collection_name) |
||
335 | # top_k = random.choice([x for x in range(1, 100)]) |
||
336 | # nq = random.choice([x for x in range(1, 100)]) |
||
337 | # nprobe = random.choice([x for x in range(1, 1000)]) |
||
338 | # # logger.info("index_type: %s, nlist: %s, metric_type: %s, nprobe: %s" % (index_type, nlist, metric_type, nprobe)) |
||
339 | # p = Process(target=self.do_query, args=(milvus_instance, collection_name, [top_k], [nq], [nprobe], run_count, )) |
||
340 | # processes.append(p) |
||
341 | # p.start() |
||
342 | # time.sleep(0.1) |
||
343 | # for p in processes: |
||
344 | # p.join() |
||
345 | milvus_instance = MilvusClient(collection_name) |
||
346 | top_ks = random.sample([x for x in range(1, 100)], 3) |
||
347 | nqs = random.sample([x for x in range(1, 1000)], 3) |
||
348 | nprobe = random.choice([x for x in range(1, 500)]) |
||
349 | res = self.do_query(milvus, collection_name, top_ks, nqs, nprobe, run_count) |
||
350 | if i % 10 == 0: |
||
351 | status, res = milvus_instance.insert(insert_vectors, ids=[x for x in range(len(insert_vectors))]) |
||
352 | if not status.OK(): |
||
353 | logger.error(status) |
||
354 | # status = milvus_instance.drop_index() |
||
355 | # if not status.OK(): |
||
356 | # logger.error(status) |
||
357 | # index_type = random.choice(["flat", "ivf_flat", "ivf_sq8"]) |
||
358 | milvus_instance.create_index(index_type, 16384) |
||
359 | result = milvus.describe_index() |
||
360 | logger.info(result) |
||
361 | # milvus_instance.create_index("ivf_sq8", 16384) |
||
362 | utils.remove_container(container) |
||
363 | |||
364 | else: |
||
365 | logger.warning("Run type: %s not supported" % run_type) |
||
366 | |||
367 |