milvus_benchmark.docker_runner   F
last analyzed

Complexity

Total Complexity 64

Size/Duplication

Total Lines 366
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 270
dl 0
loc 366
rs 3.28
c 0
b 0
f 0
wmc 64

2 Methods

Rating   Name   Duplication   Size   Complexity  
A DockerRunner.__init__() 0 3 1
F DockerRunner.run() 0 344 63

How to fix   Complexity   

Complexity

Complex classes like milvus_benchmark.docker_runner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import os
2
import logging
3
import pdb
4
import time
5
import random
6
from multiprocessing import Process
7
import numpy as np
8
from client import MilvusClient
9
import utils
10
import parser
11
from runner import Runner
12
13
logger = logging.getLogger("milvus_benchmark.docker")
14
15
16
class DockerRunner(Runner):
    """Run benchmark suites against a Milvus server started in a docker container.

    For every parameter set a fresh server container is started from
    ``self.image``, the benchmark is executed through :class:`MilvusClient`,
    and the container is removed afterwards.
    """

    def __init__(self, image):
        super(DockerRunner, self).__init__()
        # Docker image used to launch the Milvus server for each run.
        self.image = image

    def run(self, definition, run_type=None):
        """Execute one benchmark suite.

        :param definition: dict mapping operation type (e.g. "insert", "query")
            to ``{"run_count": int, "params": [param dict, ...]}``.
        :param run_type: one of "performance", "insert_performance",
            "search_performance", "accuracy", "stability". Anything else is
            logged and ignored.
        """
        if run_type == "performance":
            for op_type, op_value in definition.items():
                if op_type == "insert":
                    self._run_insert(op_value["params"])
                elif op_type == "query":
                    self._run_search(op_value["params"], op_value["run_count"])

        elif run_type == "insert_performance":
            for op_type, op_value in definition.items():
                self._run_insert(op_value["params"])

        elif run_type == "search_performance":
            for op_type, op_value in definition.items():
                self._run_search(op_value["params"], op_value["run_count"])

        elif run_type == "accuracy":
            for op_type, op_value in definition.items():
                if op_type != "query":
                    logger.warning("invalid operation: %s in accuracy test, only support query operation" % op_type)
                    break
                self._run_accuracy(op_value["params"])

        elif run_type == "stability":
            for op_type, op_value in definition.items():
                if op_type != "query":
                    # Message fixed: this is the stability suite, not accuracy.
                    logger.warning("invalid operation: %s in stability test, only support query operation" % op_type)
                    break
                self._run_stability(op_value["params"], op_value["run_count"])

        else:
            logger.warning("Run type: %s not supported" % run_type)

    def _run_insert(self, run_params):
        """Insert benchmark: per param set, boot a fresh server, (re)create the
        collection, bulk-insert vectors, wait for file merge, tear down.
        """
        if not run_params:
            logger.debug("No run params")
            return
        for _, param in enumerate(run_params):
            logger.info("Definition param: %s" % str(param))
            collection_name = param["collection_name"]
            volume_name = param["db_path_prefix"]
            print(collection_name)
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            for k, v in param.items():
                if k.startswith("server."):
                    # Update server config before the container starts.
                    utils.modify_config(k, v, type="server", db_slave=None)
            container = utils.run_server(self.image, test_type="remote", volume_name=volume_name, db_slave=None)
            time.sleep(2)
            milvus = MilvusClient(collection_name)
            # Drop any leftover collection so each run starts from scratch.
            if milvus.exists_collection():
                milvus.delete()
                time.sleep(10)
            milvus.create_collection(collection_name, dimension, index_file_size, metric_type)
            res = self.do_insert(milvus, collection_name, data_type, dimension, collection_size, param["ni_per"])
            logger.info(res)
            # Wait for background file merge before removing the server;
            # heuristic proportional to data volume.
            time.sleep(collection_size * dimension / 5000000)
            utils.remove_container(container)

    def _run_search(self, run_params, run_count):
        """Search benchmark: per param set, boot a server and time queries over
        the cartesian product of index types, nlists, nprobes, top-ks and nqs.

        :param run_count: number of repetitions passed through to do_query.
        """
        for _, param in enumerate(run_params):
            logger.info("Definition param: %s" % str(param))
            collection_name = param["dataset"]
            volume_name = param["db_path_prefix"]
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            for k, v in param.items():
                if k.startswith("server."):
                    utils.modify_config(k, v, type="server")
            container = utils.run_server(self.image, test_type="remote", volume_name=volume_name, db_slave=None)
            time.sleep(2)
            milvus = MilvusClient(collection_name)
            logger.debug(milvus.show_collections())
            if not milvus.exists_collection():
                logger.warning("Table %s not existed, continue exec next params ..." % collection_name)
                # Fix: remove the container before skipping, so failed params
                # don't leak running servers.
                utils.remove_container(container)
                continue
            # Parse index info and query dimensions.
            index_types = param["index.index_types"]
            nlists = param["index.nlists"]
            top_ks, nqs, nprobes = parser.search_params_parser(param)
            for index_type in index_types:
                for nlist in nlists:
                    # NOTE(review): index creation is intentionally disabled
                    # upstream; queries run against whatever index exists on
                    # the mounted volume.
                    result = milvus.describe_index()
                    logger.info(result)
                    result = milvus.describe_index()
                    logger.info(result)
                    logger.info(milvus.count())
                    # Preload so the first timed query does not pay the load cost.
                    milvus.preload_collection()
                    logger.info("Start warm up query")
                    res = self.do_query(milvus, collection_name, [1], [1], 1, 1)
                    logger.info("End warm up query")
                    for nprobe in nprobes:
                        logger.info("index_type: %s, nlist: %s, metric_type: %s, nprobe: %s" % (index_type, nlist, metric_type, nprobe))
                        res = self.do_query(milvus, collection_name, top_ks, nqs, nprobe, run_count)
                        headers = ["Nq/Top-k"]
                        headers.extend([str(top_k) for top_k in top_ks])
                        utils.print_collection(headers, nqs, res)
            utils.remove_container(container)

    def _run_accuracy(self, run_params):
        """Accuracy benchmark: compare search results against a ground truth.

        Two modes per param set:

        * ``sift_acc`` truthy  -- compare result ids against precomputed SIFT
          ground-truth ids (``get_groundtruth_ids``).
        * ``sift_acc`` falsy   -- compare each index type against the "flat"
          baseline via ``compute_accuracy``.

        Example param set::

            {
                "dataset": "random_50m_1024_512",
                "index.index_types": ["flat", "ivf_flat", "ivf_sq8"],
                "index.nlists": [16384],
                "nprobes": [1, 32, 128],
                "nqs": [100],
                "top_ks": [1, 64],
                "server.use_blas_threshold": 1100,
                "server.cpu_cache_capacity": 256
            }
        """
        for _, param in enumerate(run_params):
            logger.info("Definition param: %s" % str(param))
            collection_name = param["dataset"]
            sift_acc = param.get("sift_acc", False)
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
            for k, v in param.items():
                if k.startswith("server."):
                    utils.modify_config(k, v, type="server")
            volume_name = param["db_path_prefix"]
            container = utils.run_server(self.image, test_type="remote", volume_name=volume_name, db_slave=None)
            time.sleep(2)
            milvus = MilvusClient(collection_name)
            if not milvus.exists_collection():
                logger.warning("Table %s not existed, continue exec next params ..." % collection_name)
                # Fix: don't leak the container when the collection is missing.
                utils.remove_container(container)
                continue

            index_types = param["index.index_types"]
            nlists = param["index.nlists"]
            top_ks, nqs, nprobes = parser.search_params_parser(param)
            # Fix: always bind true_ids_all so every execution path is defined;
            # it is only read on the sift_acc branch, where it is loaded below.
            true_ids_all = None
            if sift_acc is True:
                # Preload ground-truth ids once per param set.
                true_ids_all = self.get_groundtruth_ids(collection_size)
            for index_type in index_types:
                for nlist in nlists:
                    result = milvus.describe_index()
                    logger.info(result)
                    milvus.create_index(index_type, nlist)
                    # Preload index so accuracy queries hit a warm server.
                    milvus.preload_collection()
                    for nprobe in nprobes:
                        logger.info("index_type: %s, nlist: %s, metric_type: %s, nprobe: %s" % (index_type, nlist, metric_type, nprobe))
                        for top_k in top_ks:
                            for nq in nqs:
                                id_prefix = "%s_index_%s_nlist_%s_metric_type_%s_nprobe_%s_top_k_%s_nq_%s" % \
                                            (collection_name, index_type, nlist, metric_type, nprobe, top_k, nq)
                                if sift_acc is False:
                                    self.do_query_acc(milvus, collection_name, top_k, nq, nprobe, id_prefix)
                                    if index_type != "flat":
                                        # Compute accuracy against the flat baseline.
                                        base_name = "%s_index_flat_nlist_%s_metric_type_%s_nprobe_%s_top_k_%s_nq_%s" % \
                                            (collection_name, nlist, metric_type, nprobe, top_k, nq)
                                        avg_acc = self.compute_accuracy(base_name, id_prefix)
                                        logger.info("Query: <%s> accuracy: %s" % (id_prefix, avg_acc))
                                else:
                                    result_ids, result_distances = self.do_query_ids(milvus, collection_name, top_k, nq, nprobe)
                                    true_ids = true_ids_all[:nq, :top_k].tolist()
                                    debug_file_ids = "0.5.3_result_ids"
                                    with open(debug_file_ids, "w+") as fd:
                                        total = 0
                                        # Fix: inner loop variable renamed so it no
                                        # longer shadows the outer param index.
                                        for qi, item in enumerate(result_ids):
                                            true_item = true_ids[qi]
                                            tmp = set(item).intersection(set(true_item))
                                            total = total + len(tmp)
                                            fd.write("query: N-%d, intersection: %d, total: %d\n" % (qi, len(tmp), total))
                                            fd.write("%s\n" % str(item))
                                            fd.write("%s\n" % str(true_item))
                                    acc_value = self.get_recall_value(true_ids, result_ids)
                                    logger.info("Query: <%s> accuracy: %s" % (id_prefix, acc_value))
            # Remove container, and run next definition.
            logger.info("remove container, and run next definition")
            utils.remove_container(container)

    def _run_stability(self, run_params, run_count):
        """Stability benchmark: hammer the server with random searches for
        ``during_time``, inserting a 10k-vector batch and re-creating the index
        on every 10th iteration.

        :param run_count: repetitions per do_query call.
        """
        for _, param in enumerate(run_params):
            logger.info("Definition param: %s" % str(param))
            collection_name = param["dataset"]
            index_type = param["index_type"]
            volume_name = param["db_path_prefix"]
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)

            # Default test time is 100 seconds; "during_time" param is minutes.
            if "during_time" not in param:
                during_time = 100  # seconds
            else:
                during_time = int(param["during_time"]) * 60
            # NOTE(review): a "query_process_num" param used to feed a
            # multiprocessing variant that is disabled; it is currently unused.

            for k, v in param.items():
                if k.startswith("server."):
                    utils.modify_config(k, v, type="server")

            container = utils.run_server(self.image, test_type="remote", volume_name=volume_name, db_slave=None)
            time.sleep(2)
            milvus = MilvusClient(collection_name)
            if not milvus.exists_collection():
                logger.warning("Table %s not existed, continue exec next params ..." % collection_name)
                # Fix: remove the container before skipping this param set.
                utils.remove_container(container)
                continue

            start_time = time.time()
            insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(10000)]
            i = 0
            while time.time() < start_time + during_time:
                i = i + 1
                milvus_instance = MilvusClient(collection_name)
                top_ks = random.sample([x for x in range(1, 100)], 3)
                nqs = random.sample([x for x in range(1, 1000)], 3)
                nprobe = random.choice([x for x in range(1, 500)])
                # NOTE(review): queries deliberately go through the original
                # `milvus` client while inserts use a fresh `milvus_instance`
                # per iteration -- confirm this split is intended.
                res = self.do_query(milvus, collection_name, top_ks, nqs, nprobe, run_count)
                if i % 10 == 0:
                    status, res = milvus_instance.insert(insert_vectors, ids=[x for x in range(len(insert_vectors))])
                    if not status.OK():
                        logger.error(status)
                    milvus_instance.create_index(index_type, 16384)
                    result = milvus.describe_index()
                    logger.info(result)
            utils.remove_container(container)
366
367