| Metric | Value |
| --- | --- |
| Total Complexity | 78 |
| Total Lines | 629 |
| Duplicated Lines | 2.86 % |
| Changes | 0 |
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places; each kind of duplication problem has a corresponding refactoring solution.

Complex classes like milvus_benchmark.local_runner often do many different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined which fields belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster. A sketch of Extract Class applied to this file follows.
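For example, the locust-related locals in `LocalRunner.run` (`task_file`, `task_file_script`, `task_file_csv`, `clients_num`, `hatch_rate`, `during_time`) share prefixes and a purpose, and the command-building and CSV-parsing code around them appears verbatim in two branches. Below is a minimal, hypothetical sketch of extracting them into their own class; the `LocustRunner` name and API are illustrative, not part of the original file.

```python
import csv
import os


class LocustRunner:
    """Owns generation, execution, and stats collection for one locust task file."""

    def __init__(self, task_file, clients_num, hatch_rate, during_time):
        self.task_file = task_file                     # unique prefix, also passed to --csv
        self.task_file_script = task_file + ".py"      # generated locustfile
        self.task_file_csv = task_file + "_stats.csv"  # written by locust --csv
        self.clients_num = clients_num
        self.hatch_rate = hatch_rate
        self.during_time = during_time

    def run(self, code_str):
        # Write the generated locustfile, then run it headless,
        # mirroring the os.system call in the original run() method.
        with open(self.task_file_script, "w+") as fd:
            fd.write(code_str)
        cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % (
            self.task_file_script, self.task_file,
            self.clients_num, self.hatch_rate, self.during_time)
        return os.system(cmd)

    def aggregated_stats(self):
        # Read back the "Aggregated" row from the stats CSV locust writes.
        with open(self.task_file_csv, newline="") as fd:
            for row in csv.DictReader(fd):
                if row["Name"] == "Aggregated":
                    return row
        return None
```

Both locust branches of `run` could then build their `code_str` and delegate execution and stats collection to this component. The flagged file, `milvus_benchmark/local_runner.py`, is reproduced below.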
```python
import os
import logging
import pdb
import time
import random
import string
import json
import csv
from multiprocessing import Process
import numpy as np
import concurrent.futures
from client import MilvusClient
import utils
import parser
from runner import Runner

DELETE_INTERVAL_TIME = 5
INSERT_INTERVAL = 50000
logger = logging.getLogger("milvus_benchmark.local_runner")


class LocalRunner(Runner):
    def __init__(self, host, port):
        """
        Run tests in local mode.

        Make sure the server has started.
        """
        super(LocalRunner, self).__init__()
        self.host = host
        self.port = port

    def run(self, run_type, collection):
        logger.debug(run_type)
        logger.debug(collection)
        collection_name = collection["collection_name"] if "collection_name" in collection else None
        milvus_instance = MilvusClient(collection_name=collection_name, host=self.host, port=self.port)
        logger.info(milvus_instance.show_collections())
        env_value = milvus_instance.get_server_config()
        logger.debug(env_value)

        if run_type in ["insert_performance", "insert_flush_performance"]:
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            ni_per = collection["ni_per"]
            build_index = collection["build_index"]
            if milvus_instance.exists_collection():
                milvus_instance.drop()
                time.sleep(10)
            milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
            if build_index is True:
                index_type = collection["index_type"]
                index_param = collection["index_param"]
                milvus_instance.create_index(index_type, index_param)
                logger.debug(milvus_instance.describe_index())
            res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per)
            milvus_instance.flush()
            logger.debug("Table row counts: %d" % milvus_instance.count())
            if build_index is True:
                logger.debug("Start build index for last file")
                milvus_instance.create_index(index_type, index_param)
                logger.debug(milvus_instance.describe_index())

        elif run_type == "delete_performance":
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            ni_per = collection["ni_per"]
            if not milvus_instance.exists_collection():
                logger.error(milvus_instance.show_collections())
                logger.warning("Table: %s not found" % collection_name)
                return
            length = milvus_instance.count()
            ids = [i for i in range(length)]
            loops = int(length / ni_per)
            for i in range(loops):
                delete_ids = ids[i*ni_per : i*ni_per+ni_per]
                logger.debug("Delete %d - %d" % (delete_ids[0], delete_ids[-1]))
                milvus_instance.delete(delete_ids)
                milvus_instance.flush()
                logger.debug("Table row counts: %d" % milvus_instance.count())
            logger.debug("Table row counts: %d" % milvus_instance.count())
            milvus_instance.flush()
            logger.debug("Table row counts: %d" % milvus_instance.count())

        elif run_type == "build_performance":
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            index_type = collection["index_type"]
            index_param = collection["index_param"]
            if not milvus_instance.exists_collection():
                logger.error("Table name: %s not existed" % collection_name)
                return
            search_params = {}
            start_time = time.time()
            # drop index
            logger.debug("Drop index")
            milvus_instance.drop_index()
            start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
            milvus_instance.create_index(index_type, index_param)
            logger.debug(milvus_instance.describe_index())
            logger.debug("Table row counts: %d" % milvus_instance.count())
            end_time = time.time()
            end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
            logger.debug("Diff memory: %s, current memory usage: %s, build time: %s" % ((end_mem_usage - start_mem_usage), end_mem_usage, round(end_time - start_time, 1)))

        elif run_type == "search_performance":
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            run_count = collection["run_count"]
            top_ks = collection["top_ks"]
            nqs = collection["nqs"]
            search_params = collection["search_params"]
            # for debugging
            # time.sleep(3600)
            if not milvus_instance.exists_collection():
                logger.error("Table name: %s not existed" % collection_name)
                return
            logger.info(milvus_instance.count())
            result = milvus_instance.describe_index()
            logger.info(result)
            milvus_instance.preload_collection()
            mem_usage = milvus_instance.get_mem_info()["memory_used"]
            logger.info(mem_usage)
            for search_param in search_params:
                logger.info("Search param: %s" % json.dumps(search_param))
                res = self.do_query(milvus_instance, collection_name, top_ks, nqs, run_count, search_param)
                headers = ["Nq/Top-k"]
                headers.extend([str(top_k) for top_k in top_ks])
                logger.info("Search param: %s" % json.dumps(search_param))
                utils.print_table(headers, nqs, res)
            mem_usage = milvus_instance.get_mem_info()["memory_used"]
            logger.info(mem_usage)

        elif run_type == "locust_search_performance":
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            ### spawn locust requests
            collection_num = collection["collection_num"]
            task = collection["task"]
            # generate task code
            task_file = utils.get_unique_name()
            task_file_script = task_file + '.py'
            task_file_csv = task_file + '_stats.csv'
            task_type = task["type"]
            connection_type = "single"
            connection_num = task["connection_num"]
            if connection_num > 1:
                connection_type = "multi"
            clients_num = task["clients_num"]
            hatch_rate = task["hatch_rate"]
            during_time = task["during_time"]
            def_name = task_type
            task_params = task["params"]
            collection_names = []
            for i in range(collection_num):
                suffix = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
                collection_names.append(collection_name + "_" + suffix)
            # collection_names = ['sift_1m_1024_128_l2_Kg6co', 'sift_1m_1024_128_l2_egkBK', 'sift_1m_1024_128_l2_D0wtE',
            #                     'sift_1m_1024_128_l2_9naps', 'sift_1m_1024_128_l2_iJ0jj', 'sift_1m_1024_128_l2_nqUTm',
            #                     'sift_1m_1024_128_l2_GIF0D', 'sift_1m_1024_128_l2_EL2qk', 'sift_1m_1024_128_l2_qLRnC',
            #                     'sift_1m_1024_128_l2_8Ditg']
            # #####
            ni_per = collection["ni_per"]
            build_index = collection["build_index"]
            for c_name in collection_names:
                milvus_instance = MilvusClient(collection_name=c_name, host=self.host, port=self.port)
                if milvus_instance.exists_collection(collection_name=c_name):
                    milvus_instance.drop(name=c_name)
                    time.sleep(10)
                milvus_instance.create_collection(c_name, dimension, index_file_size, metric_type)
                if build_index is True:
                    index_type = collection["index_type"]
                    index_param = collection["index_param"]
                    milvus_instance.create_index(index_type, index_param)
                    logger.debug(milvus_instance.describe_index())
                res = self.do_insert(milvus_instance, c_name, data_type, dimension, collection_size, ni_per)
                milvus_instance.flush()
                logger.debug("Table row counts: %d" % milvus_instance.count(name=c_name))
                if build_index is True:
                    logger.debug("Start build index for last file")
                    milvus_instance.create_index(index_type, index_param)
                    logger.debug(milvus_instance.describe_index())
            code_str = """
import random
import string
from locust import User, task, between
from locust_task import MilvusTask
from client import MilvusClient

host = '%s'
port = %s
dim = %s
connection_type = '%s'
collection_names = %s
m = MilvusClient(host=host, port=port)


def get_collection_name():
    return random.choice(collection_names)


def get_client(collection_name):
    if connection_type == 'single':
        return MilvusTask(m=m)
    elif connection_type == 'multi':
        return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name)


class QueryTask(User):
    wait_time = between(0.001, 0.002)

    @task()
    def %s(self):
        top_k = %s
        X = [[random.random() for i in range(dim)] for i in range(%s)]
        search_param = %s
        collection_name = get_collection_name()
        print(collection_name)
        client = get_client(collection_name)
        client.query(X, top_k, search_param, collection_name=collection_name)
""" % (self.host, self.port, dimension, connection_type, collection_names, def_name, task_params["top_k"], task_params["nq"], task_params["search_param"])
            with open(task_file_script, 'w+') as fd:
                fd.write(code_str)
            locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % (
                task_file_script,
                task_file,
                clients_num,
                hatch_rate,
                during_time)
            logger.info(locust_cmd)
            try:
                res = os.system(locust_cmd)
            except Exception as e:
                logger.error(str(e))
                return
            # retrieve and collect test statistics
            metric = None
            with open(task_file_csv, newline='') as fd:
                dr = csv.DictReader(fd)
                for row in dr:
                    if row["Name"] != "Aggregated":
                        continue
                    metric = row
            logger.info(metric)
            # clean up temp files

        elif run_type == "search_ids_stability":
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            search_params = collection["search_params"]
            during_time = collection["during_time"]
            ids_length = collection["ids_length"]
            ids = collection["ids"]
            logger.info(milvus_instance.count())
            index_info = milvus_instance.describe_index()
            logger.info(index_info)
            g_top_k = int(collection["top_ks"].split("-")[1])
            l_top_k = int(collection["top_ks"].split("-")[0])
            g_id = int(ids.split("-")[1])
            l_id = int(ids.split("-")[0])
            g_id_length = int(ids_length.split("-")[1])
            l_id_length = int(ids_length.split("-")[0])

            milvus_instance.preload_collection()
            start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
            logger.debug(start_mem_usage)
            start_time = time.time()
            while time.time() < start_time + during_time * 60:
                search_param = {}
                top_k = random.randint(l_top_k, g_top_k)
                ids_num = random.randint(l_id_length, g_id_length)
                l_ids = random.randint(l_id, g_id - ids_num)
                # ids_param = [random.randint(l_id_length, g_id_length) for _ in range(ids_num)]
                ids_param = [id for id in range(l_ids, l_ids + ids_num)]
                for k, v in search_params.items():
                    search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1]))
                logger.debug("Query top-k: %d, ids_num: %d, param: %s" % (top_k, ids_num, json.dumps(search_param)))
                result = milvus_instance.query_ids(top_k, ids_param, search_param=search_param)
            end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
            metrics = {
                "during_time": during_time,
                "start_mem_usage": start_mem_usage,
                "end_mem_usage": end_mem_usage,
                "diff_mem": end_mem_usage - start_mem_usage,
            }
            logger.info(metrics)

        elif run_type == "search_performance_concurrents":
            data_type, dimension, metric_type = parser.parse_ann_collection_name(collection_name)
            hdf5_source_file = collection["source_file"]
            use_single_connection = collection["use_single_connection"]
            concurrents = collection["concurrents"]
            top_ks = collection["top_ks"]
            nqs = collection["nqs"]
            search_params = self.generate_combinations(collection["search_params"])
            if not milvus_instance.exists_collection():
                logger.error("Table name: %s not existed" % collection_name)
                return
            logger.info(milvus_instance.count())
            result = milvus_instance.describe_index()
            logger.info(result)
            milvus_instance.preload_collection()
            dataset = utils.get_dataset(hdf5_source_file)
            for concurrent_num in concurrents:
                top_k = top_ks[0]
                for nq in nqs:
                    mem_usage = milvus_instance.get_mem_info()["memory_used"]
                    logger.info(mem_usage)
                    query_vectors = self.normalize(metric_type, np.array(dataset["test"][:nq]))
                    logger.debug(search_params)
                    for search_param in search_params:
                        logger.info("Search param: %s" % json.dumps(search_param))
                        total_time = 0.0
                        if use_single_connection is True:
                            connections = [MilvusClient(collection_name=collection_name, host=self.host, port=self.port)]
                            with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor:
                                future_results = {executor.submit(
                                    self.do_query_qps, connections[0], query_vectors, top_k, search_param=search_param): index for index in range(concurrent_num)}
                        else:
                            connections = [MilvusClient(collection_name=collection_name, host=self.host, port=self.port) for i in range(concurrent_num)]
                            with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor:
                                future_results = {executor.submit(
                                    self.do_query_qps, connections[index], query_vectors, top_k, search_param=search_param): index for index in range(concurrent_num)}
                        for future in concurrent.futures.as_completed(future_results):
                            total_time = total_time + future.result()
                        qps_value = total_time / concurrent_num
                        logger.debug("QPS value: %f, total_time: %f, request_nums: %f" % (qps_value, total_time, concurrent_num))
                    mem_usage = milvus_instance.get_mem_info()["memory_used"]
                    logger.info(mem_usage)

        elif run_type == "ann_accuracy":
            hdf5_source_file = collection["source_file"]
            collection_name = collection["collection_name"]
            index_file_sizes = collection["index_file_sizes"]
            index_types = collection["index_types"]
            index_params = collection["index_params"]
            top_ks = collection["top_ks"]
            nqs = collection["nqs"]
            search_params = collection["search_params"]
            # mapping to search param list
            search_params = self.generate_combinations(search_params)
            # mapping to index param list
            index_params = self.generate_combinations(index_params)

            data_type, dimension, metric_type = parser.parse_ann_collection_name(collection_name)
            dataset = utils.get_dataset(hdf5_source_file)
            if milvus_instance.exists_collection(collection_name):
                logger.info("Re-create collection: %s" % collection_name)
                milvus_instance.drop()
                time.sleep(DELETE_INTERVAL_TIME)
            true_ids = np.array(dataset["neighbors"])
            for index_file_size in index_file_sizes:
                milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
                logger.info(milvus_instance.describe())
                insert_vectors = self.normalize(metric_type, np.array(dataset["train"]))
                logger.debug(len(insert_vectors))
                # Insert batch once
                # milvus_instance.insert(insert_vectors)
                loops = len(insert_vectors) // INSERT_INTERVAL + 1
                for i in range(loops):
                    start = i * INSERT_INTERVAL
                    end = min((i + 1) * INSERT_INTERVAL, len(insert_vectors))
                    tmp_vectors = insert_vectors[start:end]
                    if start < end:
                        if not isinstance(tmp_vectors, list):
                            milvus_instance.insert(tmp_vectors.tolist(), ids=[i for i in range(start, end)])
                        else:
                            milvus_instance.insert(tmp_vectors, ids=[i for i in range(start, end)])
                milvus_instance.flush()
                logger.info("Table: %s, row count: %s" % (collection_name, milvus_instance.count()))
                if milvus_instance.count() != len(insert_vectors):
                    logger.error("Table row count is not equal to insert vectors")
                    return
                for index_type in index_types:
                    for index_param in index_params:
                        logger.debug("Building index with param: %s" % json.dumps(index_param))
                        milvus_instance.create_index(index_type, index_param=index_param)
                        logger.info(milvus_instance.describe_index())
                        logger.info("Start preload collection: %s" % collection_name)
                        milvus_instance.preload_collection()
                        for search_param in search_params:
                            for nq in nqs:
                                query_vectors = self.normalize(metric_type, np.array(dataset["test"][:nq]))
                                for top_k in top_ks:
                                    logger.debug("Search nq: %d, top-k: %d, search_param: %s" % (nq, top_k, json.dumps(search_param)))
                                    if not isinstance(query_vectors, list):
                                        result = milvus_instance.query(query_vectors.tolist(), top_k, search_param=search_param)
                                    else:
                                        result = milvus_instance.query(query_vectors, top_k, search_param=search_param)
                                    result_ids = result.id_array
                                    acc_value = self.get_recall_value(true_ids[:nq, :top_k].tolist(), result_ids)
                                    logger.info("Query ann_accuracy: %s" % acc_value)

        elif run_type == "stability":
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            search_params = collection["search_params"]
            insert_xb = collection["insert_xb"]
            insert_interval = collection["insert_interval"]
            delete_xb = collection["delete_xb"]
            # flush_interval = collection["flush_interval"]
            # compact_interval = collection["compact_interval"]
            during_time = collection["during_time"]
            if not milvus_instance.exists_collection():
                logger.error(milvus_instance.show_collections())
                logger.error("Table name: %s not existed" % collection_name)
                return
            g_top_k = int(collection["top_ks"].split("-")[1])
            g_nq = int(collection["nqs"].split("-")[1])
            l_top_k = int(collection["top_ks"].split("-")[0])
            l_nq = int(collection["nqs"].split("-")[0])
            milvus_instance.preload_collection()
            start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
            start_row_count = milvus_instance.count()
            logger.debug(milvus_instance.describe_index())
            logger.info(start_row_count)
            start_time = time.time()
            i = 0
            ids = []
            insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)]
            query_vectors = [[random.random() for _ in range(dimension)] for _ in range(10000)]
            while time.time() < start_time + during_time * 60:
                i = i + 1
                for _ in range(insert_interval):
                    top_k = random.randint(l_top_k, g_top_k)
                    nq = random.randint(l_nq, g_nq)
                    search_param = {}
                    for k, v in search_params.items():
                        search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1]))
                    logger.debug("Query nq: %d, top-k: %d, param: %s" % (nq, top_k, json.dumps(search_param)))
                    result = milvus_instance.query(query_vectors[0:nq], top_k, search_param=search_param)
                count = milvus_instance.count()
                insert_ids = [(count + x) for x in range(len(insert_vectors))]
                ids.extend(insert_ids)
                _, res = milvus_instance.insert(insert_vectors, ids=insert_ids)
                logger.debug("%d, row_count: %d" % (i, milvus_instance.count()))
                milvus_instance.delete(ids[-delete_xb:])
                milvus_instance.flush()
                milvus_instance.compact()
            end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
            end_row_count = milvus_instance.count()
            metrics = {
                "during_time": during_time,
                "start_mem_usage": start_mem_usage,
                "end_mem_usage": end_mem_usage,
                "diff_mem": end_mem_usage - start_mem_usage,
                "row_count_increments": end_row_count - start_row_count
            }
            logger.info(metrics)

        elif run_type == "loop_stability":
            # init data
            milvus_instance.clean_db()
            pull_interval = collection["pull_interval"]
            pull_interval_seconds = pull_interval * 60
            collection_num = collection["collection_num"]
            dimension = collection["dimension"] if "dimension" in collection else 128
            insert_xb = collection["insert_xb"] if "insert_xb" in collection else 100000
            index_types = collection["index_types"] if "index_types" in collection else ['ivf_sq8']
            index_param = {"nlist": 2048}
            collection_names = []
            milvus_instances_map = {}
            insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)]
            for i in range(collection_num):
                name = utils.get_unique_name(prefix="collection_")
                collection_names.append(name)
                metric_type = random.choice(["l2", "ip"])
                index_file_size = random.randint(10, 20)
                milvus_instance.create_collection(name, dimension, index_file_size, metric_type)
                milvus_instance = MilvusClient(collection_name=name, host=self.host)
                index_type = random.choice(index_types)
                milvus_instance.create_index(index_type, index_param=index_param)
                logger.info(milvus_instance.describe_index())
                insert_vectors = utils.normalize(metric_type, insert_vectors)
                milvus_instance.insert(insert_vectors)
                milvus_instance.flush()
                milvus_instances_map.update({name: milvus_instance})
                logger.info(milvus_instance.describe_index())
                logger.info(milvus_instance.describe())

            tasks = ["insert_rand", "delete_rand", "query_rand", "flush"]
            i = 1
            while True:
                logger.info("Loop time: %d" % i)
                start_time = time.time()
                while time.time() - start_time < pull_interval_seconds:
                    # choose a collection
                    tmp_collection_name = random.choice(collection_names)
                    # choose a task from the task list
                    task_name = random.choice(tasks)
                    logger.info(tmp_collection_name)
                    logger.info(task_name)
                    # execute the task
                    task_run = getattr(milvus_instances_map[tmp_collection_name], task_name)
                    task_run()
                # new connection
                for name in collection_names:
                    milvus_instance = MilvusClient(collection_name=name, host=self.host)
                    milvus_instances_map.update({name: milvus_instance})
                i = i + 1
        elif run_type == "locust_mix_performance":
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            # ni_per = collection["ni_per"]
            # build_index = collection["build_index"]
            # if milvus_instance.exists_collection():
            #     milvus_instance.drop()
            #     time.sleep(10)
            # milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
            # if build_index is True:
            #     index_type = collection["index_type"]
            #     index_param = collection["index_param"]
            #     milvus_instance.create_index(index_type, index_param)
            #     logger.debug(milvus_instance.describe_index())
            # res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per)
            # milvus_instance.flush()
            # logger.debug("Table row counts: %d" % milvus_instance.count())
            # if build_index is True:
            #     logger.debug("Start build index for last file")
            #     milvus_instance.create_index(index_type, index_param)
            #     logger.debug(milvus_instance.describe_index())
            task = collection["tasks"]
            task_file = utils.get_unique_name()
            task_file_script = task_file + '.py'
            task_file_csv = task_file + '_stats.csv'
            task_types = task["types"]
            connection_type = "single"
            connection_num = task["connection_num"]
            if connection_num > 1:
                connection_type = "multi"
            clients_num = task["clients_num"]
            hatch_rate = task["hatch_rate"]
            during_time = task["during_time"]
            def_strs = ""
            for task_type in task_types:
                _type = task_type["type"]
                weight = task_type["weight"]
                if _type == "flush":
                    def_str = """
    @task(%d)
    def flush(self):
        client = get_client(collection_name)
        client.flush(collection_name=collection_name)
""" % weight
                if _type == "compact":
                    def_str = """
    @task(%d)
    def compact(self):
        client = get_client(collection_name)
        client.compact(collection_name)
""" % weight
                if _type == "query":
                    def_str = """
    @task(%d)
    def query(self):
        client = get_client(collection_name)
        params = %s
        X = [[random.random() for i in range(dim)] for i in range(params["nq"])]
        client.query(X, params["top_k"], params["search_param"], collection_name=collection_name)
""" % (weight, task_type["params"])
                if _type == "insert":
                    def_str = """
    @task(%d)
    def insert(self):
        client = get_client(collection_name)
        params = %s
        ids = [random.randint(10, 1000000) for i in range(params["nb"])]
        X = [[random.random() for i in range(dim)] for i in range(params["nb"])]
        client.insert(X, ids=ids, collection_name=collection_name)
""" % (weight, task_type["params"])
                if _type == "delete":
                    def_str = """
    @task(%d)
    def delete(self):
        client = get_client(collection_name)
        ids = [random.randint(1, 1000000) for i in range(1)]
        client.delete(ids, collection_name)
""" % weight
                def_strs += def_str
            print(def_strs)
            code_str = """
import random
import json
from locust import User, task, between
from locust_task import MilvusTask
from client import MilvusClient

host = '%s'
port = %s
collection_name = '%s'
dim = %s
connection_type = '%s'
m = MilvusClient(host=host, port=port)


def get_client(collection_name):
    if connection_type == 'single':
        return MilvusTask(m=m)
    elif connection_type == 'multi':
        return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name)


class MixTask(User):
    wait_time = between(0.001, 0.002)
%s
""" % (self.host, self.port, collection_name, dimension, connection_type, def_strs)
            with open(task_file_script, "w+") as fd:
                fd.write(code_str)
            locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % (
                task_file_script,
                task_file,
                clients_num,
                hatch_rate,
                during_time)
            logger.info(locust_cmd)
            try:
                res = os.system(locust_cmd)
            except Exception as e:
                logger.error(str(e))
                return

            # retrieve and collect test statistics
            metric = None
            with open(task_file_csv, newline='') as fd:
                dr = csv.DictReader(fd)
                for row in dr:
                    if row["Name"] != "Aggregated":
                        continue
                    metric = row
            logger.info(metric)

        else:
            logger.warning("Run type not defined")
            return
        logger.debug("Test finished")
```
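Beyond Extract Class, the listing shows a plain Extract Method opportunity: the drop/create/optionally-index/insert/flush sequence appears in `insert_performance`, again per collection in `locust_search_performance`, and survives as a commented-out block in `locust_mix_performance`. Below is a hedged sketch of a helper that could live on `LocalRunner` next to `run`; the name, signature, and `parsed` parameter are hypothetical, and it relies on the module's existing `time`, `parser`, and `logger`.

```python
# Hypothetical extraction of the duplicated collection set-up sequence.
# `parsed` is the tuple parser.collection_parser() already returns.
def _prepare_collection(self, milvus_instance, name, collection, parsed):
    (data_type, collection_size, index_file_size, dimension, metric_type) = parsed
    if milvus_instance.exists_collection(collection_name=name):
        milvus_instance.drop(name=name)
        time.sleep(10)
    milvus_instance.create_collection(name, dimension, index_file_size, metric_type)
    build_index = collection["build_index"]
    if build_index is True:
        index_type = collection["index_type"]
        index_param = collection["index_param"]
        milvus_instance.create_index(index_type, index_param)
        logger.debug(milvus_instance.describe_index())
    self.do_insert(milvus_instance, name, data_type, dimension, collection_size, collection["ni_per"])
    milvus_instance.flush()
    logger.debug("Table row counts: %d" % milvus_instance.count(name=name))
    if build_index is True:
        # rebuild so the segments flushed after the first pass are indexed too
        logger.debug("Start build index for last file")
        milvus_instance.create_index(index_type, index_param)
        logger.debug(milvus_instance.describe_index())
```

Each call site then shrinks to a single call, and the easy-to-forget second `create_index` pass lives in exactly one place.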