| Total Complexity | 118 |
| Total Lines | 1114 |
| Duplicated Lines | 1.62 % |
| Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like milvus_benchmark.k8s_runner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach for finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | import os |
||
| 2 | import logging |
||
| 3 | import pdb |
||
| 4 | import string |
||
| 5 | import time |
||
| 6 | import re |
||
| 7 | import random |
||
| 8 | import traceback |
||
| 9 | import json |
||
| 10 | import csv |
||
| 11 | from multiprocessing import Process |
||
| 12 | import numpy as np |
||
| 13 | from yaml import full_load, dump |
||
| 14 | from concurrent import futures |
||
| 15 | from client import MilvusClient |
||
| 16 | import parser |
||
| 17 | from runner import Runner |
||
| 18 | from milvus_metrics.api import report |
||
| 19 | from milvus_metrics.models import Env, Hardware, Server, Metric |
||
| 20 | import utils |
||
| 21 | |||
# Module-level logger shared by everything in this file.
logger = logging.getLogger("milvus_benchmark.k8s_runner")
# Namespace argument handed to utils.helm_install_server / utils.helm_del_server.
namespace = "milvus"
# Initial value of K8sRunner.port, which is later passed to MilvusClient.
default_port = 19530
# Seconds to sleep after dropping a collection before re-creating it
# (used by the "ann_accuracy" run type).
DELETE_INTERVAL_TIME = 5
# INSERT_INTERVAL = 100000
# Number of vectors sent per insert call when loading the "ann_accuracy" dataset.
INSERT_INTERVAL = 50000
# Evaluated once at import time; report_wrapper uses it as the run id of every metric.
timestamp = int(time.time())
# NOTE(review): not referenced in the visible part of this file; presumably the
# server's data directory -- confirm against the rest of the module.
default_path = "/var/lib/milvus"
| 30 | |||
| 31 | |||
| 32 | class K8sRunner(Runner): |
||
def __init__(self):
    """
    Create a runner that works in helm mode.

    ``service_name`` is obtained from ``utils.get_unique_name()`` and
    ``port`` starts at ``default_port``. ``host``, ``hostname`` and
    ``env_value`` start as ``None``; ``init_env`` sets the first two and
    ``run`` sets ``env_value``.
    """
    super(K8sRunner, self).__init__()
    self.service_name = utils.get_unique_name()
    # Nothing is deployed yet, so no connection details are known.
    self.host = self.hostname = self.env_value = None
    self.port = default_port
| 45 | |||
def init_env(self, server_config, server_host, deploy_mode, image_type, image_tag):
    """
    Install the server through helm; clean up if the install fails.

    :param server_config: when truthy, passed to ``utils.update_values``
        together with the chart's values.yaml path before installing
    :param server_host: host the tests run on; stored in ``self.hostname``
    :param deploy_mode: forwarded to ``utils.update_values`` and
        ``utils.helm_install_server``
    :param image_type: forwarded to ``utils.helm_install_server``
    :param image_tag: forwarded to ``utils.helm_install_server``
    :return: ``True`` when the install yields a truthy host, ``False`` when
        the install raises or returns nothing (``clean_up`` is called first
        in both failure cases)
    :raises Exception: when the chart's values.yaml does not exist
    """
    logger.debug("Tests run on server host:")
    logger.debug(server_host)
    self.hostname = server_host

    # The chart is expected in a sibling checkout relative to the working directory.
    chart_dir = os.path.join(os.getcwd(), "../milvus-helm/charts/milvus")
    values_yaml = "%s/values.yaml" % chart_dir
    if not os.path.exists(values_yaml):
        raise Exception("File %s not existed" % values_yaml)
    if server_config:
        utils.update_values(values_yaml, deploy_mode, server_host, server_config)

    try:
        logger.debug("Start install server")
        self.host = utils.helm_install_server(
            chart_dir, deploy_mode, image_tag, image_type, self.service_name, namespace)
    except Exception as err:
        logger.error("Helm install server failed: %s" % (str(err)))
        logger.error(traceback.format_exc())
        logger.debug(server_config)
        self.clean_up()
        return False

    if self.host:
        return True
    # The install returned without raising but gave no host: treat as failure.
    logger.error("Helm install server failed")
    self.clean_up()
    return False
| 77 | |||
def clean_up(self):
    """
    Stop the server by handing this runner's service name and the module
    namespace to ``utils.helm_del_server``.
    """
    release = self.service_name
    logger.debug("Start clean up: %s" % release)
    utils.helm_del_server(release, namespace)
| 85 | |||
def report_wrapper(self, milvus_instance, env_value, hostname, collection_info, index_info, search_params, run_params=None):
    """
    Assemble the Metric object describing one test result.

    The metric is only built and returned here; the caller fills in
    ``metrics`` and passes it to ``report``.

    :param milvus_instance: client queried for server version, mode and commit
    :param env_value: value wrapped in ``Env`` and stored as ``metric.env``
    :param hostname: name given to the ``Hardware`` entry
    :param collection_info: stored as ``metric.collection``
    :param index_info: stored as ``metric.index``
    :param search_params: stored as ``metric.search``
    :param run_params: stored as ``metric.run_params`` (default ``None``)
    :return: the populated ``Metric``
    """
    record = Metric()
    # Every metric of this process shares the import-time timestamp as run id.
    record.set_run_id(timestamp)
    record.env = Env(env_value)
    record.env.OMP_NUM_THREADS = 0
    record.hardware = Hardware(name=hostname)
    record.server = Server(
        version=milvus_instance.get_server_version(),
        mode=milvus_instance.get_server_mode(),
        build_commit=milvus_instance.get_server_commit(),
    )
    record.collection, record.index = collection_info, index_info
    record.search, record.run_params = search_params, run_params
    return record
