Total Complexity | 78 |
Total Lines | 629 |
Duplicated Lines | 2.86 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and their corresponding solutions, are:
Complex classes like milvus_benchmark.local_runner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | import os |
||
2 | import logging |
||
3 | import pdb |
||
4 | import time |
||
5 | import random |
||
6 | import string |
||
7 | import json |
||
8 | import csv |
||
9 | from multiprocessing import Process |
||
10 | import numpy as np |
||
11 | import concurrent.futures |
||
12 | from client import MilvusClient |
||
13 | import utils |
||
14 | import parser |
||
15 | from runner import Runner |
||
16 | |||
17 | DELETE_INTERVAL_TIME = 5 |
||
18 | INSERT_INTERVAL = 50000 |
||
19 | logger = logging.getLogger("milvus_benchmark.local_runner") |
||
20 | |||
21 | |||
class LocalRunner(Runner):
    """Runner that executes benchmark suites against a locally running Milvus server.

    Run tests at local mode.

    Make sure the server has started.
    """

    def __init__(self, host, port):
        super().__init__()
        # Connection endpoint of the already-running Milvus server.
        self.host, self.port = host, port
32 | |||
33 | def run(self, run_type, collection): |
||
34 | logger.debug(run_type) |
||
35 | logger.debug(collection) |
||
36 | collection_name = collection["collection_name"] if "collection_name" in collection else None |
||
37 | milvus_instance = MilvusClient(collection_name=collection_name, host=self.host, port=self.port) |
||
38 | logger.info(milvus_instance.show_collections()) |
||
39 | env_value = milvus_instance.get_server_config() |
||
40 | logger.debug(env_value) |
||
41 | |||
42 | if run_type in ["insert_performance", "insert_flush_performance"]: |
||
43 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
44 | ni_per = collection["ni_per"] |
||
45 | build_index = collection["build_index"] |
||
46 | if milvus_instance.exists_collection(): |
||
47 | milvus_instance.drop() |
||
48 | time.sleep(10) |
||
49 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
50 | if build_index is True: |
||
51 | index_type = collection["index_type"] |
||
52 | index_param = collection["index_param"] |
||
53 | milvus_instance.create_index(index_type, index_param) |
||
54 | logger.debug(milvus_instance.describe_index()) |
||
55 | res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per) |
||
56 | milvus_instance.flush() |
||
57 | logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
58 | if build_index is True: |
||
59 | logger.debug("Start build index for last file") |
||
60 | milvus_instance.create_index(index_type, index_param) |
||
61 | logger.debug(milvus_instance.describe_index()) |
||
62 | |||
63 | elif run_type == "delete_performance": |
||
64 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
65 | ni_per = collection["ni_per"] |
||
66 | if not milvus_instance.exists_collection(): |
||
67 | logger.error(milvus_instance.show_collections()) |
||
68 | logger.warning("Table: %s not found" % collection_name) |
||
69 | return |
||
70 | length = milvus_instance.count() |
||
71 | ids = [i for i in range(length)] |
||
72 | loops = int(length / ni_per) |
||
73 | for i in range(loops): |
||
74 | delete_ids = ids[i*ni_per : i*ni_per+ni_per] |
||
75 | logger.debug("Delete %d - %d" % (delete_ids[0], delete_ids[-1])) |
||
76 | milvus_instance.delete(delete_ids) |
||
77 | milvus_instance.flush() |
||
78 | logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
79 | logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
80 | milvus_instance.flush() |
||
81 | logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
82 | |||
83 | elif run_type == "build_performance": |
||
84 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
85 | index_type = collection["index_type"] |
||
86 | index_param = collection["index_param"] |
||
87 | if not milvus_instance.exists_collection(): |
||
88 | logger.error("Table name: %s not existed" % collection_name) |
||
89 | return |
||
90 | search_params = {} |
||
91 | start_time = time.time() |
||
92 | # drop index |
||
93 | logger.debug("Drop index") |
||
94 | milvus_instance.drop_index() |
||
95 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
96 | milvus_instance.create_index(index_type, index_param) |
||
97 | logger.debug(milvus_instance.describe_index()) |
||
98 | logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
99 | end_time = time.time() |
||
100 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
101 | logger.debug("Diff memory: %s, current memory usage: %s, build time: %s" % ((end_mem_usage - start_mem_usage), end_mem_usage, round(end_time - start_time, 1))) |
||
102 | |||
103 | elif run_type == "search_performance": |
||
104 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
105 | run_count = collection["run_count"] |
||
106 | top_ks = collection["top_ks"] |
||
107 | nqs = collection["nqs"] |
||
108 | search_params = collection["search_params"] |
||
109 | # for debugging |
||
110 | # time.sleep(3600) |
||
111 | if not milvus_instance.exists_collection(): |
||
112 | logger.error("Table name: %s not existed" % collection_name) |
||
113 | return |
||
114 | logger.info(milvus_instance.count()) |
||
115 | result = milvus_instance.describe_index() |
||
116 | logger.info(result) |
||
117 | milvus_instance.preload_collection() |
||
118 | mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
119 | logger.info(mem_usage) |
||
120 | for search_param in search_params: |
||
121 | logger.info("Search param: %s" % json.dumps(search_param)) |
||
122 | res = self.do_query(milvus_instance, collection_name, top_ks, nqs, run_count, search_param) |
||
123 | headers = ["Nq/Top-k"] |
||
124 | headers.extend([str(top_k) for top_k in top_ks]) |
||
125 | logger.info("Search param: %s" % json.dumps(search_param)) |
||
126 | utils.print_table(headers, nqs, res) |
||
127 | mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
128 | logger.info(mem_usage) |
||
129 | |||
130 | elif run_type == "locust_search_performance": |
||
131 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
132 | ### spawn locust requests |
||
133 | collection_num = collection["collection_num"] |
||
134 | task = collection["task"] |
||
135 | #. generate task code |
||
136 | task_file = utils.get_unique_name() |
||
137 | task_file_script = task_file+'.py' |
||
138 | task_file_csv = task_file+'_stats.csv' |
||
139 | task_type = task["type"] |
||
140 | connection_type = "single" |
||
141 | connection_num = task["connection_num"] |
||
142 | if connection_num > 1: |
||
143 | connection_type = "multi" |
||
144 | clients_num = task["clients_num"] |
||
145 | hatch_rate = task["hatch_rate"] |
||
146 | during_time = task["during_time"] |
||
147 | def_name = task_type |
||
148 | task_params = task["params"] |
||
149 | collection_names = [] |
||
150 | for i in range(collection_num): |
||
151 | suffix = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(5)) |
||
152 | collection_names.append(collection_name + "_" + suffix) |
||
153 | # collection_names = ['sift_1m_1024_128_l2_Kg6co', 'sift_1m_1024_128_l2_egkBK', 'sift_1m_1024_128_l2_D0wtE', |
||
154 | # 'sift_1m_1024_128_l2_9naps', 'sift_1m_1024_128_l2_iJ0jj', 'sift_1m_1024_128_l2_nqUTm', |
||
155 | # 'sift_1m_1024_128_l2_GIF0D', 'sift_1m_1024_128_l2_EL2qk', 'sift_1m_1024_128_l2_qLRnC', |
||
156 | # 'sift_1m_1024_128_l2_8Ditg'] |
||
157 | # ##### |
||
158 | ni_per = collection["ni_per"] |
||
159 | build_index = collection["build_index"] |
||
160 | for c_name in collection_names: |
||
161 | milvus_instance = MilvusClient(collection_name=c_name, host=self.host, port=self.port) |
||
162 | if milvus_instance.exists_collection(collection_name=c_name): |
||
163 | milvus_instance.drop(name=c_name) |
||
164 | time.sleep(10) |
||
165 | milvus_instance.create_collection(c_name, dimension, index_file_size, metric_type) |
||
166 | if build_index is True: |
||
167 | index_type = collection["index_type"] |
||
168 | index_param = collection["index_param"] |
||
169 | milvus_instance.create_index(index_type, index_param) |
||
170 | logger.debug(milvus_instance.describe_index()) |
||
171 | res = self.do_insert(milvus_instance, c_name, data_type, dimension, collection_size, ni_per) |
||
172 | milvus_instance.flush() |
||
173 | logger.debug("Table row counts: %d" % milvus_instance.count(name=c_name)) |
||
174 | if build_index is True: |
||
175 | logger.debug("Start build index for last file") |
||
176 | milvus_instance.create_index(index_type, index_param) |
||
177 | logger.debug(milvus_instance.describe_index()) |
||
178 | code_str = """ |
||
179 | import random |
||
180 | import string |
||
181 | from locust import User, task, between |
||
182 | from locust_task import MilvusTask |
||
183 | from client import MilvusClient |
||
184 | |||
185 | host = '%s' |
||
186 | port = %s |
||
187 | dim = %s |
||
188 | connection_type = '%s' |
||
189 | collection_names = %s |
||
190 | m = MilvusClient(host=host, port=port) |
||
191 | |||
192 | |||
193 | def get_collection_name(): |
||
194 | return random.choice(collection_names) |
||
195 | |||
196 | |||
197 | def get_client(collection_name): |
||
198 | if connection_type == 'single': |
||
199 | return MilvusTask(m=m) |
||
200 | elif connection_type == 'multi': |
||
201 | return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name) |
||
202 | |||
203 | |||
204 | class QueryTask(User): |
||
205 | wait_time = between(0.001, 0.002) |
||
206 | |||
207 | @task() |
||
208 | def %s(self): |
||
209 | top_k = %s |
||
210 | X = [[random.random() for i in range(dim)] for i in range(%s)] |
||
211 | search_param = %s |
||
212 | collection_name = get_collection_name() |
||
213 | print(collection_name) |
||
214 | client = get_client(collection_name) |
||
215 | client.query(X, top_k, search_param, collection_name=collection_name) |
||
216 | """ % (self.host, self.port, dimension, connection_type, collection_names, def_name, task_params["top_k"], task_params["nq"], task_params["search_param"]) |
||
217 | with open(task_file_script, 'w+') as fd: |
||
218 | fd.write(code_str) |
||
219 | locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % ( |
||
220 | task_file_script, |
||
221 | task_file, |
||
222 | clients_num, |
||
223 | hatch_rate, |
||
224 | during_time) |
||
225 | logger.info(locust_cmd) |
||
226 | try: |
||
227 | res = os.system(locust_cmd) |
||
228 | except Exception as e: |
||
229 | logger.error(str(e)) |
||
230 | return |
||
231 | #. retrieve and collect test statistics |
||
232 | metric = None |
||
233 | with open(task_file_csv, newline='') as fd: |
||
234 | dr = csv.DictReader(fd) |
||
235 | for row in dr: |
||
236 | if row["Name"] != "Aggregated": |
||
237 | continue |
||
238 | metric = row |
||
239 | logger.info(metric) |
||
240 | # clean up temp files |
||
241 | |||
242 | elif run_type == "search_ids_stability": |
||
243 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
244 | search_params = collection["search_params"] |
||
245 | during_time = collection["during_time"] |
||
246 | ids_length = collection["ids_length"] |
||
247 | ids = collection["ids"] |
||
248 | logger.info(milvus_instance.count()) |
||
249 | index_info = milvus_instance.describe_index() |
||
250 | logger.info(index_info) |
||
251 | g_top_k = int(collection["top_ks"].split("-")[1]) |
||
252 | l_top_k = int(collection["top_ks"].split("-")[0]) |
||
253 | g_id = int(ids.split("-")[1]) |
||
254 | l_id = int(ids.split("-")[0]) |
||
255 | g_id_length = int(ids_length.split("-")[1]) |
||
256 | l_id_length = int(ids_length.split("-")[0]) |
||
257 | |||
258 | milvus_instance.preload_collection() |
||
259 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
260 | logger.debug(start_mem_usage) |
||
261 | start_time = time.time() |
||
262 | while time.time() < start_time + during_time * 60: |
||
263 | search_param = {} |
||
264 | top_k = random.randint(l_top_k, g_top_k) |
||
265 | ids_num = random.randint(l_id_length, g_id_length) |
||
266 | l_ids = random.randint(l_id, g_id-ids_num) |
||
267 | # ids_param = [random.randint(l_id_length, g_id_length) for _ in range(ids_num)] |
||
268 | ids_param = [id for id in range(l_ids, l_ids+ids_num)] |
||
269 | for k, v in search_params.items(): |
||
270 | search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1])) |
||
271 | logger.debug("Query top-k: %d, ids_num: %d, param: %s" % (top_k, ids_num, json.dumps(search_param))) |
||
272 | result = milvus_instance.query_ids(top_k, ids_param, search_param=search_param) |
||
273 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
274 | metrics = { |
||
275 | "during_time": during_time, |
||
276 | "start_mem_usage": start_mem_usage, |
||
277 | "end_mem_usage": end_mem_usage, |
||
278 | "diff_mem": end_mem_usage - start_mem_usage, |
||
279 | } |
||
280 | logger.info(metrics) |
||
281 | |||
282 | elif run_type == "search_performance_concurrents": |
||
283 | data_type, dimension, metric_type = parser.parse_ann_collection_name(collection_name) |
||
284 | hdf5_source_file = collection["source_file"] |
||
285 | use_single_connection = collection["use_single_connection"] |
||
286 | concurrents = collection["concurrents"] |
||
287 | top_ks = collection["top_ks"] |
||
288 | nqs = collection["nqs"] |
||
289 | search_params = self.generate_combinations(collection["search_params"]) |
||
290 | if not milvus_instance.exists_collection(): |
||
291 | logger.error("Table name: %s not existed" % collection_name) |
||
292 | return |
||
293 | logger.info(milvus_instance.count()) |
||
294 | result = milvus_instance.describe_index() |
||
295 | logger.info(result) |
||
296 | milvus_instance.preload_collection() |
||
297 | dataset = utils.get_dataset(hdf5_source_file) |
||
298 | for concurrent_num in concurrents: |
||
299 | top_k = top_ks[0] |
||
300 | for nq in nqs: |
||
301 | mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
302 | logger.info(mem_usage) |
||
303 | query_vectors = self.normalize(metric_type, np.array(dataset["test"][:nq])) |
||
304 | logger.debug(search_params) |
||
305 | for search_param in search_params: |
||
306 | logger.info("Search param: %s" % json.dumps(search_param)) |
||
307 | total_time = 0.0 |
||
308 | if use_single_connection is True: |
||
309 | connections = [MilvusClient(collection_name=collection_name, host=self.host, port=self.port)] |
||
310 | with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor: |
||
311 | future_results = {executor.submit( |
||
312 | self.do_query_qps, connections[0], query_vectors, top_k, search_param=search_param) : index for index in range(concurrent_num)} |
||
313 | else: |
||
314 | connections = [MilvusClient(collection_name=collection_name, host=self.hos, port=self.port) for i in range(concurrent_num)] |
||
315 | with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor: |
||
316 | future_results = {executor.submit( |
||
317 | self.do_query_qps, connections[index], query_vectors, top_k, search_param=search_param) : index for index in range(concurrent_num)} |
||
318 | for future in concurrent.futures.as_completed(future_results): |
||
319 | total_time = total_time + future.result() |
||
320 | qps_value = total_time / concurrent_num |
||
321 | logger.debug("QPS value: %f, total_time: %f, request_nums: %f" % (qps_value, total_time, concurrent_num)) |
||
322 | mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
323 | logger.info(mem_usage) |
||
324 | |||
325 | elif run_type == "ann_accuracy": |
||
326 | hdf5_source_file = collection["source_file"] |
||
327 | collection_name = collection["collection_name"] |
||
328 | index_file_sizes = collection["index_file_sizes"] |
||
329 | index_types = collection["index_types"] |
||
330 | index_params = collection["index_params"] |
||
331 | top_ks = collection["top_ks"] |
||
332 | nqs = collection["nqs"] |
||
333 | search_params = collection["search_params"] |
||
334 | # mapping to search param list |
||
335 | search_params = self.generate_combinations(search_params) |
||
336 | # mapping to index param list |
||
337 | index_params = self.generate_combinations(index_params) |
||
338 | |||
339 | data_type, dimension, metric_type = parser.parse_ann_collection_name(collection_name) |
||
340 | dataset = utils.get_dataset(hdf5_source_file) |
||
341 | if milvus_instance.exists_collection(collection_name): |
||
342 | logger.info("Re-create collection: %s" % collection_name) |
||
343 | milvus_instance.drop() |
||
344 | time.sleep(DELETE_INTERVAL_TIME) |
||
345 | true_ids = np.array(dataset["neighbors"]) |
||
346 | for index_file_size in index_file_sizes: |
||
347 | milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
348 | logger.info(milvus_instance.describe()) |
||
349 | insert_vectors = self.normalize(metric_type, np.array(dataset["train"])) |
||
350 | logger.debug(len(insert_vectors)) |
||
351 | # Insert batch once |
||
352 | # milvus_instance.insert(insert_vectors) |
||
353 | loops = len(insert_vectors) // INSERT_INTERVAL + 1 |
||
354 | for i in range(loops): |
||
355 | start = i*INSERT_INTERVAL |
||
356 | end = min((i+1)*INSERT_INTERVAL, len(insert_vectors)) |
||
357 | tmp_vectors = insert_vectors[start:end] |
||
358 | if start < end: |
||
359 | if not isinstance(tmp_vectors, list): |
||
360 | milvus_instance.insert(tmp_vectors.tolist(), ids=[i for i in range(start, end)]) |
||
361 | else: |
||
362 | milvus_instance.insert(tmp_vectors, ids=[i for i in range(start, end)]) |
||
363 | milvus_instance.flush() |
||
364 | logger.info("Table: %s, row count: %s" % (collection_name, milvus_instance.count())) |
||
365 | if milvus_instance.count() != len(insert_vectors): |
||
366 | logger.error("Table row count is not equal to insert vectors") |
||
367 | return |
||
368 | for index_type in index_types: |
||
369 | for index_param in index_params: |
||
370 | logger.debug("Building index with param: %s" % json.dumps(index_param)) |
||
371 | milvus_instance.create_index(index_type, index_param=index_param) |
||
372 | logger.info(milvus_instance.describe_index()) |
||
373 | logger.info("Start preload collection: %s" % collection_name) |
||
374 | milvus_instance.preload_collection() |
||
375 | for search_param in search_params: |
||
376 | for nq in nqs: |
||
377 | query_vectors = self.normalize(metric_type, np.array(dataset["test"][:nq])) |
||
378 | for top_k in top_ks: |
||
379 | logger.debug("Search nq: %d, top-k: %d, search_param: %s" % (nq, top_k, json.dumps(search_param))) |
||
380 | if not isinstance(query_vectors, list): |
||
381 | result = milvus_instance.query(query_vectors.tolist(), top_k, search_param=search_param) |
||
382 | else: |
||
383 | result = milvus_instance.query(query_vectors, top_k, search_param=search_param) |
||
384 | result_ids = result.id_array |
||
385 | acc_value = self.get_recall_value(true_ids[:nq, :top_k].tolist(), result_ids) |
||
386 | logger.info("Query ann_accuracy: %s" % acc_value) |
||
387 | |||
388 | |||
389 | elif run_type == "stability": |
||
390 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
391 | search_params = collection["search_params"] |
||
392 | insert_xb = collection["insert_xb"] |
||
393 | insert_interval = collection["insert_interval"] |
||
394 | delete_xb = collection["delete_xb"] |
||
395 | # flush_interval = collection["flush_interval"] |
||
396 | # compact_interval = collection["compact_interval"] |
||
397 | during_time = collection["during_time"] |
||
398 | if not milvus_instance.exists_collection(): |
||
399 | logger.error(milvus_instance.show_collections()) |
||
400 | logger.error("Table name: %s not existed" % collection_name) |
||
401 | return |
||
402 | g_top_k = int(collection["top_ks"].split("-")[1]) |
||
403 | g_nq = int(collection["nqs"].split("-")[1]) |
||
404 | l_top_k = int(collection["top_ks"].split("-")[0]) |
||
405 | l_nq = int(collection["nqs"].split("-")[0]) |
||
406 | milvus_instance.preload_collection() |
||
407 | start_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
408 | start_row_count = milvus_instance.count() |
||
409 | logger.debug(milvus_instance.describe_index()) |
||
410 | logger.info(start_row_count) |
||
411 | start_time = time.time() |
||
412 | i = 0 |
||
413 | ids = [] |
||
414 | insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)] |
||
415 | query_vectors = [[random.random() for _ in range(dimension)] for _ in range(10000)] |
||
416 | View Code Duplication | while time.time() < start_time + during_time * 60: |
|
417 | i = i + 1 |
||
418 | for _ in range(insert_interval): |
||
419 | top_k = random.randint(l_top_k, g_top_k) |
||
420 | nq = random.randint(l_nq, g_nq) |
||
421 | search_param = {} |
||
422 | for k, v in search_params.items(): |
||
423 | search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1])) |
||
424 | logger.debug("Query nq: %d, top-k: %d, param: %s" % (nq, top_k, json.dumps(search_param))) |
||
425 | result = milvus_instance.query(query_vectors[0:nq], top_k, search_param=search_param) |
||
426 | count = milvus_instance.count() |
||
427 | insert_ids = [(count+x) for x in range(len(insert_vectors))] |
||
428 | ids.extend(insert_ids) |
||
429 | _, res = milvus_instance.insert(insert_vectors, ids=insert_ids) |
||
430 | logger.debug("%d, row_count: %d" % (i, milvus_instance.count())) |
||
431 | milvus_instance.delete(ids[-delete_xb:]) |
||
432 | milvus_instance.flush() |
||
433 | milvus_instance.compact() |
||
434 | end_mem_usage = milvus_instance.get_mem_info()["memory_used"] |
||
435 | end_row_count = milvus_instance.count() |
||
436 | metrics = { |
||
437 | "during_time": during_time, |
||
438 | "start_mem_usage": start_mem_usage, |
||
439 | "end_mem_usage": end_mem_usage, |
||
440 | "diff_mem": end_mem_usage - start_mem_usage, |
||
441 | "row_count_increments": end_row_count - start_row_count |
||
442 | } |
||
443 | logger.info(metrics) |
||
444 | |||
445 | elif run_type == "loop_stability": |
||
446 | # init data |
||
447 | milvus_instance.clean_db() |
||
448 | pull_interval = collection["pull_interval"] |
||
449 | pull_interval_seconds = pull_interval * 60 |
||
450 | collection_num = collection["collection_num"] |
||
451 | dimension = collection["dimension"] if "dimension" in collection else 128 |
||
452 | insert_xb = collection["insert_xb"] if "insert_xb" in collection else 100000 |
||
453 | index_types = collection["index_types"] if "index_types" in collection else ['ivf_sq8'] |
||
454 | index_param = {"nlist": 2048} |
||
455 | collection_names = [] |
||
456 | milvus_instances_map = {} |
||
457 | insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)] |
||
458 | for i in range(collection_num): |
||
459 | name = utils.get_unique_name(prefix="collection_") |
||
460 | collection_names.append(name) |
||
461 | metric_type = random.choice(["l2", "ip"]) |
||
462 | index_file_size = random.randint(10, 20) |
||
463 | milvus_instance.create_collection(name, dimension, index_file_size, metric_type) |
||
464 | milvus_instance = MilvusClient(collection_name=name, host=self.host) |
||
465 | index_type = random.choice(index_types) |
||
466 | milvus_instance.create_index(index_type, index_param=index_param) |
||
467 | logger.info(milvus_instance.describe_index()) |
||
468 | insert_vectors = utils.normalize(metric_type, insert_vectors) |
||
469 | milvus_instance.insert(insert_vectors) |
||
470 | milvus_instance.flush() |
||
471 | milvus_instances_map.update({name: milvus_instance}) |
||
472 | logger.info(milvus_instance.describe_index()) |
||
473 | logger.info(milvus_instance.describe()) |
||
474 | |||
475 | tasks = ["insert_rand", "delete_rand", "query_rand", "flush"] |
||
476 | i = 1 |
||
477 | while True: |
||
478 | logger.info("Loop time: %d" % i) |
||
479 | start_time = time.time() |
||
480 | while time.time() - start_time < pull_interval_seconds: |
||
481 | # choose collection |
||
482 | tmp_collection_name = random.choice(collection_names) |
||
483 | # choose task from task |
||
484 | task_name = random.choice(tasks) |
||
485 | logger.info(tmp_collection_name) |
||
486 | logger.info(task_name) |
||
487 | # execute task |
||
488 | task_run = getattr(milvus_instances_map[tmp_collection_name], task_name) |
||
489 | task_run() |
||
490 | # new connection |
||
491 | for name in collection_names: |
||
492 | milvus_instance = MilvusClient(collection_name=name, host=self.host) |
||
493 | milvus_instances_map.update({name: milvus_instance}) |
||
494 | i = i + 1 |
||
495 | elif run_type == "locust_mix_performance": |
||
496 | (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name) |
||
497 | # ni_per = collection["ni_per"] |
||
498 | # build_index = collection["build_index"] |
||
499 | # if milvus_instance.exists_collection(): |
||
500 | # milvus_instance.drop() |
||
501 | # time.sleep(10) |
||
502 | # milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type) |
||
503 | # if build_index is True: |
||
504 | # index_type = collection["index_type"] |
||
505 | # index_param = collection["index_param"] |
||
506 | # milvus_instance.create_index(index_type, index_param) |
||
507 | # logger.debug(milvus_instance.describe_index()) |
||
508 | # res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per) |
||
509 | # milvus_instance.flush() |
||
510 | # logger.debug("Table row counts: %d" % milvus_instance.count()) |
||
511 | # if build_index is True: |
||
512 | # logger.debug("Start build index for last file") |
||
513 | # milvus_instance.create_index(index_type, index_param) |
||
514 | # logger.debug(milvus_instance.describe_index()) |
||
515 | task = collection["tasks"] |
||
516 | task_file = utils.get_unique_name() |
||
517 | task_file_script = task_file + '.py' |
||
518 | task_file_csv = task_file + '_stats.csv' |
||
519 | task_types = task["types"] |
||
520 | connection_type = "single" |
||
521 | connection_num = task["connection_num"] |
||
522 | if connection_num > 1: |
||
523 | connection_type = "multi" |
||
524 | clients_num = task["clients_num"] |
||
525 | hatch_rate = task["hatch_rate"] |
||
526 | during_time = task["during_time"] |
||
527 | def_strs = "" |
||
528 | for task_type in task_types: |
||
529 | _type = task_type["type"] |
||
530 | weight = task_type["weight"] |
||
531 | if _type == "flush": |
||
532 | def_str = """ |
||
533 | @task(%d) |
||
534 | def flush(self): |
||
535 | client = get_client(collection_name) |
||
536 | client.flush(collection_name=collection_name) |
||
537 | """ % weight |
||
538 | if _type == "compact": |
||
539 | def_str = """ |
||
540 | @task(%d) |
||
541 | def compact(self): |
||
542 | client = get_client(collection_name) |
||
543 | client.compact(collection_name) |
||
544 | """ % weight |
||
545 | if _type == "query": |
||
546 | def_str = """ |
||
547 | @task(%d) |
||
548 | def query(self): |
||
549 | client = get_client(collection_name) |
||
550 | params = %s |
||
551 | X = [[random.random() for i in range(dim)] for i in range(params["nq"])] |
||
552 | client.query(X, params["top_k"], params["search_param"], collection_name=collection_name) |
||
553 | """ % (weight, task_type["params"]) |
||
554 | if _type == "insert": |
||
555 | def_str = """ |
||
556 | @task(%d) |
||
557 | def insert(self): |
||
558 | client = get_client(collection_name) |
||
559 | params = %s |
||
560 | ids = [random.randint(10, 1000000) for i in range(params["nb"])] |
||
561 | X = [[random.random() for i in range(dim)] for i in range(params["nb"])] |
||
562 | client.insert(X,ids=ids, collection_name=collection_name) |
||
563 | """ % (weight, task_type["params"]) |
||
564 | if _type == "delete": |
||
565 | def_str = """ |
||
566 | @task(%d) |
||
567 | def delete(self): |
||
568 | client = get_client(collection_name) |
||
569 | ids = [random.randint(1, 1000000) for i in range(1)] |
||
570 | client.delete(ids, collection_name) |
||
571 | """ % weight |
||
572 | def_strs += def_str |
||
573 | print(def_strs) |
||
574 | code_str = """ |
||
575 | import random |
||
576 | import json |
||
577 | from locust import User, task, between |
||
578 | from locust_task import MilvusTask |
||
579 | from client import MilvusClient |
||
580 | |||
581 | host = '%s' |
||
582 | port = %s |
||
583 | collection_name = '%s' |
||
584 | dim = %s |
||
585 | connection_type = '%s' |
||
586 | m = MilvusClient(host=host, port=port) |
||
587 | |||
588 | |||
589 | def get_client(collection_name): |
||
590 | if connection_type == 'single': |
||
591 | return MilvusTask(m=m) |
||
592 | elif connection_type == 'multi': |
||
593 | return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name) |
||
594 | |||
595 | |||
596 | class MixTask(User): |
||
597 | wait_time = between(0.001, 0.002) |
||
598 | %s |
||
599 | """ % (self.host, self.port, collection_name, dimension, connection_type, def_strs) |
||
600 | with open(task_file_script, "w+") as fd: |
||
601 | fd.write(code_str) |
||
602 | locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % ( |
||
603 | task_file_script, |
||
604 | task_file, |
||
605 | clients_num, |
||
606 | hatch_rate, |
||
607 | during_time) |
||
608 | logger.info(locust_cmd) |
||
609 | try: |
||
610 | res = os.system(locust_cmd) |
||
611 | except Exception as e: |
||
612 | logger.error(str(e)) |
||
613 | return |
||
614 | |||
615 | # . retrieve and collect test statistics |
||
616 | metric = None |
||
617 | with open(task_file_csv, newline='') as fd: |
||
618 | dr = csv.DictReader(fd) |
||
619 | for row in dr: |
||
620 | if row["Name"] != "Aggregated": |
||
621 | continue |
||
622 | metric = row |
||
623 | logger.info(metric) |
||
624 | |||
625 | else: |
||
626 | logger.warning("Run type not defined") |
||
627 | return |
||
628 | logger.debug("Test finished") |
||
629 |