| Metric | Value |
|---|---|
| Conditions | 110 |
| Total Lines | 1009 |
| Code Lines | 803 |
| Lines | 18 |
| Ratio | 1.78 % |
| Changes | 0 |
Small methods make your code easier to understand, especially when combined with a good name. And when a method is small, finding a good name is usually much easier.
For example, if you find yourself adding comments to a method's body, that is usually a sign that the commented part should be extracted into a new method, with the comment serving as a starting point for the new method's name.
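As a concrete illustration from the listing below: the comment at lines 115-116 ("remove some parts of result before uploading results") already names a cohesive step, so it could become a helper on K8sRunner. This is only a sketch; the helper name is invented and the GPU cache branch is simplified:

```python
def _strip_env_fields_before_upload(self, milvus_instance):
    """Drop server-config fields that should not be uploaded with the results."""
    # Mirrors the commented block near the top of run(); simplified sketch only.
    self.env_value.pop("logs")
    if milvus_instance.get_server_mode() == "CPU" and "gpu" in self.env_value:
        self.env_value.pop("gpu")
    self.env_value.pop("network")
```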
Commonly applied refactorings include Extract Method. If many parameters or temporary variables are present, Introduce Parameter Object or Replace Temp with Query can also help (see the sketch below).
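A minimal sketch of a parameter object for the values that travel together through run() below (dimension, metric type, index file size, dataset name); the class name and helper are hypothetical:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class CollectionInfo:
    """Hypothetical parameter object for the ad-hoc collection_info dicts in run()."""
    dimension: int
    metric_type: str
    dataset_name: str
    index_file_size: Optional[int] = None

    def as_dict(self):
        # Same shape as the dicts handed to report_wrapper() in the listing below.
        info = {
            "dimension": self.dimension,
            "metric_type": self.metric_type,
            "dataset_name": self.dataset_name,
        }
        if self.index_file_size is not None:
            info["index_file_size"] = self.index_file_size
        return info
```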
Complex methods like milvus_benchmark.k8s_runner.K8sRunner.run() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
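In run() below, each run_type branch (insert_performance, search_performance, build_performance, and so on) is already a candidate component. A minimal sketch, assuming the existing base class and module-level logger, of how the long if/elif chain could be split into one handler per run type; the _run_* names are invented for illustration:

```python
def run(self, run_type, collection):
    # Dispatch table replacing the long if/elif chain; each handler would hold
    # the body of the corresponding branch from the listing below.
    handlers = {
        "insert_performance": self._run_insert_performance,
        "search_performance": self._run_search_performance,
        "build_performance": self._run_build_performance,
        "ann_accuracy": self._run_ann_accuracy,
        # ... one entry per remaining run_type
    }
    handler = handlers.get(run_type)
    if handler is None:
        logger.warning("Run type: %s not defined" % run_type)
        return
    handler(collection)
```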
1 | import os |
||
105 | def run(self, run_type, collection): |
||
106 | """ |
||
107 | override runner.run |
||
108 | """ |
||
109 | logger.debug(run_type) |
||
110 | logger.debug(collection) |
||
111 | collection_name = collection["collection_name"] if "collection_name" in collection else None |
||
112 | milvus_instance = MilvusClient(collection_name=collection_name, host=self.host) |
||
113 | self.env_value = milvus_instance.get_server_config() |
||
114 | |||
115 | # ugly implemention |
||
116 | # remove some parts of result before uploading results |
||
117 | self.env_value.pop("logs") |
||
118 | if milvus_instance.get_server_mode() == "CPU": |
||
119 | if "gpu" in self.env_value: |
||
120 | self.env_value.pop("gpu") |
||
121 | elif "cache.enable" in self.env_value["gpu"]: |
||
122 | self.env_value["gpu"].pop("cache.enable") |
||
123 | |||
124 | self.env_value.pop("network") |
||
125 | |||
126 | if run_type == "insert_performance": |
||
127 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
128 | ni_per = collection["ni_per"] |
||
129 | build_index = collection["build_index"] |
||
130 | if milvus_instance.exists_collection(): |
||
131 | milvus_instance.drop() |
||
132 | time.sleep(10) |
||
133 | index_info = {} |
||
134 | search_params = {} |
||
135 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
136 | if build_index is True: |
||
137 | index_type = collection["index_type"] |
||
138 | index_param = collection["index_param"] |
||
139 | index_info = { |
||
140 | "index_type": index_type, |
||
141 | "index_param": index_param |
||
142 | } |
||
143 | milvus_instance.create_index(index_type, index_param) |
||
144 | logger.debug(milvus_instance.describe_index()) |
||
145 | res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per) |
||
146 | logger.info(res) |
||
147 | if "flush" in collection and collection["flush"] == "no": |
||
148 | logger.debug("No manual flush") |
||
149 | else: |
||
150 | milvus_instance.flush() |
||
151 | logger.debug(milvus_instance.count()) |
||
152 | collection_info = { |
||
153 | "dimension": dimension, |
||
154 | "metric_type": metric_type, |
||
155 | "dataset_name": collection_name |
||
156 | } |
||
157 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
158 | metric.metrics = { |
||
159 | "type": run_type, |
||
160 | "value": { |
||
161 | "total_time": res["total_time"], |
||
162 | "qps": res["qps"], |
||
163 | "ni_time": res["ni_time"] |
||
164 | } |
||
165 | } |
||
166 | report(metric) |
||
167 | if build_index is True: |
||
168 | logger.debug("Start build index for last file") |
||
169 | milvus_instance.create_index(index_type, index_param) |
||
170 | logger.debug(milvus_instance.describe_index()) |
||
171 | |||
172 | elif run_type == "insert_debug_performance": |
||
173 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
174 | ni_per = collection["ni_per"] |
||
175 | if milvus_instance.exists_collection(): |
||
176 | milvus_instance.drop() |
||
177 | time.sleep(10) |
||
178 | index_info = {} |
||
179 | search_params = {} |
||
180 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
181 | insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(ni_per)] |
||
182 | start_time = time.time() |
||
183 | i = 0 |
||
184 | while time.time() < start_time + 2 * 24 * 3600: |
||
185 | i = i + 1 |
||
186 | logger.debug(i) |
||
187 | logger.debug("Row count: %d" % milvus_instance.count()) |
||
188 | milvus_instance.insert(insert_vectors) |
||
189 | time.sleep(0.1) |
||
190 | |||
191 | elif run_type == "insert_performance_multi_collections": |
||
192 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
193 | ni_per = collection["ni_per"] |
||
194 | build_index = collection["build_index"] |
||
195 | if milvus_instance.exists_collection(): |
||
196 | milvus_instance.drop() |
||
197 | time.sleep(10) |
||
198 | index_info = {} |
||
199 | search_params = {} |
||
200 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
201 | if build_index is True: |
||
202 | index_type = collection["index_type"] |
||
203 | index_param = collection["index_param"] |
||
204 | index_info = { |
||
205 | "index_type": index_type, |
||
206 | "index_param": index_param |
||
207 | } |
||
208 | milvus_instance.create_index(index_type, index_param) |
||
209 | logger.debug(milvus_instance.describe_index()) |
||
210 | res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per) |
||
211 | logger.info(res) |
||
212 | milvus_instance.flush() |
||
213 | collection_info = { |
||
214 | "dimension": dimension, |
||
215 | "metric_type": metric_type, |
||
216 | "dataset_name": collection_name |
||
217 | } |
||
218 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
219 | metric.metrics = { |
||
220 | "type": run_type, |
||
221 | "value": { |
||
222 | "total_time": res["total_time"], |
||
223 | "qps": res["qps"], |
||
224 | "ni_time": res["ni_time"] |
||
225 | } |
||
226 | } |
||
227 | report(metric) |
||
228 | if build_index is True: |
||
229 | logger.debug("Start build index for last file") |
||
230 | milvus_instance.create_index(index_type, index_param) |
||
231 | logger.debug(milvus_instance.describe_index()) |
||
232 | |||
233 | elif run_type == "insert_flush_performance": |
||
234 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
235 | ni_per = collection["ni_per"] |
||
236 | if milvus_instance.exists_collection(): |
||
237 | milvus_instance.drop() |
||
238 | time.sleep(10) |
||
239 | index_info = {} |
||
240 | search_params = {} |
||
241 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
242 | res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per) |
||
243 | logger.info(res) |
||
244 | logger.debug(milvus_instance.count()) |
||
245 | start_time = time.time() |
||
246 | milvus_instance.flush() |
||
247 | end_time = time.time() |
||
248 | logger.debug(milvus_instance.count()) |
||
249 | collection_info = { |
||
250 | "dimension": dimension, |
||
251 | "metric_type": metric_type, |
||
252 | "dataset_name": collection_name |
||
253 | } |
||
254 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
255 | metric.metrics = { |
||
256 | "type": run_type, |
||
257 | "value": { |
||
258 | "flush_time": round(end_time - start_time, 1) |
||
259 | } |
||
260 | } |
||
261 | report(metric) |
||
262 | |||
263 | elif run_type == "build_performance": |
||
264 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
265 | index_type = collection["index_type"] |
||
266 | index_param = collection["index_param"] |
||
267 | collection_info = { |
||
268 | "dimension": dimension, |
||
269 | "metric_type": metric_type, |
||
270 | "index_file_size": index_file_size, |
||
271 | "dataset_name": collection_name |
||
272 | } |
||
273 | index_info = { |
||
274 | "index_type": index_type, |
||
275 | "index_param": index_param |
||
276 | } |
||
277 | if not milvus_instance.exists_collection(): |
||
278 | logger.error("Table name: %s not existed" % collection_name) |
||
279 | return |
||
280 | search_params = {} |
||
281 | start_time = time.time() |
||
282 | # drop index |
||
283 | logger.debug("Drop index") |
||
284 | milvus_instance.drop_index() |
||
285 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
286 | milvus_instance.create_index(index_type, index_param) |
||
287 | logger.debug(milvus_instance.describe_index()) |
||
288 | logger.debug(milvus_instance.count()) |
||
289 | end_time = time.time() |
||
290 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
291 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
292 | metric.metrics = { |
||
293 | "type": "build_performance", |
||
294 | "value": { |
||
295 | "build_time": round(end_time - start_time, 1), |
||
296 | "start_mem_usage": start_mem_usage, |
||
297 | "end_mem_usage": end_mem_usage, |
||
298 | "diff_mem": end_mem_usage - start_mem_usage |
||
299 | } |
||
300 | } |
||
301 | report(metric) |
||
302 | |||
303 | elif run_type == "delete_performance": |
||
304 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
305 | ni_per = collection["ni_per"] |
||
306 | search_params = {} |
||
307 | collection_info = { |
||
308 | "dimension": dimension, |
||
309 | "metric_type": metric_type, |
||
310 | "dataset_name": collection_name |
||
311 | } |
||
312 | if not milvus_instance.exists_collection(): |
||
313 | logger.error("Table name: %s not existed" % collection_name) |
||
314 | return |
||
315 | length = milvus_instance.count() |
||
316 | logger.info(length) |
||
317 | index_info = milvus_instance.describe_index() |
||
318 | logger.info(index_info) |
||
319 | ids = [i for i in range(length)] |
||
320 | loops = int(length / ni_per) |
||
321 | milvus_instance.preload_collection() |
||
322 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
323 | start_time = time.time() |
||
324 | for i in range(loops): |
||
325 | delete_ids = ids[i*ni_per : i*ni_per+ni_per] |
||
326 | logger.debug("Delete %d - %d" % (delete_ids[0], delete_ids[-1])) |
||
327 | milvus_instance.delete(delete_ids) |
||
328 | # milvus_instance.flush() |
||
329 | logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
330 | logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
331 | milvus_instance.flush() |
||
332 | end_time = time.time() |
||
333 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
334 | logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
335 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
336 | metric.metrics = { |
||
337 | "type": "delete_performance", |
||
338 | "value": { |
||
339 | "delete_time": round(end_time - start_time, 1), |
||
340 | "start_mem_usage": start_mem_usage, |
||
341 | "end_mem_usage": end_mem_usage, |
||
342 | "diff_mem": end_mem_usage - start_mem_usage |
||
343 | } |
||
344 | } |
||
345 | report(metric) |
||
346 | |||
347 | elif run_type == "get_ids_performance": |
||
348 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
349 | ids_length_per_segment = collection["ids_length_per_segment"] |
||
350 | if not milvus_instance.exists_collection(): |
||
351 | logger.error("Table name: %s not existed" % collection_name) |
||
352 | return |
||
353 | collection_info = { |
||
354 | "dimension": dimension, |
||
355 | "metric_type": metric_type, |
||
356 | "index_file_size": index_file_size, |
||
357 | "dataset_name": collection_name |
||
358 | } |
||
359 | search_params = {} |
||
360 | logger.info(milvus_instance.count()) |
||
361 | index_info = milvus_instance.describe_index() |
||
362 | logger.info(index_info) |
||
363 | for ids_num in ids_length_per_segment: |
||
364 | segment_num, get_ids = milvus_instance.get_rand_ids_each_segment(ids_num) |
||
365 | start_time = time.time() |
||
366 | _ = milvus_instance.get_entities(get_ids) |
||
367 | total_time = time.time() - start_time |
||
368 | avg_time = total_time / segment_num |
||
369 | run_params = {"ids_num": ids_num} |
||
370 | logger.info("Segment num: %d, ids num per segment: %d, run_time: %f" % (segment_num, ids_num, total_time)) |
||
371 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params, run_params=run_params) |
||
372 | metric.metrics = { |
||
373 | "type": run_type, |
||
374 | "value": { |
||
375 | "total_time": round(total_time, 1), |
||
376 | "avg_time": round(avg_time, 1) |
||
377 | } |
||
378 | } |
||
379 | report(metric) |
||
380 | |||
381 | elif run_type == "search_performance": |
||
382 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
383 | run_count = collection["run_count"] |
||
384 | top_ks = collection["top_ks"] |
||
385 | nqs = collection["nqs"] |
||
386 | search_params = collection["search_params"] |
||
387 | collection_info = { |
||
388 | "dimension": dimension, |
||
389 | "metric_type": metric_type, |
||
390 | "index_file_size": index_file_size, |
||
391 | "dataset_name": collection_name |
||
392 | } |
||
393 | if not milvus_instance.exists_collection(): |
||
394 | logger.error("Table name: %s not existed" % collection_name) |
||
395 | return |
||
396 | |||
397 | logger.info(milvus_instance.count()) |
||
398 | index_info = milvus_instance.describe_index() |
||
399 | logger.info(index_info) |
||
400 | milvus_instance.preload_collection() |
||
401 | logger.info("Start warm up query") |
||
402 | res = self.do_query(milvus_instance, collection_name, [1], [1], 2, search_param=search_params[0]) |
||
403 | logger.info("End warm up query") |
||
404 | for search_param in search_params: |
||
405 | logger.info("Search param: %s" % json.dumps(search_param)) |
||
406 | res = self.do_query(milvus_instance, collection_name, top_ks, nqs, run_count, search_param) |
||
407 | headers = ["Nq/Top-k"] |
||
408 | headers.extend([str(top_k) for top_k in top_ks]) |
||
409 | logger.info("Search param: %s" % json.dumps(search_param)) |
||
410 | utils.print_table(headers, nqs, res) |
||
411 | for index_nq, nq in enumerate(nqs): |
||
412 | for index_top_k, top_k in enumerate(top_ks): |
||
413 | search_param_group = { |
||
414 | "nq": nq, |
||
415 | "topk": top_k, |
||
416 | "search_param": search_param |
||
417 | } |
||
418 | search_time = res[index_nq][index_top_k] |
||
419 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_param_group) |
||
420 | metric.metrics = { |
||
421 | "type": "search_performance", |
||
422 | "value": { |
||
423 | "search_time": search_time |
||
424 | } |
||
425 | } |
||
426 | report(metric) |
||
427 | |||
428 | elif run_type == "locust_search_performance": |
||
429 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser( |
||
430 | collection_name) |
||
431 | ### clear db |
||
432 | ### spawn locust requests |
||
433 | collection_num = collection["collection_num"] |
||
434 | task = collection["task"] |
||
435 | # . generate task code |
||
436 | task_file = utils.get_unique_name() |
||
437 | task_file_script = task_file + '.py' |
||
438 | task_file_csv = task_file + '_stats.csv' |
||
439 | task_type = task["type"] |
||
440 | connection_type = "single" |
||
441 | connection_num = task["connection_num"] |
||
442 | if connection_num > 1: |
||
443 | connection_type = "multi" |
||
444 | clients_num = task["clients_num"] |
||
445 | hatch_rate = task["hatch_rate"] |
||
446 | during_time = task["during_time"] |
||
447 | def_name = task_type |
||
448 | task_params = task["params"] |
||
449 | collection_names = [] |
||
450 | for i in range(collection_num): |
||
451 | suffix = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(5)) |
||
452 | collection_names.append(collection_name + "_" + suffix) |
||
453 | # ##### |
||
454 | ni_per = collection["ni_per"] |
||
455 | build_index = collection["build_index"] |
||
456 | # TODO: debug |
||
457 | for c_name in collection_names: |
||
458 | milvus_instance = MilvusClient(collection_name=c_name, host=self.host, port=self.port) |
||
459 | if milvus_instance.exists_collection(collection_name=c_name): |
||
460 | milvus_instance.drop(name=c_name) |
||
461 | time.sleep(10) |
||
462 | milvus_instance.create_collection(c_name, dimension, index_file_size, metric_type) |
||
463 | index_info = { |
||
464 | "build_index": build_index |
||
465 | } |
||
466 | if build_index is True: |
||
467 | index_type = collection["index_type"] |
||
468 | index_param = collection["index_param"] |
||
469 | index_info.update({ |
||
470 | "index_type": index_type, |
||
471 | "index_param": index_param |
||
472 | }) |
||
473 | milvus_instance.create_index(index_type, index_param) |
||
474 | logger.debug(milvus_instance.describe_index()) |
||
475 | res = self.do_insert(milvus_instance, c_name, data_type, dimension, collection_size, ni_per) |
||
476 | logger.info(res) |
||
477 | if "flush" in collection and collection["flush"] == "no": |
||
478 | logger.debug("No manual flush") |
||
479 | else: |
||
480 | milvus_instance.flush() |
||
481 | logger.debug("Table row counts: %d" % milvus_instance.count(name=c_name)) |
||
482 | if build_index is True: |
||
483 | logger.debug("Start build index for last file") |
||
484 | milvus_instance.create_index(index_type, index_param) |
||
485 | logger.debug(milvus_instance.describe_index()) |
||
486 | code_str = """ |
||
487 | import random |
||
488 | import string |
||
489 | from locust import User, task, between |
||
490 | from locust_task import MilvusTask |
||
491 | from client import MilvusClient |
||
492 | |||
493 | host = '%s' |
||
494 | port = %s |
||
495 | dim = %s |
||
496 | connection_type = '%s' |
||
497 | collection_names = %s |
||
498 | m = MilvusClient(host=host, port=port) |
||
499 | |||
500 | |||
501 | def get_collection_name(): |
||
502 | return random.choice(collection_names) |
||
503 | |||
504 | |||
505 | def get_client(collection_name): |
||
506 | if connection_type == 'single': |
||
507 | return MilvusTask(m=m) |
||
508 | elif connection_type == 'multi': |
||
509 | return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name) |
||
510 | |||
511 | |||
512 | class QueryTask(User): |
||
513 | wait_time = between(0.001, 0.002) |
||
514 | |||
515 | @task() |
||
516 | def %s(self): |
||
517 | top_k = %s |
||
518 | X = [[random.random() for i in range(dim)] for i in range(%s)] |
||
519 | search_param = %s |
||
520 | collection_name = get_collection_name() |
||
521 | client = get_client(collection_name) |
||
522 | client.query(X, top_k, search_param, collection_name=collection_name) |
||
523 | """ % (self.host, self.port, dimension, connection_type, collection_names, def_name, task_params["top_k"], task_params["nq"], task_params["search_param"]) |
||
524 | with open(task_file_script, 'w+') as fd: |
||
525 | fd.write(code_str) |
||
526 | locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % ( |
||
527 | task_file_script, |
||
528 | task_file, |
||
529 | clients_num, |
||
530 | hatch_rate, |
||
531 | during_time) |
||
532 | logger.info(locust_cmd) |
||
533 | try: |
||
534 | res = os.system(locust_cmd) |
||
535 | except Exception as e: |
||
536 | logger.error(str(e)) |
||
537 | return |
||
538 | |||
539 | # . retrieve and collect test statistics |
||
540 | locust_stats = None |
||
541 | with open(task_file_csv, newline='') as fd: |
||
542 | dr = csv.DictReader(fd) |
||
543 | for row in dr: |
||
544 | if row["Name"] != "Aggregated": |
||
545 | continue |
||
546 | locust_stats = row |
||
547 | logger.info(locust_stats) |
||
548 | # clean up temp files |
||
549 | search_params = { |
||
550 | "top_k": task_params["top_k"], |
||
551 | "nq": task_params["nq"], |
||
552 | "nprobe": task_params["search_param"]["nprobe"] |
||
553 | } |
||
554 | run_params = { |
||
555 | "connection_num": connection_num, |
||
556 | "clients_num": clients_num, |
||
557 | "hatch_rate": hatch_rate, |
||
558 | "during_time": during_time |
||
559 | } |
||
560 | collection_info = { |
||
561 | "dimension": dimension, |
||
562 | "metric_type": metric_type, |
||
563 | "index_file_size": index_file_size, |
||
564 | "dataset_name": collection_name |
||
565 | } |
||
566 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params, run_params) |
||
567 | metric.metrics = { |
||
568 | "type": run_type, |
||
569 | "value": { |
||
570 | "during_time": during_time, |
||
571 | "request_count": int(locust_stats["Request Count"]), |
||
572 | "failure_count": int(locust_stats["Failure Count"]), |
||
573 | "qps": locust_stats["Requests/s"], |
||
574 | "min_response_time": int(locust_stats["Min Response Time"]), |
||
575 | "max_response_time": int(locust_stats["Max Response Time"]), |
||
576 | "median_response_time": int(locust_stats["Median Response Time"]), |
||
577 | "avg_response_time": int(locust_stats["Average Response Time"]) |
||
578 | } |
||
579 | } |
||
580 | report(metric) |
||
581 | |||
582 | elif run_type == "search_ids_stability": |
||
583 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
584 | search_params = collection["search_params"] |
||
585 | during_time = collection["during_time"] |
||
586 | ids_length = collection["ids_length"] |
||
587 | ids = collection["ids"] |
||
588 | collection_info = { |
||
589 | "dimension": dimension, |
||
590 | "metric_type": metric_type, |
||
591 | "index_file_size": index_file_size, |
||
592 | "dataset_name": collection_name |
||
593 | } |
||
594 | if not milvus_instance.exists_collection(): |
||
595 | logger.error("Table name: %s not existed" % collection_name) |
||
596 | return |
||
597 | logger.info(milvus_instance.count()) |
||
598 | index_info = milvus_instance.describe_index() |
||
599 | logger.info(index_info) |
||
600 | g_top_k = int(collection["top_ks"].split("-")[1]) |
||
601 | l_top_k = int(collection["top_ks"].split("-")[0]) |
||
602 | # g_id = int(ids.split("-")[1]) |
||
603 | # l_id = int(ids.split("-")[0]) |
||
604 | g_id_length = int(ids_length.split("-")[1]) |
||
605 | l_id_length = int(ids_length.split("-")[0]) |
||
606 | |||
607 | milvus_instance.preload_collection() |
||
608 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
609 | logger.debug(start_mem_usage) |
||
610 | start_time = time.time() |
||
611 | while time.time() < start_time + during_time * 60: |
||
612 | search_param = {} |
||
613 | top_k = random.randint(l_top_k, g_top_k) |
||
614 | ids_num = random.randint(l_id_length, g_id_length) |
||
615 | ids_param = [random.randint(l_id_length, g_id_length) for _ in range(ids_num)] |
||
616 | for k, v in search_params.items(): |
||
617 | search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1])) |
||
618 | logger.debug("Query top-k: %d, ids_num: %d, param: %s" % (top_k, ids_num, json.dumps(search_param))) |
||
619 | result = milvus_instance.query_ids(top_k, ids_param, search_param=search_param) |
||
620 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
621 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, {}) |
||
622 | metric.metrics = { |
||
623 | "type": "search_ids_stability", |
||
624 | "value": { |
||
625 | "during_time": during_time, |
||
626 | "start_mem_usage": start_mem_usage, |
||
627 | "end_mem_usage": end_mem_usage, |
||
628 | "diff_mem": end_mem_usage - start_mem_usage |
||
629 | } |
||
630 | } |
||
631 | report(metric) |
||
632 | |||
633 | # for sift/deep datasets |
||
634 | # TODO: enable |
||
635 | elif run_type == "accuracy": |
||
636 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
637 | search_params = collection["search_params"] |
||
638 | # mapping to search param list |
||
639 | search_params = self.generate_combinations(search_params) |
||
640 | |||
641 | top_ks = collection["top_ks"] |
||
642 | nqs = collection["nqs"] |
||
643 | collection_info = { |
||
644 | "dimension": dimension, |
||
645 | "metric_type": metric_type, |
||
646 | "index_file_size": index_file_size, |
||
647 | "dataset_name": collection_name |
||
648 | } |
||
649 | if not milvus_instance.exists_collection(): |
||
650 | logger.error("Table name: %s not existed" % collection_name) |
||
651 | return |
||
652 | logger.info(milvus_instance.count()) |
||
653 | index_info = milvus_instance.describe_index() |
||
654 | logger.info(index_info) |
||
655 | milvus_instance.preload_collection() |
||
656 | true_ids_all = self.get_groundtruth_ids(collection_size) |
||
657 | for search_param in search_params: |
||
658 | for top_k in top_ks: |
||
659 | for nq in nqs: |
||
660 | # total = 0 |
||
661 | search_param_group = { |
||
662 | "nq": nq, |
||
663 | "topk": top_k, |
||
664 | "search_param": search_param |
||
665 | } |
||
666 | logger.info("Query params: %s" % json.dumps(search_param_group)) |
||
667 | result_ids, _ = self.do_query_ids(milvus_instance, collection_name, top_k, nq, search_param=search_param) |
||
668 | acc_value = self.get_recall_value(true_ids_all[:nq, :top_k].tolist(), result_ids) |
||
669 | logger.info("Query accuracy: %s" % acc_value) |
||
670 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_param_group) |
||
671 | metric.metrics = { |
||
672 | "type": "accuracy", |
||
673 | "value": { |
||
674 | "acc": acc_value |
||
675 | } |
||
676 | } |
||
677 | report(metric) |
||
678 | |||
679 | elif run_type == "ann_accuracy": |
||
680 | hdf5_source_file = collection["source_file"] |
||
681 | collection_name = collection["collection_name"] |
||
682 | index_file_sizes = collection["index_file_sizes"] |
||
683 | index_types = collection["index_types"] |
||
684 | index_params = collection["index_params"] |
||
685 | top_ks = collection["top_ks"] |
||
686 | nqs = collection["nqs"] |
||
687 | search_params = collection["search_params"] |
||
688 | # mapping to search param list |
||
689 | search_params = self.generate_combinations(search_params) |
||
690 | # mapping to index param list |
||
691 | index_params = self.generate_combinations(index_params) |
||
692 | |||
693 | data_type, dimension, metric_type = parser.parse_ann_collection_name(collection_name) |
||
694 | dataset = utils.get_dataset(hdf5_source_file) |
||
695 | true_ids = np.array(dataset["neighbors"]) |
||
696 | for index_file_size in index_file_sizes: |
||
697 | collection_info = { |
||
698 | "dimension": dimension, |
||
699 | "metric_type": metric_type, |
||
700 | "index_file_size": index_file_size, |
||
701 | "dataset_name": collection_name |
||
702 | } |
||
703 | if milvus_instance.exists_collection(collection_name): |
||
704 | logger.info("Re-create collection: %s" % collection_name) |
||
705 | milvus_instance.drop() |
||
706 | time.sleep(DELETE_INTERVAL_TIME) |
||
707 | |||
708 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
709 | logger.info(milvus_instance.describe()) |
||
710 | insert_vectors = self.normalize(metric_type, np.array(dataset["train"])) |
||
711 | # Insert batch once |
||
712 | # milvus_instance.insert(insert_vectors) |
||
713 | loops = len(insert_vectors) // INSERT_INTERVAL + 1 |
||
714 | for i in range(loops): |
||
715 | start = i*INSERT_INTERVAL |
||
716 | end = min((i+1)*INSERT_INTERVAL, len(insert_vectors)) |
||
717 | tmp_vectors = insert_vectors[start:end] |
||
718 | if start < end: |
||
719 | if not isinstance(tmp_vectors, list): |
||
720 | milvus_instance.insert(tmp_vectors.tolist(), ids=[i for i in range(start, end)]) |
||
721 | else: |
||
722 | milvus_instance.insert(tmp_vectors, ids=[i for i in range(start, end)]) |
||
723 | milvus_instance.flush() |
||
724 | logger.info("Table: %s, row count: %s" % (collection_name, milvus_instance.count())) |
||
725 | if milvus_instance.count() != len(insert_vectors): |
||
726 | logger.error("Table row count is not equal to insert vectors") |
||
727 | return |
||
728 | for index_type in index_types: |
||
729 | for index_param in index_params: |
||
730 | logger.debug("Building index with param: %s" % json.dumps(index_param)) |
||
731 | milvus_instance.create_index(index_type, index_param=index_param) |
||
732 | logger.info(milvus_instance.describe_index()) |
||
733 | logger.info("Start preload collection: %s" % collection_name) |
||
734 | milvus_instance.preload_collection() |
||
735 | index_info = { |
||
736 | "index_type": index_type, |
||
737 | "index_param": index_param |
||
738 | } |
||
739 | logger.debug(index_info) |
||
740 | for search_param in search_params: |
||
741 | for nq in nqs: |
||
742 | query_vectors = self.normalize(metric_type, np.array(dataset["test"][:nq])) |
||
743 | for top_k in top_ks: |
||
744 | search_param_group = { |
||
745 | "nq": len(query_vectors), |
||
746 | "topk": top_k, |
||
747 | "search_param": search_param |
||
748 | } |
||
749 | logger.debug(search_param_group) |
||
750 | if not isinstance(query_vectors, list): |
||
751 | result = milvus_instance.query(query_vectors.tolist(), top_k, search_param=search_param) |
||
752 | else: |
||
753 | result = milvus_instance.query(query_vectors, top_k, search_param=search_param) |
||
754 | if len(result): |
||
755 | logger.debug(len(result)) |
||
756 | logger.debug(len(result[0])) |
||
757 | result_ids = result.id_array |
||
758 | acc_value = self.get_recall_value(true_ids[:nq, :top_k].tolist(), result_ids) |
||
759 | logger.info("Query ann_accuracy: %s" % acc_value) |
||
760 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_param_group) |
||
761 | metric.metrics = { |
||
762 | "type": "ann_accuracy", |
||
763 | "value": { |
||
764 | "acc": acc_value |
||
765 | } |
||
766 | } |
||
767 | report(metric) |
||
768 | |||
769 | elif run_type == "search_stability": |
||
770 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
771 | search_params = collection["search_params"] |
||
772 | during_time = collection["during_time"] |
||
773 | collection_info = { |
||
774 | "dimension": dimension, |
||
775 | "metric_type": metric_type, |
||
776 | "dataset_name": collection_name |
||
777 | } |
||
778 | if not milvus_instance.exists_collection(): |
||
779 | logger.error("Table name: %s not existed" % collection_name) |
||
780 | return |
||
781 | logger.info(milvus_instance.count()) |
||
782 | index_info = milvus_instance.describe_index() |
||
783 | logger.info(index_info) |
||
784 | g_top_k = int(collection["top_ks"].split("-")[1]) |
||
785 | g_nq = int(collection["nqs"].split("-")[1]) |
||
786 | l_top_k = int(collection["top_ks"].split("-")[0]) |
||
787 | l_nq = int(collection["nqs"].split("-")[0]) |
||
788 | milvus_instance.preload_collection() |
||
789 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
790 | logger.debug(start_mem_usage) |
||
791 | start_row_count = milvus_instance.count() |
||
792 | logger.debug(milvus_instance.describe_index()) |
||
793 | logger.info(start_row_count) |
||
794 | start_time = time.time() |
||
795 | while time.time() < start_time + during_time * 60: |
||
796 | search_param = {} |
||
797 | top_k = random.randint(l_top_k, g_top_k) |
||
798 | nq = random.randint(l_nq, g_nq) |
||
799 | for k, v in search_params.items(): |
||
800 | search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1])) |
||
801 | query_vectors = [[random.random() for _ in range(dimension)] for _ in range(nq)] |
||
802 | logger.debug("Query nq: %d, top-k: %d, param: %s" % (nq, top_k, json.dumps(search_param))) |
||
803 | result = milvus_instance.query(query_vectors, top_k, search_param=search_param) |
||
804 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
805 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, {}) |
||
806 | metric.metrics = { |
||
807 | "type": "search_stability", |
||
808 | "value": { |
||
809 | "during_time": during_time, |
||
810 | "start_mem_usage": start_mem_usage, |
||
811 | "end_mem_usage": end_mem_usage, |
||
812 | "diff_mem": end_mem_usage - start_mem_usage |
||
813 | } |
||
814 | } |
||
815 | report(metric) |
||
816 | |||
817 | elif run_type == "loop_stability": |
||
818 | # init data |
||
819 | milvus_instance.clean_db() |
||
820 | pull_interval = collection["pull_interval"] |
||
821 | collection_num = collection["collection_num"] |
||
822 | concurrent = collection["concurrent"] if "concurrent" in collection else False |
||
823 | concurrent_num = collection_num |
||
824 | dimension = collection["dimension"] if "dimension" in collection else 128 |
||
825 | insert_xb = collection["insert_xb"] if "insert_xb" in collection else 100000 |
||
826 | index_types = collection["index_types"] if "index_types" in collection else ['ivf_sq8'] |
||
827 | index_param = {"nlist": 2048} |
||
828 | collection_names = [] |
||
829 | milvus_instances_map = {} |
||
830 | insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)] |
||
831 | for i in range(collection_num): |
||
832 | name = utils.get_unique_name(prefix="collection_") |
||
833 | collection_names.append(name) |
||
834 | metric_type = random.choice(["l2", "ip"]) |
||
835 | index_file_size = random.randint(10, 20) |
||
836 | milvus_instance.create_collection(name, dimension, index_file_size, metric_type) |
||
837 | milvus_instance = MilvusClient(collection_name=name, host=self.host) |
||
838 | index_type = random.choice(index_types) |
||
839 | milvus_instance.create_index(index_type, index_param=index_param) |
||
840 | logger.info(milvus_instance.describe_index()) |
||
841 | insert_vectors = utils.normalize(metric_type, insert_vectors) |
||
842 | milvus_instance.insert(insert_vectors) |
||
843 | milvus_instance.flush() |
||
844 | milvus_instances_map.update({name: milvus_instance}) |
||
845 | logger.info(milvus_instance.describe_index()) |
||
846 | logger.info(milvus_instance.describe()) |
||
847 | |||
848 | # loop time unit: min -> s |
||
849 | pull_interval_seconds = pull_interval * 60 |
||
850 | tasks = ["insert_rand", "delete_rand", "query_rand", "flush", "compact"] |
||
851 | i = 1 |
||
852 | while True: |
||
853 | logger.info("Loop time: %d" % i) |
||
854 | start_time = time.time() |
||
855 | while time.time() - start_time < pull_interval_seconds: |
||
856 | if concurrent: |
||
857 | mp = [] |
||
858 | for _ in range(concurrent_num): |
||
859 | tmp_collection_name = random.choice(collection_names) |
||
860 | task_name = random.choice(tasks) |
||
861 | mp.append((tmp_collection_name, task_name)) |
||
862 | |||
863 | with futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor: |
||
864 | future_results = {executor.submit(getattr(milvus_instances_map[mp[j][0]], mp[j][1])): j for j in range(concurrent_num)} |
||
865 | for future in futures.as_completed(future_results): |
||
866 | future.result() |
||
867 | |||
868 | else: |
||
869 | tmp_collection_name = random.choice(collection_names) |
||
870 | task_name = random.choice(tasks) |
||
871 | logger.info(tmp_collection_name) |
||
872 | logger.info(task_name) |
||
873 | task_run = getattr(milvus_instances_map[tmp_collection_name], task_name) |
||
874 | task_run() |
||
875 | |||
876 | logger.debug("Restart server") |
||
877 | utils.restart_server(self.service_name, namespace) |
||
878 | # new connection |
||
879 | for name in collection_names: |
||
880 | milvus_instance = MilvusClient(collection_name=name, host=self.host) |
||
881 | milvus_instances_map.update({name: milvus_instance}) |
||
882 | i = i + 1 |
||
883 | |||
884 | elif run_type == "stability": |
||
885 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
886 | search_params = collection["search_params"] |
||
887 | insert_xb = collection["insert_xb"] |
||
888 | insert_interval = collection["insert_interval"] |
||
889 | delete_xb = collection["delete_xb"] |
||
890 | during_time = collection["during_time"] |
||
891 | collection_info = { |
||
892 | "dimension": dimension, |
||
893 | "metric_type": metric_type, |
||
894 | "dataset_name": collection_name |
||
895 | } |
||
896 | if not milvus_instance.exists_collection(): |
||
897 | logger.error("Table name: %s not existed" % collection_name) |
||
898 | return |
||
899 | logger.info(milvus_instance.count()) |
||
900 | index_info = milvus_instance.describe_index() |
||
901 | logger.info(index_info) |
||
902 | g_top_k = int(collection["top_ks"].split("-")[1]) |
||
903 | g_nq = int(collection["nqs"].split("-")[1]) |
||
904 | l_top_k = int(collection["top_ks"].split("-")[0]) |
||
905 | l_nq = int(collection["nqs"].split("-")[0]) |
||
906 | milvus_instance.preload_collection() |
||
907 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
908 | start_row_count = milvus_instance.count() |
||
909 | logger.debug(milvus_instance.describe_index()) |
||
910 | logger.info(start_row_count) |
||
911 | start_time = time.time() |
||
912 | i = 0 |
||
913 | ids = [] |
||
914 | insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)] |
||
915 | query_vectors = [[random.random() for _ in range(dimension)] for _ in range(10000)] |
||
916 | while time.time() < start_time + during_time * 60: |
||
917 | i = i + 1 |
||
918 | for j in range(insert_interval): |
||
919 | top_k = random.randint(l_top_k, g_top_k) |
||
920 | nq = random.randint(l_nq, g_nq) |
||
921 | search_param = {} |
||
922 | for k, v in search_params.items(): |
||
923 | search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1])) |
||
924 | logger.debug("Query nq: %d, top-k: %d, param: %s" % (nq, top_k, json.dumps(search_param))) |
||
925 | result = milvus_instance.query(query_vectors[0:nq], top_k, search_param=search_param) |
||
926 | count = milvus_instance.count() |
||
927 | insert_ids = [(count+x) for x in range(len(insert_vectors))] |
||
928 | ids.extend(insert_ids) |
||
929 | status, res = milvus_instance.insert(insert_vectors, ids=insert_ids) |
||
930 | logger.debug("%d, row_count: %d" % (i, milvus_instance.count())) |
||
931 | milvus_instance.delete(ids[-delete_xb:]) |
||
932 | milvus_instance.flush() |
||
933 | milvus_instance.compact() |
||
934 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
935 | end_row_count = milvus_instance.count() |
||
936 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, {}) |
||
937 | metric.metrics = { |
||
938 | "type": "stability", |
||
939 | "value": { |
||
940 | "during_time": during_time, |
||
941 | "start_mem_usage": start_mem_usage, |
||
942 | "end_mem_usage": end_mem_usage, |
||
943 | "diff_mem": end_mem_usage - start_mem_usage, |
||
944 | "row_count_increments": end_row_count - start_row_count |
||
945 | } |
||
946 | } |
||
947 | report(metric) |
||
948 | |||
949 | elif run_type == "locust_mix_performance": |
||
950 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser( |
||
951 | collection_name) |
||
952 | ni_per = collection["ni_per"] |
||
953 | build_index = collection["build_index"] |
||
954 | # # TODO: debug |
||
955 | if milvus_instance.exists_collection(): |
||
956 | milvus_instance.drop() |
||
957 | time.sleep(10) |
||
958 | index_info = {} |
||
959 | search_params = {} |
||
960 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
961 | if build_index is True: |
||
962 | index_type = collection["index_type"] |
||
963 | index_param = collection["index_param"] |
||
964 | index_info = { |
||
965 | "index_tyoe": index_type, |
||
966 | "index_param": index_param |
||
967 | } |
||
968 | milvus_instance.create_index(index_type, index_param) |
||
969 | logger.debug(milvus_instance.describe_index()) |
||
970 | res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per) |
||
971 | logger.info(res) |
||
972 | if "flush" in collection and collection["flush"] == "no": |
||
973 | logger.debug("No manual flush") |
||
974 | else: |
||
975 | milvus_instance.flush() |
||
976 | if build_index is True: |
||
977 | logger.debug("Start build index for last file") |
||
978 | milvus_instance.create_index(index_type, index_param) |
||
979 | logger.debug(milvus_instance.describe_index()) |
||
980 | ### spawn locust requests |
||
981 | task = collection["tasks"] |
||
982 | # generate task code |
||
983 | task_file = utils.get_unique_name() |
||
984 | task_file_script = task_file + '.py' |
||
985 | task_file_csv = task_file + '_stats.csv' |
||
986 | task_types = task["types"] |
||
987 | connection_type = "single" |
||
988 | connection_num = task["connection_num"] |
||
989 | if connection_num > 1: |
||
990 | connection_type = "multi" |
||
991 | clients_num = task["clients_num"] |
||
992 | hatch_rate = task["hatch_rate"] |
||
993 | during_time = task["during_time"] |
||
994 | def_strs = "" |
||
995 | for task_type in task_types: |
||
996 | _type = task_type["type"] |
||
997 | weight = task_type["weight"] |
||
998 | if _type == "flush": |
||
999 | def_str = """ |
||
1000 | @task(%d) |
||
1001 | def flush(self): |
||
1002 | client = get_client(collection_name) |
||
1003 | client.flush(collection_name=collection_name) |
||
1004 | """ % weight |
||
1005 | if _type == "compact": |
||
1006 | def_str = """ |
||
1007 | @task(%d) |
||
1008 | def compact(self): |
||
1009 | client = get_client(collection_name) |
||
1010 | client.compact(collection_name) |
||
1011 | """ % weight |
||
1012 | if _type == "query": |
||
1013 | def_str = """ |
||
1014 | @task(%d) |
||
1015 | def query(self): |
||
1016 | client = get_client(collection_name) |
||
1017 | params = %s |
||
1018 | X = [[random.random() for i in range(dim)] for i in range(params["nq"])] |
||
1019 | client.query(X, params["top_k"], params["search_param"], collection_name=collection_name) |
||
1020 | """ % (weight, task_type["params"]) |
||
1021 | if _type == "insert": |
||
1022 | def_str = """ |
||
1023 | @task(%d) |
||
1024 | def insert(self): |
||
1025 | client = get_client(collection_name) |
||
1026 | params = %s |
||
1027 | ids = [random.randint(10, 1000000) for i in range(params["nb"])] |
||
1028 | X = [[random.random() for i in range(dim)] for i in range(params["nb"])] |
||
1029 | client.insert(X,ids=ids, collection_name=collection_name) |
||
1030 | """ % (weight, task_type["params"]) |
||
1031 | if _type == "delete": |
||
1032 | def_str = """ |
||
1033 | @task(%d) |
||
1034 | def delete(self): |
||
1035 | client = get_client(collection_name) |
||
1036 | ids = [random.randint(1, 1000000) for i in range(1)] |
||
1037 | client.delete(ids, collection_name) |
||
1038 | """ % weight |
||
1039 | def_strs += def_str |
||
1040 | code_str = """ |
||
1041 | import random |
||
1042 | import json |
||
1043 | from locust import User, task, between |
||
1044 | from locust_task import MilvusTask |
||
1045 | from client import MilvusClient |
||
1046 | |||
1047 | host = '%s' |
||
1048 | port = %s |
||
1049 | collection_name = '%s' |
||
1050 | dim = %s |
||
1051 | connection_type = '%s' |
||
1052 | m = MilvusClient(host=host, port=port) |
||
1053 | |||
1054 | def get_client(collection_name): |
||
1055 | if connection_type == 'single': |
||
1056 | return MilvusTask(m=m) |
||
1057 | elif connection_type == 'multi': |
||
1058 | return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name) |
||
1059 | |||
1060 | |||
1061 | class MixTask(User): |
||
1062 | wait_time = between(0.001, 0.002) |
||
1063 | %s |
||
1064 | """ % (self.host, self.port, collection_name, dimension, connection_type, def_strs) |
||
1065 | print(def_strs) |
||
1066 | with open(task_file_script, "w+") as fd: |
||
1067 | fd.write(code_str) |
||
1068 | locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % ( |
||
1069 | task_file_script, |
||
1070 | task_file, |
||
1071 | clients_num, |
||
1072 | hatch_rate, |
||
1073 | during_time) |
||
1074 | logger.info(locust_cmd) |
||
1075 | try: |
||
1076 | res = os.system(locust_cmd) |
||
1077 | except Exception as e: |
||
1078 | logger.error(str(e)) |
||
1079 | return |
||
1080 | # . retrieve and collect test statistics |
||
1081 | locust_stats = None |
||
1082 | with open(task_file_csv, newline='') as fd: |
||
1083 | dr = csv.DictReader(fd) |
||
1084 | for row in dr: |
||
1085 | if row["Name"] != "Aggregated": |
||
1086 | continue |
||
1087 | locust_stats = row |
||
1088 | logger.info(locust_stats) |
||
1089 | collection_info = { |
||
1090 | "dimension": dimension, |
||
1091 | "metric_type": metric_type, |
||
1092 | "dataset_name": collection_name |
||
1093 | } |
||
1094 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
1095 | metric.metrics = { |
||
1096 | "type": run_type, |
||
1097 | "value": { |
||
1098 | "during_time": during_time, |
||
1099 | "request_count": int(locust_stats["Request Count"]), |
||
1100 | "failure_count": int(locust_stats["Failure Count"]), |
||
1101 | "qps": locust_stats["Requests/s"], |
||
1102 | "min_response_time": int(locust_stats["Min Response Time"]), |
||
1103 | "max_response_time": int(locust_stats["Max Response Time"]), |
||
1104 | "median_response_time": int(locust_stats["Median Response Time"]), |
||
1105 | "avg_response_time": int(locust_stats["Average Response Time"]) |
||
1106 | } |
||
1107 | } |
||
1108 | report(metric) |
||
1109 | |||
1110 | else: |
||
1111 | logger.warning("Run type: %s not defined" % run_type) |
||
1112 | return |
||
1113 | logger.debug("Test finished") |
||
1114 |