milvus_benchmark.local_runner.LocalRunner.run()   F

Complexity

Conditions 77

Size

Total Lines 596
Code Lines 444

Duplication

Lines 18
Ratio 3.02 % (18 of 596 total lines)

Importance

Changes 0
Metric                             Value
cc (cyclomatic complexity)         77
eloc (executable lines of code)    444
nop (number of parameters)         3
dl (duplicated lines)              18
loc (total lines)                  596
rs                                 0
c                                  0
b                                  0
f                                  0

How to fix

Long Method

Small methods make your code easier to understand, in particular when combined with a good name. And when a method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, that is usually a good sign to extract the commented part into a new method, using the comment as a starting point for the new method's name.

Commonly applied refactorings include Extract Method, as sketched below.
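Applied to run(), every run_type branch is a natural extraction target. A minimal sketch, assuming a hypothetical helper name (_run_build_performance does not exist in the reviewed code) and reusing only calls that appear in the listing below:

    def _run_build_performance(self, milvus_instance, collection):
        # The former "build_performance" branch of run(), moved out verbatim:
        # time and measure memory while (re)building the index.
        index_type = collection["index_type"]
        index_param = collection["index_param"]
        start_time = time.time()
        milvus_instance.drop_index()
        start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
        milvus_instance.create_index(index_type, index_param)
        end_time = time.time()
        end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
        logger.debug("Diff memory: %s, build time: %s"
                     % (end_mem_usage - start_mem_usage, round(end_time - start_time, 1)))

The branch in run() then shrinks to a single, well-named call: self._run_build_performance(milvus_instance, collection).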

Complexity

Complex methods like milvus_benchmark.local_runner.LocalRunner.run() often do a lot of different things. To break such a method down, we need to identify cohesive components within it. A common approach to finding such components is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
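In run() the cohesive components are the run_type branches themselves, so a dispatch table is a reasonable first step before extracting classes. A minimal sketch, assuming hypothetical handler methods (one per branch; none of them exist yet in the reviewed code):

    def run(self, run_type, collection):
        # Map each scenario to its own handler; each handler holds the body
        # of one former elif branch.
        handlers = {
            "insert_performance": self._run_insert_performance,
            "build_performance": self._run_build_performance,
            "search_performance": self._run_search_performance,
            "stability": self._run_stability,
            # ... one entry per remaining run_type
        }
        handler = handlers.get(run_type)
        if handler is None:
            logger.warning("Run type not defined")
            return
        handler(collection)
        logger.debug("Test finished")

Once a handler accumulates helpers and state of its own, it can move into its own class via Extract Class.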

import os
import logging
import pdb
import time
import random
import string
import json
import csv
from multiprocessing import Process
import numpy as np
import concurrent.futures
from client import MilvusClient
import utils
import parser
from runner import Runner

DELETE_INTERVAL_TIME = 5
INSERT_INTERVAL = 50000
logger = logging.getLogger("milvus_benchmark.local_runner")


class LocalRunner(Runner):
    def __init__(self, host, port):
        """
        Run tests in local mode.

        Make sure the server has started.
        """
        super(LocalRunner, self).__init__()
        self.host = host
        self.port = port

    def run(self, run_type, collection):
        logger.debug(run_type)
        logger.debug(collection)
        collection_name = collection["collection_name"] if "collection_name" in collection else None
        milvus_instance = MilvusClient(collection_name=collection_name, host=self.host, port=self.port)
        logger.info(milvus_instance.show_collections())
        env_value = milvus_instance.get_server_config()
        logger.debug(env_value)

        if run_type in ["insert_performance", "insert_flush_performance"]:
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            ni_per = collection["ni_per"]
            build_index = collection["build_index"]
            if milvus_instance.exists_collection():
                milvus_instance.drop()
                time.sleep(10)
            milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
            if build_index is True:
                index_type = collection["index_type"]
                index_param = collection["index_param"]
                milvus_instance.create_index(index_type, index_param)
                logger.debug(milvus_instance.describe_index())
            res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per)
            milvus_instance.flush()
            logger.debug("Table row counts: %d" % milvus_instance.count())
            if build_index is True:
                logger.debug("Start build index for last file")
                milvus_instance.create_index(index_type, index_param)

Issue: The variable index_param does not seem to be defined when the first `if build_index is True:` check above (where it is assigned) is False. Are you sure this can never be the case?
Issue: The variable index_type does not seem to be defined when the first `if build_index is True:` check above (where it is assigned) is False. Are you sure this can never be the case?
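One way to resolve both notes, sketched with a hypothetical helper (it is not part of the reviewed code): bind the index settings exactly once, before either use.

    def _index_settings(collection):
        # Always returns a bound pair; (None, None) means "no index requested",
        # so callers can test for None instead of risking an unbound local.
        if collection.get("build_index") is True:
            return collection["index_type"], collection["index_param"]
        return None, None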
                logger.debug(milvus_instance.describe_index())
        elif run_type == "delete_performance":
64
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
65
            ni_per = collection["ni_per"]
66
            if not milvus_instance.exists_collection():
67
                logger.error(milvus_instance.show_collections())
68
                logger.warning("Table: %s not found" % collection_name)
69
                return
70
            length = milvus_instance.count()
71
            ids = [i for i in range(length)] 
72
            loops = int(length / ni_per)
73
            for i in range(loops):
74
                delete_ids = ids[i*ni_per : i*ni_per+ni_per]
75
                logger.debug("Delete %d - %d" % (delete_ids[0], delete_ids[-1]))
76
                milvus_instance.delete(delete_ids)
77
                milvus_instance.flush()
78
                logger.debug("Table row counts: %d" % milvus_instance.count())
79
            logger.debug("Table row counts: %d" % milvus_instance.count())
80
            milvus_instance.flush()
81
            logger.debug("Table row counts: %d" % milvus_instance.count())
82
83
        elif run_type == "build_performance":
84
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
85
            index_type = collection["index_type"]
86
            index_param = collection["index_param"]
87
            if not milvus_instance.exists_collection():
88
                logger.error("Table name: %s not existed" % collection_name)
89
                return
90
            search_params = {}
91
            start_time = time.time()
92
            # drop index
93
            logger.debug("Drop index")
94
            milvus_instance.drop_index()
95
            start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
96
            milvus_instance.create_index(index_type, index_param)
97
            logger.debug(milvus_instance.describe_index())
98
            logger.debug("Table row counts: %d" % milvus_instance.count())
99
            end_time = time.time()
100
            end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
101
            logger.debug("Diff memory: %s, current memory usage: %s, build time: %s" % ((end_mem_usage - start_mem_usage), end_mem_usage, round(end_time - start_time, 1)))
102
103
        elif run_type == "search_performance":
104
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
105
            run_count = collection["run_count"]
106
            top_ks = collection["top_ks"]
107
            nqs = collection["nqs"]
108
            search_params = collection["search_params"]
109
            # for debugging
110
            # time.sleep(3600)
111
            if not milvus_instance.exists_collection():
112
                logger.error("Table name: %s not existed" % collection_name)
113
                return
114
            logger.info(milvus_instance.count())
115
            result = milvus_instance.describe_index()
116
            logger.info(result)
117
            milvus_instance.preload_collection()
118
            mem_usage = milvus_instance.get_mem_info()["memory_used"]
119
            logger.info(mem_usage)
120
            for search_param in search_params:
121
                logger.info("Search param: %s" % json.dumps(search_param))
122
                res = self.do_query(milvus_instance, collection_name, top_ks, nqs, run_count, search_param)
123
                headers = ["Nq/Top-k"]
124
                headers.extend([str(top_k) for top_k in top_ks])
125
                logger.info("Search param: %s" % json.dumps(search_param))
126
                utils.print_table(headers, nqs, res)
127
                mem_usage = milvus_instance.get_mem_info()["memory_used"]
128
                logger.info(mem_usage)
129
130
        elif run_type == "locust_search_performance":
131
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
132
            ### spawn locust requests
133
            collection_num = collection["collection_num"]
134
            task = collection["task"]
135
            #. generate task code
136
            task_file = utils.get_unique_name()
137
            task_file_script = task_file+'.py'
138
            task_file_csv = task_file+'_stats.csv'
139
            task_type = task["type"]
140
            connection_type = "single"
141
            connection_num = task["connection_num"]
142
            if connection_num > 1:
143
                connection_type = "multi"
144
            clients_num = task["clients_num"]
145
            hatch_rate = task["hatch_rate"]
146
            during_time = task["during_time"]
147
            def_name = task_type
148
            task_params = task["params"]
149
            collection_names = []
150
            for i in range(collection_num):
151
                suffix = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
152
                collection_names.append(collection_name + "_" + suffix)
153
            # collection_names = ['sift_1m_1024_128_l2_Kg6co', 'sift_1m_1024_128_l2_egkBK', 'sift_1m_1024_128_l2_D0wtE',
154
            #                     'sift_1m_1024_128_l2_9naps', 'sift_1m_1024_128_l2_iJ0jj', 'sift_1m_1024_128_l2_nqUTm',
155
            #                     'sift_1m_1024_128_l2_GIF0D', 'sift_1m_1024_128_l2_EL2qk', 'sift_1m_1024_128_l2_qLRnC',
156
            #                     'sift_1m_1024_128_l2_8Ditg']
157
            # #####
158
            ni_per = collection["ni_per"]
159
            build_index = collection["build_index"]
160
            for c_name in collection_names:
161
                milvus_instance = MilvusClient(collection_name=c_name, host=self.host, port=self.port)
162
                if milvus_instance.exists_collection(collection_name=c_name):
163
                    milvus_instance.drop(name=c_name)
164
                    time.sleep(10)
165
                milvus_instance.create_collection(c_name, dimension, index_file_size, metric_type)
166
                if build_index is True:
167
                    index_type = collection["index_type"]
168
                    index_param = collection["index_param"]
169
                    milvus_instance.create_index(index_type, index_param)
170
                    logger.debug(milvus_instance.describe_index())
171
                res = self.do_insert(milvus_instance, c_name, data_type, dimension, collection_size, ni_per)
172
                milvus_instance.flush()
173
                logger.debug("Table row counts: %d" % milvus_instance.count(name=c_name))
174
                if build_index is True:
175
                    logger.debug("Start build index for last file")
176
                    milvus_instance.create_index(index_type, index_param)
177
                    logger.debug(milvus_instance.describe_index())
178
            code_str = """
179
import random
180
import string
181
from locust import User, task, between
182
from locust_task import MilvusTask
183
from client import MilvusClient
184
185
host = '%s'
186
port = %s
187
dim = %s
188
connection_type = '%s'
189
collection_names = %s
190
m = MilvusClient(host=host, port=port)
191
192
193
def get_collection_name():
194
    return random.choice(collection_names)
195
    
196
    
197
def get_client(collection_name):
198
    if connection_type == 'single':
199
        return MilvusTask(m=m)
200
    elif connection_type == 'multi':
201
        return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name)
202
        
203
204
class QueryTask(User):
205
    wait_time = between(0.001, 0.002)
206
        
207
    @task()
208
    def %s(self):
209
        top_k = %s
210
        X = [[random.random() for i in range(dim)] for i in range(%s)]
211
        search_param = %s
212
        collection_name = get_collection_name()
213
        print(collection_name)
214
        client = get_client(collection_name)
215
        client.query(X, top_k, search_param, collection_name=collection_name)
216
            """ % (self.host, self.port, dimension, connection_type, collection_names, def_name, task_params["top_k"], task_params["nq"], task_params["search_param"])
217
            with open(task_file_script, 'w+') as fd:
218
                fd.write(code_str)
219
            locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % (
220
                    task_file_script,
221
                    task_file,
222
                    clients_num,
223
                    hatch_rate,
224
                    during_time)
225
            logger.info(locust_cmd)
226
            try:
227
                res = os.system(locust_cmd)
228
            except Exception as e:
229
                logger.error(str(e))
230
                return
231
            #. retrieve and collect test statistics
232
            metric = None
233
            with open(task_file_csv, newline='') as fd:
234
                dr = csv.DictReader(fd)
235
                for row in dr:
236
                    if row["Name"] != "Aggregated":
237
                        continue
238
                    metric = row
239
            logger.info(metric)
240
            # clean up temp files
241
242
        elif run_type == "search_ids_stability":
243
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
244
            search_params = collection["search_params"]
245
            during_time = collection["during_time"]
246
            ids_length = collection["ids_length"]
247
            ids = collection["ids"]
248
            logger.info(milvus_instance.count())
249
            index_info = milvus_instance.describe_index()
250
            logger.info(index_info)
251
            g_top_k = int(collection["top_ks"].split("-")[1])
252
            l_top_k = int(collection["top_ks"].split("-")[0])
253
            g_id = int(ids.split("-")[1])
254
            l_id = int(ids.split("-")[0])
255
            g_id_length = int(ids_length.split("-")[1])
256
            l_id_length = int(ids_length.split("-")[0])
257
258
            milvus_instance.preload_collection()
259
            start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
260
            logger.debug(start_mem_usage)
261
            start_time = time.time()
262
            while time.time() < start_time + during_time * 60:
263
                search_param = {}
264
                top_k = random.randint(l_top_k, g_top_k)
265
                ids_num = random.randint(l_id_length, g_id_length)
266
                l_ids = random.randint(l_id, g_id-ids_num)
267
                # ids_param = [random.randint(l_id_length, g_id_length) for _ in range(ids_num)]
268
                ids_param = [id for id in range(l_ids, l_ids+ids_num)]
269
                for k, v in search_params.items():
270
                    search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1]))
271
                logger.debug("Query top-k: %d, ids_num: %d, param: %s" % (top_k, ids_num, json.dumps(search_param)))
272
                result = milvus_instance.query_ids(top_k, ids_param, search_param=search_param)
273
            end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
274
            metrics = {
275
                "during_time": during_time,
276
                "start_mem_usage": start_mem_usage,
277
                "end_mem_usage": end_mem_usage,
278
                "diff_mem": end_mem_usage - start_mem_usage,
279
            }
280
            logger.info(metrics)
281
282
        elif run_type == "search_performance_concurrents":
283
            data_type, dimension, metric_type = parser.parse_ann_collection_name(collection_name)
284
            hdf5_source_file = collection["source_file"]
285
            use_single_connection = collection["use_single_connection"]
286
            concurrents = collection["concurrents"]
287
            top_ks = collection["top_ks"]
288
            nqs = collection["nqs"]
289
            search_params = self.generate_combinations(collection["search_params"])
290
            if not milvus_instance.exists_collection():
291
                logger.error("Table name: %s not existed" % collection_name)
292
                return
293
            logger.info(milvus_instance.count())
294
            result = milvus_instance.describe_index()
295
            logger.info(result)
296
            milvus_instance.preload_collection()
297
            dataset = utils.get_dataset(hdf5_source_file)
298
            for concurrent_num in concurrents:
299
                top_k = top_ks[0] 
300
                for nq in nqs:
301
                    mem_usage = milvus_instance.get_mem_info()["memory_used"]
302
                    logger.info(mem_usage)
303
                    query_vectors = self.normalize(metric_type, np.array(dataset["test"][:nq]))
304
                    logger.debug(search_params)
305
                    for search_param in search_params:
306
                        logger.info("Search param: %s" % json.dumps(search_param))
307
                        total_time = 0.0
308
                        if use_single_connection is True:
309
                            connections = [MilvusClient(collection_name=collection_name, host=self.host, port=self.port)]
310
                            with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor:
311
                                future_results = {executor.submit(
312
                                    self.do_query_qps, connections[0], query_vectors, top_k, search_param=search_param) : index for index in range(concurrent_num)}
313
                        else:
314
                            connections = [MilvusClient(collection_name=collection_name, host=self.hos, port=self.port) for i in range(concurrent_num)]
315
                            with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor:
316
                                future_results = {executor.submit(
317
                                    self.do_query_qps, connections[index], query_vectors, top_k, search_param=search_param) : index for index in range(concurrent_num)}
318
                        for future in concurrent.futures.as_completed(future_results):
319
                            total_time = total_time + future.result()
320
                        qps_value = total_time / concurrent_num 
321
                        logger.debug("QPS value: %f, total_time: %f, request_nums: %f" % (qps_value, total_time, concurrent_num))
322
                    mem_usage = milvus_instance.get_mem_info()["memory_used"]
323
                    logger.info(mem_usage)
324
325
        elif run_type == "ann_accuracy":
326
            hdf5_source_file = collection["source_file"]
327
            collection_name = collection["collection_name"]
328
            index_file_sizes = collection["index_file_sizes"]
329
            index_types = collection["index_types"]
330
            index_params = collection["index_params"]
331
            top_ks = collection["top_ks"]
332
            nqs = collection["nqs"]
333
            search_params = collection["search_params"]
334
            # mapping to search param list
335
            search_params = self.generate_combinations(search_params)
336
            # mapping to index param list
337
            index_params = self.generate_combinations(index_params)
338
339
            data_type, dimension, metric_type = parser.parse_ann_collection_name(collection_name)
340
            dataset = utils.get_dataset(hdf5_source_file)
341
            if milvus_instance.exists_collection(collection_name):
342
                logger.info("Re-create collection: %s" % collection_name)
343
                milvus_instance.drop()
344
                time.sleep(DELETE_INTERVAL_TIME)
345
            true_ids = np.array(dataset["neighbors"])
346
            for index_file_size in index_file_sizes:
347
                milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
348
                logger.info(milvus_instance.describe())
349
                insert_vectors = self.normalize(metric_type, np.array(dataset["train"]))
350
                logger.debug(len(insert_vectors))
351
                # Insert batch once
352
                # milvus_instance.insert(insert_vectors)
353
                loops = len(insert_vectors) // INSERT_INTERVAL + 1
354
                for i in range(loops):
355
                    start = i*INSERT_INTERVAL
356
                    end = min((i+1)*INSERT_INTERVAL, len(insert_vectors))
357
                    tmp_vectors = insert_vectors[start:end]
358
                    if start < end:
359
                        if not isinstance(tmp_vectors, list):
360
                            milvus_instance.insert(tmp_vectors.tolist(), ids=[i for i in range(start, end)])
361
                        else:
362
                            milvus_instance.insert(tmp_vectors, ids=[i for i in range(start, end)])
363
                    milvus_instance.flush()
364
                logger.info("Table: %s, row count: %s" % (collection_name, milvus_instance.count()))
365
                if milvus_instance.count() != len(insert_vectors):
366
                    logger.error("Table row count is not equal to insert vectors")
367
                    return
368
                for index_type in index_types:
369
                    for index_param in index_params:
370
                        logger.debug("Building index with param: %s" % json.dumps(index_param))
371
                        milvus_instance.create_index(index_type, index_param=index_param)
372
                        logger.info(milvus_instance.describe_index())
373
                        logger.info("Start preload collection: %s" % collection_name)
374
                        milvus_instance.preload_collection()
375
                        for search_param in search_params:
376
                            for nq in nqs:
377
                                query_vectors = self.normalize(metric_type, np.array(dataset["test"][:nq]))
378
                                for top_k in top_ks:
379
                                    logger.debug("Search nq: %d, top-k: %d, search_param: %s" % (nq, top_k, json.dumps(search_param)))
380
                                    if not isinstance(query_vectors, list):
381
                                        result = milvus_instance.query(query_vectors.tolist(), top_k, search_param=search_param)
382
                                    else:
383
                                        result = milvus_instance.query(query_vectors, top_k, search_param=search_param)
384
                                    result_ids = result.id_array
385
                                    acc_value = self.get_recall_value(true_ids[:nq, :top_k].tolist(), result_ids)
386
                                    logger.info("Query ann_accuracy: %s" % acc_value)
387
388
389
        elif run_type == "stability":
390
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
391
            search_params = collection["search_params"]
392
            insert_xb = collection["insert_xb"]
393
            insert_interval = collection["insert_interval"]
394
            delete_xb = collection["delete_xb"]
395
            # flush_interval = collection["flush_interval"]
396
            # compact_interval = collection["compact_interval"]
397
            during_time = collection["during_time"]
398
            if not milvus_instance.exists_collection():
399
                logger.error(milvus_instance.show_collections())
400
                logger.error("Table name: %s not existed" % collection_name)
401
                return
402
            g_top_k = int(collection["top_ks"].split("-")[1])
403
            g_nq = int(collection["nqs"].split("-")[1])
404
            l_top_k = int(collection["top_ks"].split("-")[0])
405
            l_nq = int(collection["nqs"].split("-")[0])
406
            milvus_instance.preload_collection()
407
            start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
408
            start_row_count = milvus_instance.count()
409
            logger.debug(milvus_instance.describe_index())
410
            logger.info(start_row_count)
411
            start_time = time.time()
412
            i = 0
413
            ids = []
414
            insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)]
415
            query_vectors = [[random.random() for _ in range(dimension)] for _ in range(10000)]
416 View Code Duplication
            while time.time() < start_time + during_time * 60:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
417
                i = i + 1
418
                for _ in range(insert_interval):
419
                    top_k = random.randint(l_top_k, g_top_k)
420
                    nq = random.randint(l_nq, g_nq)
421
                    search_param = {}
422
                    for k, v in search_params.items():
423
                        search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1]))
424
                    logger.debug("Query nq: %d, top-k: %d, param: %s" % (nq, top_k, json.dumps(search_param)))
425
                    result = milvus_instance.query(query_vectors[0:nq], top_k, search_param=search_param)
426
                count = milvus_instance.count()
427
                insert_ids = [(count+x) for x in range(len(insert_vectors))]
428
                ids.extend(insert_ids)
429
                _, res = milvus_instance.insert(insert_vectors, ids=insert_ids)
430
                logger.debug("%d, row_count: %d" % (i, milvus_instance.count()))
431
                milvus_instance.delete(ids[-delete_xb:])
432
                milvus_instance.flush()
433
                milvus_instance.compact()
434
            end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
435
            end_row_count = milvus_instance.count()
436
            metrics = {
437
                "during_time": during_time,
438
                "start_mem_usage": start_mem_usage,
439
                "end_mem_usage": end_mem_usage,
440
                "diff_mem": end_mem_usage - start_mem_usage,
441
                "row_count_increments": end_row_count - start_row_count
442
            }
443
            logger.info(metrics)
444
445
        elif run_type == "loop_stability":
446
            # init data
447
            milvus_instance.clean_db()
448
            pull_interval = collection["pull_interval"]
449
            pull_interval_seconds = pull_interval * 60
450
            collection_num = collection["collection_num"]
451
            dimension = collection["dimension"] if "dimension" in collection else 128
452
            insert_xb = collection["insert_xb"] if "insert_xb" in collection else 100000
453
            index_types = collection["index_types"] if "index_types" in collection else ['ivf_sq8']
454
            index_param = {"nlist": 2048}
455
            collection_names = []
456
            milvus_instances_map = {}
457
            insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)]
458
            for i in range(collection_num):
459
                name = utils.get_unique_name(prefix="collection_")
460
                collection_names.append(name)
461
                metric_type = random.choice(["l2", "ip"])
462
                index_file_size = random.randint(10, 20)
463
                milvus_instance.create_collection(name, dimension, index_file_size, metric_type)
464
                milvus_instance = MilvusClient(collection_name=name, host=self.host)
465
                index_type = random.choice(index_types)
466
                milvus_instance.create_index(index_type, index_param=index_param)
467
                logger.info(milvus_instance.describe_index())
468
                insert_vectors = utils.normalize(metric_type, insert_vectors)
469
                milvus_instance.insert(insert_vectors)
470
                milvus_instance.flush()
471
                milvus_instances_map.update({name: milvus_instance})
472
                logger.info(milvus_instance.describe_index())
473
                logger.info(milvus_instance.describe())
474
475
            tasks = ["insert_rand", "delete_rand", "query_rand", "flush"]
476
            i = 1
477
            while True:
478
                logger.info("Loop time: %d" % i)
479
                start_time = time.time()
480
                while time.time() - start_time < pull_interval_seconds:
481
                    # choose collection
482
                    tmp_collection_name = random.choice(collection_names)
483
                    # choose task from task
484
                    task_name = random.choice(tasks)
485
                    logger.info(tmp_collection_name)
486
                    logger.info(task_name)
487
                    # execute task
488
                    task_run = getattr(milvus_instances_map[tmp_collection_name], task_name)
489
                    task_run()
490
                # new connection
491
                for name in collection_names:
492
                    milvus_instance = MilvusClient(collection_name=name, host=self.host)
493
                    milvus_instances_map.update({name: milvus_instance})
494
                i = i + 1
495
        elif run_type == "locust_mix_performance":
496
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
497
            # ni_per = collection["ni_per"]
498
            # build_index = collection["build_index"]
499
            # if milvus_instance.exists_collection():
500
            #     milvus_instance.drop()
501
            #     time.sleep(10)
502
            # milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
503
            # if build_index is True:
504
            #     index_type = collection["index_type"]
505
            #     index_param = collection["index_param"]
506
            #     milvus_instance.create_index(index_type, index_param)
507
            #     logger.debug(milvus_instance.describe_index())
508
            # res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per)
509
            # milvus_instance.flush()
510
            # logger.debug("Table row counts: %d" % milvus_instance.count())
511
            # if build_index is True:
512
            #     logger.debug("Start build index for last file")
513
            #     milvus_instance.create_index(index_type, index_param)
514
            #     logger.debug(milvus_instance.describe_index())
515
            task = collection["tasks"]
516
            task_file = utils.get_unique_name()
517
            task_file_script = task_file + '.py'
518
            task_file_csv = task_file + '_stats.csv'
519
            task_types = task["types"]
520
            connection_type = "single"
521
            connection_num = task["connection_num"]
522
            if connection_num > 1:
523
                connection_type = "multi"
524
            clients_num = task["clients_num"]
525
            hatch_rate = task["hatch_rate"]
526
            during_time = task["during_time"]
527
            def_strs = ""
528
            for task_type in task_types:
529
                _type = task_type["type"]
530
                weight = task_type["weight"]
531
                if _type == "flush":
532
                    def_str = """
533
    @task(%d)
534
    def flush(self):
535
        client = get_client(collection_name)
536
        client.flush(collection_name=collection_name)
537
                    """ % weight
538
                if _type == "compact":
539
                    def_str = """
540
    @task(%d)
541
    def compact(self):
542
        client = get_client(collection_name)
543
        client.compact(collection_name)
544
                    """ % weight
545
                if _type == "query":
546
                    def_str = """
547
    @task(%d)
548
    def query(self):
549
        client = get_client(collection_name)
550
        params = %s
551
        X = [[random.random() for i in range(dim)] for i in range(params["nq"])]
552
        client.query(X, params["top_k"], params["search_param"], collection_name=collection_name)
553
                    """ % (weight, task_type["params"])
554
                if _type == "insert":
555
                    def_str = """
556
    @task(%d)
557
    def insert(self):
558
        client = get_client(collection_name)
559
        params = %s
560
        ids = [random.randint(10, 1000000) for i in range(params["nb"])]
561
        X = [[random.random() for i in range(dim)] for i in range(params["nb"])]
562
        client.insert(X,ids=ids, collection_name=collection_name)
563
                """ % (weight, task_type["params"])
564
                if _type == "delete":
565
                    def_str = """
566
    @task(%d)
567
    def delete(self):
568
        client = get_client(collection_name)
569
        ids = [random.randint(1, 1000000) for i in range(1)]
570
        client.delete(ids, collection_name)
571
                    """ % weight
572
                def_strs += def_str
0 ignored issues
show
introduced by
The variable def_str does not seem to be defined for all execution paths.
Loading history...
573
                print(def_strs)
574
                code_str = """
575
import random
576
import json
577
from locust import User, task, between
578
from locust_task import MilvusTask
579
from client import MilvusClient
580
581
host = '%s'
582
port = %s
583
collection_name = '%s'
584
dim = %s
585
connection_type = '%s'
586
m = MilvusClient(host=host, port=port)
587
588
589
def get_client(collection_name):
590
    if connection_type == 'single':
591
        return MilvusTask(m=m)
592
    elif connection_type == 'multi':
593
        return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name)
594
        
595
        
596
class MixTask(User):
597
    wait_time = between(0.001, 0.002)
598
    %s
599
    """ % (self.host, self.port, collection_name, dimension, connection_type, def_strs)
600
            with open(task_file_script, "w+") as fd:
601
                fd.write(code_str)
0 ignored issues
show
introduced by
The variable code_str does not seem to be defined in case the for loop on line 528 is not entered. Are you sure this can never be the case?
Loading history...
602
            locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % (
603
                task_file_script,
604
                task_file,
605
                clients_num,
606
                hatch_rate,
607
                during_time)
608
            logger.info(locust_cmd)
609
            try:
610
                res = os.system(locust_cmd)
611
            except Exception as e:
612
                logger.error(str(e))
613
                return
614
615
            # . retrieve and collect test statistics
616
            metric = None
617
            with open(task_file_csv, newline='') as fd:
618
                dr = csv.DictReader(fd)
619
                for row in dr:
620
                    if row["Name"] != "Aggregated":
621
                        continue
622
                    metric = row
623
            logger.info(metric)
624
625

        else:
            logger.warning("Run type not defined")
            return
        logger.debug("Test finished")
629