| 104 | |||
| 105 | def run(self, run_type, collection): |
||
| 106 | """ |
||
| 107 | override runner.run |
||
| 108 | """ |
||
| 109 | logger.debug(run_type) |
||
| 110 | logger.debug(collection) |
||
| 111 | collection_name = collection["collection_name"] if "collection_name" in collection else None |
||
| 112 | milvus_instance = MilvusClient(collection_name=collection_name, host=self.host) |
||
| 113 | self.env_value = milvus_instance.get_server_config() |
||
| 114 | |||
| 115 | # ugly implemention |
||
| 116 | # remove some parts of result before uploading results |
||
| 117 | self.env_value.pop("logs") |
||
| 118 | if milvus_instance.get_server_mode() == "CPU": |
||
| 119 | if "gpu" in self.env_value: |
||
| 120 | self.env_value.pop("gpu") |
||
| 121 | elif "cache.enable" in self.env_value["gpu"]: |
||
| 122 | self.env_value["gpu"].pop("cache.enable") |
||
| 123 | |||
| 124 | self.env_value.pop("network") |
||
| 125 | |||
| 126 | if run_type == "insert_performance": |
||
| 127 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 128 | ni_per = collection["ni_per"] |
||
| 129 | build_index = collection["build_index"] |
||
| 130 | if milvus_instance.exists_collection(): |
||
| 131 | milvus_instance.drop() |
||
| 132 | time.sleep(10) |
||
| 133 | index_info = {} |
||
| 134 | search_params = {} |
||
| 135 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
| 136 | if build_index is True: |
||
| 137 | index_type = collection["index_type"] |
||
| 138 | index_param = collection["index_param"] |
||
| 139 | index_info = { |
||
| 140 | "index_type": index_type, |
||
| 141 | "index_param": index_param |
||
| 142 | } |
||
| 143 | milvus_instance.create_index(index_type, index_param) |
||
| 144 | logger.debug(milvus_instance.describe_index()) |
||
| 145 | res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per) |
||
| 146 | logger.info(res) |
||
| 147 | if "flush" in collection and collection["flush"] == "no": |
||
| 148 | logger.debug("No manual flush") |
||
| 149 | else: |
||
| 150 | milvus_instance.flush() |
||
| 151 | logger.debug(milvus_instance.count()) |
||
| 152 | collection_info = { |
||
| 153 | "dimension": dimension, |
||
| 154 | "metric_type": metric_type, |
||
| 155 | "dataset_name": collection_name |
||
| 156 | } |
||
| 157 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
| 158 | metric.metrics = { |
||
| 159 | "type": run_type, |
||
| 160 | "value": { |
||
| 161 | "total_time": res["total_time"], |
||
| 162 | "qps": res["qps"], |
||
| 163 | "ni_time": res["ni_time"] |
||
| 164 | } |
||
| 165 | } |
||
| 166 | report(metric) |
||
| 167 | if build_index is True: |
||
| 168 | logger.debug("Start build index for last file") |
||
| 169 | milvus_instance.create_index(index_type, index_param) |
||
| 170 | logger.debug(milvus_instance.describe_index()) |
||
| 171 | |||
| 172 | elif run_type == "insert_debug_performance": |
||
| 173 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 174 | ni_per = collection["ni_per"] |
||
| 175 | if milvus_instance.exists_collection(): |
||
| 176 | milvus_instance.drop() |
||
| 177 | time.sleep(10) |
||
| 178 | index_info = {} |
||
| 179 | search_params = {} |
||
| 180 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
| 181 | insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(ni_per)] |
||
| 182 | start_time = time.time() |
||
| 183 | i = 0 |
||
| 184 | while time.time() < start_time + 2 * 24 * 3600: |
||
| 185 | i = i + 1 |
||
| 186 | logger.debug(i) |
||
| 187 | logger.debug("Row count: %d" % milvus_instance.count()) |
||
| 188 | milvus_instance.insert(insert_vectors) |
||
| 189 | time.sleep(0.1) |
||
| 190 | |||
| 191 | elif run_type == "insert_performance_multi_collections": |
||
| 192 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 193 | ni_per = collection["ni_per"] |
||
| 194 | build_index = collection["build_index"] |
||
| 195 | if milvus_instance.exists_collection(): |
||
| 196 | milvus_instance.drop() |
||
| 197 | time.sleep(10) |
||
| 198 | index_info = {} |
||
| 199 | search_params = {} |
||
| 200 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
| 201 | if build_index is True: |
||
| 202 | index_type = collection["index_type"] |
||
| 203 | index_param = collection["index_param"] |
||
| 204 | index_info = { |
||
| 205 | "index_type": index_type, |
||
| 206 | "index_param": index_param |
||
| 207 | } |
||
| 208 | milvus_instance.create_index(index_type, index_param) |
||
| 209 | logger.debug(milvus_instance.describe_index()) |
||
| 210 | res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per) |
||
| 211 | logger.info(res) |
||
| 212 | milvus_instance.flush() |
||
| 213 | collection_info = { |
||
| 214 | "dimension": dimension, |
||
| 215 | "metric_type": metric_type, |
||
| 216 | "dataset_name": collection_name |
||
| 217 | } |
||
| 218 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
| 219 | metric.metrics = { |
||
| 220 | "type": run_type, |
||
| 221 | "value": { |
||
| 222 | "total_time": res["total_time"], |
||
| 223 | "qps": res["qps"], |
||
| 224 | "ni_time": res["ni_time"] |
||
| 225 | } |
||
| 226 | } |
||
| 227 | report(metric) |
||
| 228 | if build_index is True: |
||
| 229 | logger.debug("Start build index for last file") |
||
| 230 | milvus_instance.create_index(index_type, index_param) |
||
| 231 | logger.debug(milvus_instance.describe_index()) |
||
| 232 | |||
| 233 | elif run_type == "insert_flush_performance": |
||
| 234 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 235 | ni_per = collection["ni_per"] |
||
| 236 | if milvus_instance.exists_collection(): |
||
| 237 | milvus_instance.drop() |
||
| 238 | time.sleep(10) |
||
| 239 | index_info = {} |
||
| 240 | search_params = {} |
||
| 241 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
| 242 | res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per) |
||
| 243 | logger.info(res) |
||
| 244 | logger.debug(milvus_instance.count()) |
||
| 245 | start_time = time.time() |
||
| 246 | milvus_instance.flush() |
||
| 247 | end_time = time.time() |
||
| 248 | logger.debug(milvus_instance.count()) |
||
| 249 | collection_info = { |
||
| 250 | "dimension": dimension, |
||
| 251 | "metric_type": metric_type, |
||
| 252 | "dataset_name": collection_name |
||
| 253 | } |
||
| 254 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
| 255 | metric.metrics = { |
||
| 256 | "type": run_type, |
||
| 257 | "value": { |
||
| 258 | "flush_time": round(end_time - start_time, 1) |
||
| 259 | } |
||
| 260 | } |
||
| 261 | report(metric) |
||
| 262 | |||
| 263 | elif run_type == "build_performance": |
||
| 264 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 265 | index_type = collection["index_type"] |
||
| 266 | index_param = collection["index_param"] |
||
| 267 | collection_info = { |
||
| 268 | "dimension": dimension, |
||
| 269 | "metric_type": metric_type, |
||
| 270 | "index_file_size": index_file_size, |
||
| 271 | "dataset_name": collection_name |
||
| 272 | } |
||
| 273 | index_info = { |
||
| 274 | "index_type": index_type, |
||
| 275 | "index_param": index_param |
||
| 276 | } |
||
| 277 | if not milvus_instance.exists_collection(): |
||
| 278 | logger.error("Table name: %s not existed" % collection_name) |
||
| 279 | return |
||
| 280 | search_params = {} |
||
| 281 | start_time = time.time() |
||
| 282 | # drop index |
||
| 283 | logger.debug("Drop index") |
||
| 284 | milvus_instance.drop_index() |
||
| 285 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
| 286 | milvus_instance.create_index(index_type, index_param) |
||
| 287 | logger.debug(milvus_instance.describe_index()) |
||
| 288 | logger.debug(milvus_instance.count()) |
||
| 289 | end_time = time.time() |
||
| 290 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
| 291 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
| 292 | metric.metrics = { |
||
| 293 | "type": "build_performance", |
||
| 294 | "value": { |
||
| 295 | "build_time": round(end_time - start_time, 1), |
||
| 296 | "start_mem_usage": start_mem_usage, |
||
| 297 | "end_mem_usage": end_mem_usage, |
||
| 298 | "diff_mem": end_mem_usage - start_mem_usage |
||
| 299 | } |
||
| 300 | } |
||
| 301 | report(metric) |
||
| 302 | |||
| 303 | elif run_type == "delete_performance": |
||
| 304 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 305 | ni_per = collection["ni_per"] |
||
| 306 | search_params = {} |
||
| 307 | collection_info = { |
||
| 308 | "dimension": dimension, |
||
| 309 | "metric_type": metric_type, |
||
| 310 | "dataset_name": collection_name |
||
| 311 | } |
||
| 312 | if not milvus_instance.exists_collection(): |
||
| 313 | logger.error("Table name: %s not existed" % collection_name) |
||
| 314 | return |
||
| 315 | length = milvus_instance.count() |
||
| 316 | logger.info(length) |
||
| 317 | index_info = milvus_instance.describe_index() |
||
| 318 | logger.info(index_info) |
||
| 319 | ids = [i for i in range(length)] |
||
| 320 | loops = int(length / ni_per) |
||
| 321 | milvus_instance.preload_collection() |
||
| 322 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
| 323 | start_time = time.time() |
||
| 324 | for i in range(loops): |
||
| 325 | delete_ids = ids[i*ni_per : i*ni_per+ni_per] |
||
| 326 | logger.debug("Delete %d - %d" % (delete_ids[0], delete_ids[-1])) |
||
| 327 | milvus_instance.delete(delete_ids) |
||
| 328 | # milvus_instance.flush() |
||
| 329 | logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
| 330 | logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
| 331 | milvus_instance.flush() |
||
| 332 | end_time = time.time() |
||
| 333 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
| 334 | logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
| 335 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
| 336 | metric.metrics = { |
||
| 337 | "type": "delete_performance", |
||
| 338 | "value": { |
||
| 339 | "delete_time": round(end_time - start_time, 1), |
||
| 340 | "start_mem_usage": start_mem_usage, |
||
| 341 | "end_mem_usage": end_mem_usage, |
||
| 342 | "diff_mem": end_mem_usage - start_mem_usage |
||
| 343 | } |
||
| 344 | } |
||
| 345 | report(metric) |
||
| 346 | |||
| 347 | elif run_type == "get_ids_performance": |
||
| 348 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 349 | ids_length_per_segment = collection["ids_length_per_segment"] |
||
| 350 | if not milvus_instance.exists_collection(): |
||
| 351 | logger.error("Table name: %s not existed" % collection_name) |
||
| 352 | return |
||
| 353 | collection_info = { |
||
| 354 | "dimension": dimension, |
||
| 355 | "metric_type": metric_type, |
||
| 356 | "index_file_size": index_file_size, |
||
| 357 | "dataset_name": collection_name |
||
| 358 | } |
||
| 359 | search_params = {} |
||
| 360 | logger.info(milvus_instance.count()) |
||
| 361 | index_info = milvus_instance.describe_index() |
||
| 362 | logger.info(index_info) |
||
| 363 | for ids_num in ids_length_per_segment: |
||
| 364 | segment_num, get_ids = milvus_instance.get_rand_ids_each_segment(ids_num) |
||
| 365 | start_time = time.time() |
||
| 366 | _ = milvus_instance.get_entities(get_ids) |
||
| 367 | total_time = time.time() - start_time |
||
| 368 | avg_time = total_time / segment_num |
||
| 369 | run_params = {"ids_num": ids_num} |
||
| 370 | logger.info("Segment num: %d, ids num per segment: %d, run_time: %f" % (segment_num, ids_num, total_time)) |
||
| 371 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params, run_params=run_params) |
||
| 372 | metric.metrics = { |
||
| 373 | "type": run_type, |
||
| 374 | "value": { |
||
| 375 | "total_time": round(total_time, 1), |
||
| 376 | "avg_time": round(avg_time, 1) |
||
| 377 | } |
||
| 378 | } |
||
| 379 | report(metric) |
||
| 380 | |||
| 381 | elif run_type == "search_performance": |
||
| 382 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 383 | run_count = collection["run_count"] |
||
| 384 | top_ks = collection["top_ks"] |
||
| 385 | nqs = collection["nqs"] |
||
| 386 | search_params = collection["search_params"] |
||
| 387 | collection_info = { |
||
| 388 | "dimension": dimension, |
||
| 389 | "metric_type": metric_type, |
||
| 390 | "index_file_size": index_file_size, |
||
| 391 | "dataset_name": collection_name |
||
| 392 | } |
||
| 393 | if not milvus_instance.exists_collection(): |
||
| 394 | logger.error("Table name: %s not existed" % collection_name) |
||
| 395 | return |
||
| 396 | |||
| 397 | logger.info(milvus_instance.count()) |
||
| 398 | index_info = milvus_instance.describe_index() |
||
| 399 | logger.info(index_info) |
||
| 400 | milvus_instance.preload_collection() |
||
| 401 | logger.info("Start warm up query") |
||
| 402 | res = self.do_query(milvus_instance, collection_name, [1], [1], 2, search_param=search_params[0]) |
||
| 403 | logger.info("End warm up query") |
||
| 404 | for search_param in search_params: |
||
| 405 | logger.info("Search param: %s" % json.dumps(search_param)) |
||
| 406 | res = self.do_query(milvus_instance, collection_name, top_ks, nqs, run_count, search_param) |
||
| 407 | headers = ["Nq/Top-k"] |
||
| 408 | headers.extend([str(top_k) for top_k in top_ks]) |
||
| 409 | logger.info("Search param: %s" % json.dumps(search_param)) |
||
| 410 | utils.print_table(headers, nqs, res) |
||
| 411 | for index_nq, nq in enumerate(nqs): |
||
| 412 | for index_top_k, top_k in enumerate(top_ks): |
||
| 413 | search_param_group = { |
||
| 414 | "nq": nq, |
||
| 415 | "topk": top_k, |
||
| 416 | "search_param": search_param |
||
| 417 | } |
||
| 418 | search_time = res[index_nq][index_top_k] |
||
| 419 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_param_group) |
||
| 420 | metric.metrics = { |
||
| 421 | "type": "search_performance", |
||
| 422 | "value": { |
||
| 423 | "search_time": search_time |
||
| 424 | } |
||
| 425 | } |
||
| 426 | report(metric) |
||
| 427 | |||
| 428 | elif run_type == "locust_search_performance": |
||
| 429 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser( |
||
| 430 | collection_name) |
||
| 431 | ### clear db |
||
| 432 | ### spawn locust requests |
||
| 433 | collection_num = collection["collection_num"] |
||
| 434 | task = collection["task"] |
||
| 435 | # . generate task code |
||
| 436 | task_file = utils.get_unique_name() |
||
| 437 | task_file_script = task_file + '.py' |
||
| 438 | task_file_csv = task_file + '_stats.csv' |
||
| 439 | task_type = task["type"] |
||
| 440 | connection_type = "single" |
||
| 441 | connection_num = task["connection_num"] |
||
| 442 | if connection_num > 1: |
||
| 443 | connection_type = "multi" |
||
| 444 | clients_num = task["clients_num"] |
||
| 445 | hatch_rate = task["hatch_rate"] |
||
| 446 | during_time = task["during_time"] |
||
| 447 | def_name = task_type |
||
| 448 | task_params = task["params"] |
||
| 449 | collection_names = [] |
||
| 450 | for i in range(collection_num): |
||
| 451 | suffix = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(5)) |
||
| 452 | collection_names.append(collection_name + "_" + suffix) |
||
| 453 | # ##### |
||
| 454 | ni_per = collection["ni_per"] |
||
| 455 | build_index = collection["build_index"] |
||
| 456 | # TODO: debug |
||
| 457 | for c_name in collection_names: |
||
| 458 | milvus_instance = MilvusClient(collection_name=c_name, host=self.host, port=self.port) |
||
| 459 | if milvus_instance.exists_collection(collection_name=c_name): |
||
| 460 | milvus_instance.drop(name=c_name) |
||
| 461 | time.sleep(10) |
||
| 462 | milvus_instance.create_collection(c_name, dimension, index_file_size, metric_type) |
||
| 463 | index_info = { |
||
| 464 | "build_index": build_index |
||
| 465 | } |
||
| 466 | if build_index is True: |
||
| 467 | index_type = collection["index_type"] |
||
| 468 | index_param = collection["index_param"] |
||
| 469 | index_info.update({ |
||
| 470 | "index_type": index_type, |
||
| 471 | "index_param": index_param |
||
| 472 | }) |
||
| 473 | milvus_instance.create_index(index_type, index_param) |
||
| 474 | logger.debug(milvus_instance.describe_index()) |
||
| 475 | res = self.do_insert(milvus_instance, c_name, data_type, dimension, collection_size, ni_per) |
||
| 476 | logger.info(res) |
||
| 477 | if "flush" in collection and collection["flush"] == "no": |
||
| 478 | logger.debug("No manual flush") |
||
| 479 | else: |
||
| 480 | milvus_instance.flush() |
||
| 481 | logger.debug("Table row counts: %d" % milvus_instance.count(name=c_name)) |
||
| 482 | if build_index is True: |
||
| 483 | logger.debug("Start build index for last file") |
||
| 484 | milvus_instance.create_index(index_type, index_param) |
||
| 485 | logger.debug(milvus_instance.describe_index()) |
||
| 486 | code_str = """ |
||
| 487 | import random |
||
| 488 | import string |
||
| 489 | from locust import User, task, between |
||
| 490 | from locust_task import MilvusTask |
||
| 491 | from client import MilvusClient |
||
| 492 | |||
| 493 | host = '%s' |
||
| 494 | port = %s |
||
| 495 | dim = %s |
||
| 496 | connection_type = '%s' |
||
| 497 | collection_names = %s |
||
| 498 | m = MilvusClient(host=host, port=port) |
||
| 499 | |||
| 500 | |||
| 501 | def get_collection_name(): |
||
| 502 | return random.choice(collection_names) |
||
| 503 | |||
| 504 | |||
| 505 | def get_client(collection_name): |
||
| 506 | if connection_type == 'single': |
||
| 507 | return MilvusTask(m=m) |
||
| 508 | elif connection_type == 'multi': |
||
| 509 | return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name) |
||
| 510 | |||
| 511 | |||
| 512 | class QueryTask(User): |
||
| 513 | wait_time = between(0.001, 0.002) |
||
| 514 | |||
| 515 | @task() |
||
| 516 | def %s(self): |
||
| 517 | top_k = %s |
||
| 518 | X = [[random.random() for i in range(dim)] for i in range(%s)] |
||
| 519 | search_param = %s |
||
| 520 | collection_name = get_collection_name() |
||
| 521 | client = get_client(collection_name) |
||
| 522 | client.query(X, top_k, search_param, collection_name=collection_name) |
||
| 523 | """ % (self.host, self.port, dimension, connection_type, collection_names, def_name, task_params["top_k"], task_params["nq"], task_params["search_param"]) |
||
| 524 | with open(task_file_script, 'w+') as fd: |
||
| 525 | fd.write(code_str) |
||
| 526 | locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % ( |
||
| 527 | task_file_script, |
||
| 528 | task_file, |
||
| 529 | clients_num, |
||
| 530 | hatch_rate, |
||
| 531 | during_time) |
||
| 532 | logger.info(locust_cmd) |
||
| 533 | try: |
||
| 534 | res = os.system(locust_cmd) |
||
| 535 | except Exception as e: |
||
| 536 | logger.error(str(e)) |
||
| 537 | return |
||
| 538 | |||
| 539 | # . retrieve and collect test statistics |
||
| 540 | locust_stats = None |
||
| 541 | with open(task_file_csv, newline='') as fd: |
||
| 542 | dr = csv.DictReader(fd) |
||
| 543 | for row in dr: |
||
| 544 | if row["Name"] != "Aggregated": |
||
| 545 | continue |
||
| 546 | locust_stats = row |
||
| 547 | logger.info(locust_stats) |
||
| 548 | # clean up temp files |
||
| 549 | search_params = { |
||
| 550 | "top_k": task_params["top_k"], |
||
| 551 | "nq": task_params["nq"], |
||
| 552 | "nprobe": task_params["search_param"]["nprobe"] |
||
| 553 | } |
||
| 554 | run_params = { |
||
| 555 | "connection_num": connection_num, |
||
| 556 | "clients_num": clients_num, |
||
| 557 | "hatch_rate": hatch_rate, |
||
| 558 | "during_time": during_time |
||
| 559 | } |
||
| 560 | collection_info = { |
||
| 561 | "dimension": dimension, |
||
| 562 | "metric_type": metric_type, |
||
| 563 | "index_file_size": index_file_size, |
||
| 564 | "dataset_name": collection_name |
||
| 565 | } |
||
| 566 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params, run_params) |
||
| 567 | metric.metrics = { |
||
| 568 | "type": run_type, |
||
| 569 | "value": { |
||
| 570 | "during_time": during_time, |
||
| 571 | "request_count": int(locust_stats["Request Count"]), |
||
| 572 | "failure_count": int(locust_stats["Failure Count"]), |
||
| 573 | "qps": locust_stats["Requests/s"], |
||
| 574 | "min_response_time": int(locust_stats["Min Response Time"]), |
||
| 575 | "max_response_time": int(locust_stats["Max Response Time"]), |
||
| 576 | "median_response_time": int(locust_stats["Median Response Time"]), |
||
| 577 | "avg_response_time": int(locust_stats["Average Response Time"]) |
||
| 578 | } |
||
| 579 | } |
||
| 580 | report(metric) |
||
| 581 | |||
| 582 | elif run_type == "search_ids_stability": |
||
| 583 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 584 | search_params = collection["search_params"] |
||
| 585 | during_time = collection["during_time"] |
||
| 586 | ids_length = collection["ids_length"] |
||
| 587 | ids = collection["ids"] |
||
| 588 | collection_info = { |
||
| 589 | "dimension": dimension, |
||
| 590 | "metric_type": metric_type, |
||
| 591 | "index_file_size": index_file_size, |
||
| 592 | "dataset_name": collection_name |
||
| 593 | } |
||
| 594 | if not milvus_instance.exists_collection(): |
||
| 595 | logger.error("Table name: %s not existed" % collection_name) |
||
| 596 | return |
||
| 597 | logger.info(milvus_instance.count()) |
||
| 598 | index_info = milvus_instance.describe_index() |
||
| 599 | logger.info(index_info) |
||
| 600 | g_top_k = int(collection["top_ks"].split("-")[1]) |
||
| 601 | l_top_k = int(collection["top_ks"].split("-")[0]) |
||
| 602 | # g_id = int(ids.split("-")[1]) |
||
| 603 | # l_id = int(ids.split("-")[0]) |
||
| 604 | g_id_length = int(ids_length.split("-")[1]) |
||
| 605 | l_id_length = int(ids_length.split("-")[0]) |
||
| 606 | |||
| 607 | milvus_instance.preload_collection() |
||
| 608 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
| 609 | logger.debug(start_mem_usage) |
||
| 610 | start_time = time.time() |
||
| 611 | while time.time() < start_time + during_time * 60: |
||
| 612 | search_param = {} |
||
| 613 | top_k = random.randint(l_top_k, g_top_k) |
||
| 614 | ids_num = random.randint(l_id_length, g_id_length) |
||
| 615 | ids_param = [random.randint(l_id_length, g_id_length) for _ in range(ids_num)] |
||
| 616 | for k, v in search_params.items(): |
||
| 617 | search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1])) |
||
| 618 | logger.debug("Query top-k: %d, ids_num: %d, param: %s" % (top_k, ids_num, json.dumps(search_param))) |
||
| 619 | result = milvus_instance.query_ids(top_k, ids_param, search_param=search_param) |
||
| 620 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
| 621 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, {}) |
||
| 622 | metric.metrics = { |
||
| 623 | "type": "search_ids_stability", |
||
| 624 | "value": { |
||
| 625 | "during_time": during_time, |
||
| 626 | "start_mem_usage": start_mem_usage, |
||
| 627 | "end_mem_usage": end_mem_usage, |
||
| 628 | "diff_mem": end_mem_usage - start_mem_usage |
||
| 629 | } |
||
| 630 | } |
||
| 631 | report(metric) |
||
| 632 | |||
| 633 | # for sift/deep datasets |
||
| 634 | # TODO: enable |
||
| 635 | elif run_type == "accuracy": |
||
| 636 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 637 | search_params = collection["search_params"] |
||
| 638 | # mapping to search param list |
||
| 639 | search_params = self.generate_combinations(search_params) |
||
| 640 | |||
| 641 | top_ks = collection["top_ks"] |
||
| 642 | nqs = collection["nqs"] |
||
| 643 | collection_info = { |
||
| 644 | "dimension": dimension, |
||
| 645 | "metric_type": metric_type, |
||
| 646 | "index_file_size": index_file_size, |
||
| 647 | "dataset_name": collection_name |
||
| 648 | } |
||
| 649 | if not milvus_instance.exists_collection(): |
||
| 650 | logger.error("Table name: %s not existed" % collection_name) |
||
| 651 | return |
||
| 652 | logger.info(milvus_instance.count()) |
||
| 653 | index_info = milvus_instance.describe_index() |
||
| 654 | logger.info(index_info) |
||
| 655 | milvus_instance.preload_collection() |
||
| 656 | true_ids_all = self.get_groundtruth_ids(collection_size) |
||
| 657 | for search_param in search_params: |
||
| 658 | for top_k in top_ks: |
||
| 659 | for nq in nqs: |
||
| 660 | # total = 0 |
||
| 661 | search_param_group = { |
||
| 662 | "nq": nq, |
||
| 663 | "topk": top_k, |
||
| 664 | "search_param": search_param |
||
| 665 | } |
||
| 666 | logger.info("Query params: %s" % json.dumps(search_param_group)) |
||
| 667 | result_ids, _ = self.do_query_ids(milvus_instance, collection_name, top_k, nq, search_param=search_param) |
||
| 668 | acc_value = self.get_recall_value(true_ids_all[:nq, :top_k].tolist(), result_ids) |
||
| 669 | logger.info("Query accuracy: %s" % acc_value) |
||
| 670 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_param_group) |
||
| 671 | metric.metrics = { |
||
| 672 | "type": "accuracy", |
||
| 673 | "value": { |
||
| 674 | "acc": acc_value |
||
| 675 | } |
||
| 676 | } |
||
| 677 | report(metric) |
||
| 678 | |||
| 679 | elif run_type == "ann_accuracy": |
||
| 680 | hdf5_source_file = collection["source_file"] |
||
| 681 | collection_name = collection["collection_name"] |
||
| 682 | index_file_sizes = collection["index_file_sizes"] |
||
| 683 | index_types = collection["index_types"] |
||
| 684 | index_params = collection["index_params"] |
||
| 685 | top_ks = collection["top_ks"] |
||
| 686 | nqs = collection["nqs"] |
||
| 687 | search_params = collection["search_params"] |
||
| 688 | # mapping to search param list |
||
| 689 | search_params = self.generate_combinations(search_params) |
||
| 690 | # mapping to index param list |
||
| 691 | index_params = self.generate_combinations(index_params) |
||
| 692 | |||
| 693 | data_type, dimension, metric_type = parser.parse_ann_collection_name(collection_name) |
||
| 694 | dataset = utils.get_dataset(hdf5_source_file) |
||
| 695 | true_ids = np.array(dataset["neighbors"]) |
||
| 696 | for index_file_size in index_file_sizes: |
||
| 697 | collection_info = { |
||
| 698 | "dimension": dimension, |
||
| 699 | "metric_type": metric_type, |
||
| 700 | "index_file_size": index_file_size, |
||
| 701 | "dataset_name": collection_name |
||
| 702 | } |
||
| 703 | if milvus_instance.exists_collection(collection_name): |
||
| 704 | logger.info("Re-create collection: %s" % collection_name) |
||
| 705 | milvus_instance.drop() |
||
| 706 | time.sleep(DELETE_INTERVAL_TIME) |
||
| 707 | |||
| 708 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
| 709 | logger.info(milvus_instance.describe()) |
||
| 710 | insert_vectors = self.normalize(metric_type, np.array(dataset["train"])) |
||
| 711 | # Insert batch once |
||
| 712 | # milvus_instance.insert(insert_vectors) |
||
| 713 | loops = len(insert_vectors) // INSERT_INTERVAL + 1 |
||
| 714 | for i in range(loops): |
||
| 715 | start = i*INSERT_INTERVAL |
||
| 716 | end = min((i+1)*INSERT_INTERVAL, len(insert_vectors)) |
||
| 717 | tmp_vectors = insert_vectors[start:end] |
||
| 718 | if start < end: |
||
| 719 | if not isinstance(tmp_vectors, list): |
||
| 720 | milvus_instance.insert(tmp_vectors.tolist(), ids=[i for i in range(start, end)]) |
||
| 721 | else: |
||
| 722 | milvus_instance.insert(tmp_vectors, ids=[i for i in range(start, end)]) |
||
| 723 | milvus_instance.flush() |
||
| 724 | logger.info("Table: %s, row count: %s" % (collection_name, milvus_instance.count())) |
||
| 725 | if milvus_instance.count() != len(insert_vectors): |
||
| 726 | logger.error("Table row count is not equal to insert vectors") |
||
| 727 | return |
||
| 728 | for index_type in index_types: |
||
| 729 | for index_param in index_params: |
||
| 730 | logger.debug("Building index with param: %s" % json.dumps(index_param)) |
||
| 731 | milvus_instance.create_index(index_type, index_param=index_param) |
||
| 732 | logger.info(milvus_instance.describe_index()) |
||
| 733 | logger.info("Start preload collection: %s" % collection_name) |
||
| 734 | milvus_instance.preload_collection() |
||
| 735 | index_info = { |
||
| 736 | "index_type": index_type, |
||
| 737 | "index_param": index_param |
||
| 738 | } |
||
| 739 | logger.debug(index_info) |
||
| 740 | for search_param in search_params: |
||
| 741 | for nq in nqs: |
||
| 742 | query_vectors = self.normalize(metric_type, np.array(dataset["test"][:nq])) |
||
| 743 | for top_k in top_ks: |
||
| 744 | search_param_group = { |
||
| 745 | "nq": len(query_vectors), |
||
| 746 | "topk": top_k, |
||
| 747 | "search_param": search_param |
||
| 748 | } |
||
| 749 | logger.debug(search_param_group) |
||
| 750 | if not isinstance(query_vectors, list): |
||
| 751 | result = milvus_instance.query(query_vectors.tolist(), top_k, search_param=search_param) |
||
| 752 | else: |
||
| 753 | result = milvus_instance.query(query_vectors, top_k, search_param=search_param) |
||
| 754 | if len(result): |
||
| 755 | logger.debug(len(result)) |
||
| 756 | logger.debug(len(result[0])) |
||
| 757 | result_ids = result.id_array |
||
| 758 | acc_value = self.get_recall_value(true_ids[:nq, :top_k].tolist(), result_ids) |
||
| 759 | logger.info("Query ann_accuracy: %s" % acc_value) |
||
| 760 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_param_group) |
||
| 761 | metric.metrics = { |
||
| 762 | "type": "ann_accuracy", |
||
| 763 | "value": { |
||
| 764 | "acc": acc_value |
||
| 765 | } |
||
| 766 | } |
||
| 767 | report(metric) |
||
| 768 | |||
| 769 | elif run_type == "search_stability": |
||
| 770 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 771 | search_params = collection["search_params"] |
||
| 772 | during_time = collection["during_time"] |
||
| 773 | collection_info = { |
||
| 774 | "dimension": dimension, |
||
| 775 | "metric_type": metric_type, |
||
| 776 | "dataset_name": collection_name |
||
| 777 | } |
||
| 778 | if not milvus_instance.exists_collection(): |
||
| 779 | logger.error("Table name: %s not existed" % collection_name) |
||
| 780 | return |
||
| 781 | logger.info(milvus_instance.count()) |
||
| 782 | index_info = milvus_instance.describe_index() |
||
| 783 | logger.info(index_info) |
||
| 784 | g_top_k = int(collection["top_ks"].split("-")[1]) |
||
| 785 | g_nq = int(collection["nqs"].split("-")[1]) |
||
| 786 | l_top_k = int(collection["top_ks"].split("-")[0]) |
||
| 787 | l_nq = int(collection["nqs"].split("-")[0]) |
||
| 788 | milvus_instance.preload_collection() |
||
| 789 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
| 790 | logger.debug(start_mem_usage) |
||
| 791 | start_row_count = milvus_instance.count() |
||
| 792 | logger.debug(milvus_instance.describe_index()) |
||
| 793 | logger.info(start_row_count) |
||
| 794 | start_time = time.time() |
||
| 795 | while time.time() < start_time + during_time * 60: |
||
| 796 | search_param = {} |
||
| 797 | top_k = random.randint(l_top_k, g_top_k) |
||
| 798 | nq = random.randint(l_nq, g_nq) |
||
| 799 | for k, v in search_params.items(): |
||
| 800 | search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1])) |
||
| 801 | query_vectors = [[random.random() for _ in range(dimension)] for _ in range(nq)] |
||
| 802 | logger.debug("Query nq: %d, top-k: %d, param: %s" % (nq, top_k, json.dumps(search_param))) |
||
| 803 | result = milvus_instance.query(query_vectors, top_k, search_param=search_param) |
||
| 804 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
| 805 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, {}) |
||
| 806 | metric.metrics = { |
||
| 807 | "type": "search_stability", |
||
| 808 | "value": { |
||
| 809 | "during_time": during_time, |
||
| 810 | "start_mem_usage": start_mem_usage, |
||
| 811 | "end_mem_usage": end_mem_usage, |
||
| 812 | "diff_mem": end_mem_usage - start_mem_usage |
||
| 813 | } |
||
| 814 | } |
||
| 815 | report(metric) |
||
| 816 | |||
| 817 | elif run_type == "loop_stability": |
||
| 818 | # init data |
||
| 819 | milvus_instance.clean_db() |
||
| 820 | pull_interval = collection["pull_interval"] |
||
| 821 | collection_num = collection["collection_num"] |
||
| 822 | concurrent = collection["concurrent"] if "concurrent" in collection else False |
||
| 823 | concurrent_num = collection_num |
||
| 824 | dimension = collection["dimension"] if "dimension" in collection else 128 |
||
| 825 | insert_xb = collection["insert_xb"] if "insert_xb" in collection else 100000 |
||
| 826 | index_types = collection["index_types"] if "index_types" in collection else ['ivf_sq8'] |
||
| 827 | index_param = {"nlist": 2048} |
||
| 828 | collection_names = [] |
||
| 829 | milvus_instances_map = {} |
||
| 830 | insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)] |
||
| 831 | for i in range(collection_num): |
||
| 832 | name = utils.get_unique_name(prefix="collection_") |
||
| 833 | collection_names.append(name) |
||
| 834 | metric_type = random.choice(["l2", "ip"]) |
||
| 835 | index_file_size = random.randint(10, 20) |
||
| 836 | milvus_instance.create_collection(name, dimension, index_file_size, metric_type) |
||
| 837 | milvus_instance = MilvusClient(collection_name=name, host=self.host) |
||
| 838 | index_type = random.choice(index_types) |
||
| 839 | milvus_instance.create_index(index_type, index_param=index_param) |
||
| 840 | logger.info(milvus_instance.describe_index()) |
||
| 841 | insert_vectors = utils.normalize(metric_type, insert_vectors) |
||
| 842 | milvus_instance.insert(insert_vectors) |
||
| 843 | milvus_instance.flush() |
||
| 844 | milvus_instances_map.update({name: milvus_instance}) |
||
| 845 | logger.info(milvus_instance.describe_index()) |
||
| 846 | logger.info(milvus_instance.describe()) |
||
| 847 | |||
| 848 | # loop time unit: min -> s |
||
| 849 | pull_interval_seconds = pull_interval * 60 |
||
| 850 | tasks = ["insert_rand", "delete_rand", "query_rand", "flush", "compact"] |
||
| 851 | i = 1 |
||
| 852 | while True: |
||
| 853 | logger.info("Loop time: %d" % i) |
||
| 854 | start_time = time.time() |
||
| 855 | while time.time() - start_time < pull_interval_seconds: |
||
| 856 | if concurrent: |
||
| 857 | mp = [] |
||
| 858 | for _ in range(concurrent_num): |
||
| 859 | tmp_collection_name = random.choice(collection_names) |
||
| 860 | task_name = random.choice(tasks) |
||
| 861 | mp.append((tmp_collection_name, task_name)) |
||
| 862 | |||
| 863 | with futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor: |
||
| 864 | future_results = {executor.submit(getattr(milvus_instances_map[mp[j][0]], mp[j][1])): j for j in range(concurrent_num)} |
||
| 865 | for future in futures.as_completed(future_results): |
||
| 866 | future.result() |
||
| 867 | |||
| 868 | else: |
||
| 869 | tmp_collection_name = random.choice(collection_names) |
||
| 870 | task_name = random.choice(tasks) |
||
| 871 | logger.info(tmp_collection_name) |
||
| 872 | logger.info(task_name) |
||
| 873 | task_run = getattr(milvus_instances_map[tmp_collection_name], task_name) |
||
| 874 | task_run() |
||
| 875 | |||
| 876 | logger.debug("Restart server") |
||
| 877 | utils.restart_server(self.service_name, namespace) |
||
| 878 | # new connection |
||
| 879 | for name in collection_names: |
||
| 880 | milvus_instance = MilvusClient(collection_name=name, host=self.host) |
||
| 881 | milvus_instances_map.update({name: milvus_instance}) |
||
| 882 | i = i + 1 |
||
| 883 | |||
| 884 | elif run_type == "stability": |
||
| 885 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
| 886 | search_params = collection["search_params"] |
||
| 887 | insert_xb = collection["insert_xb"] |
||
| 888 | insert_interval = collection["insert_interval"] |
||
| 889 | delete_xb = collection["delete_xb"] |
||
| 890 | during_time = collection["during_time"] |
||
| 891 | collection_info = { |
||
| 892 | "dimension": dimension, |
||
| 893 | "metric_type": metric_type, |
||
| 894 | "dataset_name": collection_name |
||
| 895 | } |
||
| 896 | if not milvus_instance.exists_collection(): |
||
| 897 | logger.error("Table name: %s not existed" % collection_name) |
||
| 898 | return |
||
| 899 | logger.info(milvus_instance.count()) |
||
| 900 | index_info = milvus_instance.describe_index() |
||
| 901 | logger.info(index_info) |
||
| 902 | g_top_k = int(collection["top_ks"].split("-")[1]) |
||
| 903 | g_nq = int(collection["nqs"].split("-")[1]) |
||
| 904 | l_top_k = int(collection["top_ks"].split("-")[0]) |
||
| 905 | l_nq = int(collection["nqs"].split("-")[0]) |
||
| 906 | milvus_instance.preload_collection() |
||
| 907 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
| 908 | start_row_count = milvus_instance.count() |
||
| 909 | logger.debug(milvus_instance.describe_index()) |
||
| 910 | logger.info(start_row_count) |
||
| 911 | start_time = time.time() |
||
| 912 | i = 0 |
||
| 913 | ids = [] |
||
| 914 | insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)] |
||
| 915 | query_vectors = [[random.random() for _ in range(dimension)] for _ in range(10000)] |
||
| 916 | while time.time() < start_time + during_time * 60: |
||
| 917 | i = i + 1 |
||
| 918 | for j in range(insert_interval): |
||
| 919 | top_k = random.randint(l_top_k, g_top_k) |
||
| 920 | nq = random.randint(l_nq, g_nq) |
||
| 921 | search_param = {} |
||
| 922 | for k, v in search_params.items(): |
||
| 923 | search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1])) |
||
| 924 | logger.debug("Query nq: %d, top-k: %d, param: %s" % (nq, top_k, json.dumps(search_param))) |
||
| 925 | result = milvus_instance.query(query_vectors[0:nq], top_k, search_param=search_param) |
||
| 926 | count = milvus_instance.count() |
||
| 927 | insert_ids = [(count+x) for x in range(len(insert_vectors))] |
||
| 928 | ids.extend(insert_ids) |
||
| 929 | status, res = milvus_instance.insert(insert_vectors, ids=insert_ids) |
||
| 930 | logger.debug("%d, row_count: %d" % (i, milvus_instance.count())) |
||
| 931 | milvus_instance.delete(ids[-delete_xb:]) |
||
| 932 | milvus_instance.flush() |
||
| 933 | milvus_instance.compact() |
||
| 934 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
| 935 | end_row_count = milvus_instance.count() |
||
| 936 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, {}) |
||
| 937 | metric.metrics = { |
||
| 938 | "type": "stability", |
||
| 939 | "value": { |
||
| 940 | "during_time": during_time, |
||
| 941 | "start_mem_usage": start_mem_usage, |
||
| 942 | "end_mem_usage": end_mem_usage, |
||
| 943 | "diff_mem": end_mem_usage - start_mem_usage, |
||
| 944 | "row_count_increments": end_row_count - start_row_count |
||
| 945 | } |
||
| 946 | } |
||
| 947 | report(metric) |
||
| 948 | |||
| 949 | elif run_type == "locust_mix_performance": |
||
| 950 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser( |
||
| 951 | collection_name) |
||
| 952 | ni_per = collection["ni_per"] |
||
| 953 | build_index = collection["build_index"] |
||
| 954 | # # TODO: debug |
||
| 955 | if milvus_instance.exists_collection(): |
||
| 956 | milvus_instance.drop() |
||
| 957 | time.sleep(10) |
||
| 958 | index_info = {} |
||
| 959 | search_params = {} |
||
| 960 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
| 961 | if build_index is True: |
||
| 962 | index_type = collection["index_type"] |
||
| 963 | index_param = collection["index_param"] |
||
| 964 | index_info = { |
||
| 965 | "index_tyoe": index_type, |
||
| 966 | "index_param": index_param |
||
| 967 | } |
||
| 968 | milvus_instance.create_index(index_type, index_param) |
||
| 969 | logger.debug(milvus_instance.describe_index()) |
||
| 970 | res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per) |
||
| 971 | logger.info(res) |
||
| 972 | if "flush" in collection and collection["flush"] == "no": |
||
| 973 | logger.debug("No manual flush") |
||
| 974 | else: |
||
| 975 | milvus_instance.flush() |
||
| 976 | if build_index is True: |
||
| 977 | logger.debug("Start build index for last file") |
||
| 978 | milvus_instance.create_index(index_type, index_param) |
||
| 979 | logger.debug(milvus_instance.describe_index()) |
||
| 980 | ### spawn locust requests |
||
| 981 | task = collection["tasks"] |
||
| 982 | # generate task code |
||
| 983 | task_file = utils.get_unique_name() |
||
| 984 | task_file_script = task_file + '.py' |
||
| 985 | task_file_csv = task_file + '_stats.csv' |
||
| 986 | task_types = task["types"] |
||
| 987 | connection_type = "single" |
||
| 988 | connection_num = task["connection_num"] |
||
| 989 | if connection_num > 1: |
||
| 990 | connection_type = "multi" |
||
| 991 | clients_num = task["clients_num"] |
||
| 992 | hatch_rate = task["hatch_rate"] |
||
| 993 | during_time = task["during_time"] |
||
| 994 | def_strs = "" |
||
| 995 | for task_type in task_types: |
||
| 996 | _type = task_type["type"] |
||
| 997 | weight = task_type["weight"] |
||
| 998 | if _type == "flush": |
||
| 999 | def_str = """ |
||
| 1000 | @task(%d) |
||
| 1001 | def flush(self): |
||
| 1002 | client = get_client(collection_name) |
||
| 1003 | client.flush(collection_name=collection_name) |
||
| 1004 | """ % weight |
||
| 1005 | if _type == "compact": |
||
| 1006 | def_str = """ |
||
| 1007 | @task(%d) |
||
| 1008 | def compact(self): |
||
| 1009 | client = get_client(collection_name) |
||
| 1010 | client.compact(collection_name) |
||
| 1011 | """ % weight |
||
| 1012 | if _type == "query": |
||
| 1013 | def_str = """ |
||
| 1014 | @task(%d) |
||
| 1015 | def query(self): |
||
| 1016 | client = get_client(collection_name) |
||
| 1017 | params = %s |
||
| 1018 | X = [[random.random() for i in range(dim)] for i in range(params["nq"])] |
||
| 1019 | client.query(X, params["top_k"], params["search_param"], collection_name=collection_name) |
||
| 1020 | """ % (weight, task_type["params"]) |
||
| 1021 | if _type == "insert": |
||
| 1022 | def_str = """ |
||
| 1023 | @task(%d) |
||
| 1024 | def insert(self): |
||
| 1025 | client = get_client(collection_name) |
||
| 1026 | params = %s |
||
| 1027 | ids = [random.randint(10, 1000000) for i in range(params["nb"])] |
||
| 1028 | X = [[random.random() for i in range(dim)] for i in range(params["nb"])] |
||
| 1029 | client.insert(X,ids=ids, collection_name=collection_name) |
||
| 1030 | """ % (weight, task_type["params"]) |
||
| 1031 | if _type == "delete": |
||
| 1032 | def_str = """ |
||
| 1033 | @task(%d) |
||
| 1034 | def delete(self): |
||
| 1035 | client = get_client(collection_name) |
||
| 1036 | ids = [random.randint(1, 1000000) for i in range(1)] |
||
| 1037 | client.delete(ids, collection_name) |
||
| 1038 | """ % weight |
||
| 1039 | def_strs += def_str |
||
| 1040 | code_str = """ |
||
| 1041 | import random |
||
| 1042 | import json |
||
| 1043 | from locust import User, task, between |
||
| 1044 | from locust_task import MilvusTask |
||
| 1045 | from client import MilvusClient |
||
| 1046 | |||
| 1047 | host = '%s' |
||
| 1048 | port = %s |
||
| 1049 | collection_name = '%s' |
||
| 1050 | dim = %s |
||
| 1051 | connection_type = '%s' |
||
| 1052 | m = MilvusClient(host=host, port=port) |
||
| 1053 | |||
| 1054 | def get_client(collection_name): |
||
| 1055 | if connection_type == 'single': |
||
| 1056 | return MilvusTask(m=m) |
||
| 1057 | elif connection_type == 'multi': |
||
| 1058 | return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name) |
||
| 1059 | |||
| 1060 | |||
| 1061 | class MixTask(User): |
||
| 1062 | wait_time = between(0.001, 0.002) |
||
| 1063 | %s |
||
| 1064 | """ % (self.host, self.port, collection_name, dimension, connection_type, def_strs) |
||
| 1065 | print(def_strs) |
||
| 1066 | with open(task_file_script, "w+") as fd: |
||
| 1067 | fd.write(code_str) |
||
| 1068 | locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % ( |
||
| 1069 | task_file_script, |
||
| 1070 | task_file, |
||
| 1071 | clients_num, |
||
| 1072 | hatch_rate, |
||
| 1073 | during_time) |
||
| 1074 | logger.info(locust_cmd) |
||
| 1075 | try: |
||
| 1076 | res = os.system(locust_cmd) |
||
| 1077 | except Exception as e: |
||
| 1078 | logger.error(str(e)) |
||
| 1079 | return |
||
| 1080 | # . retrieve and collect test statistics |
||
| 1081 | locust_stats = None |
||
| 1082 | with open(task_file_csv, newline='') as fd: |
||
| 1083 | dr = csv.DictReader(fd) |
||
| 1084 | for row in dr: |
||
| 1085 | if row["Name"] != "Aggregated": |
||
| 1086 | continue |
||
| 1087 | locust_stats = row |
||
| 1088 | logger.info(locust_stats) |
||
| 1089 | collection_info = { |
||
| 1090 | "dimension": dimension, |
||
| 1091 | "metric_type": metric_type, |
||
| 1092 | "dataset_name": collection_name |
||
| 1093 | } |
||
| 1094 | metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params) |
||
| 1095 | metric.metrics = { |
||
| 1096 | "type": run_type, |
||
| 1097 | "value": { |
||
| 1098 | "during_time": during_time, |
||
| 1099 | "request_count": int(locust_stats["Request Count"]), |
||
| 1100 | "failure_count": int(locust_stats["Failure Count"]), |
||
| 1101 | "qps": locust_stats["Requests/s"], |
||
| 1102 | "min_response_time": int(locust_stats["Min Response Time"]), |
||
| 1103 | "max_response_time": int(locust_stats["Max Response Time"]), |
||
| 1104 | "median_response_time": int(locust_stats["Median Response Time"]), |
||
| 1105 | "avg_response_time": int(locust_stats["Average Response Time"]) |
||
| 1106 | } |
||
| 1107 | } |
||
| 1108 | report(metric) |
||
| 1109 | |||
| 1110 | else: |
||
| 1111 | logger.warning("Run type: %s not defined" % run_type) |
||
| 1112 | return |
||
| 1113 | logger.debug("Test finished") |
||
| 1114 |