K8sRunner.report_wrapper()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 18
Code Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 15
nop 8
dl 0
loc 18
rs 9.65
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:

1
import os
2
import logging
3
import pdb
4
import string
5
import time
6
import re
7
import random
8
import traceback
9
import json
10
import csv
11
from multiprocessing import Process
12
import numpy as np
13
from yaml import full_load, dump
14
from concurrent import futures
15
from client import MilvusClient
16
import parser
17
from runner import Runner
18
from milvus_metrics.api import report
19
from milvus_metrics.models import Env, Hardware, Server, Metric
20
import utils
21
22
logger = logging.getLogger("milvus_benchmark.k8s_runner")

# Kubernetes namespace handed to the helm install/delete helpers in utils.
namespace = "milvus"
# Port assigned to K8sRunner.port when a runner is created.
default_port = 19530
DELETE_INTERVAL_TIME = 5
# INSERT_INTERVAL = 100000
INSERT_INTERVAL = 50000
# Captured once at import time; report_wrapper uses it as the run id, so
# every metric reported by this process shares the same id.
timestamp = int(time.time())
default_path = "/var/lib/milvus"
30
31
32
class K8sRunner(Runner):
33
    def __init__(self):
        """
        Set up a runner that works in helm mode.

        The service name comes from utils.get_unique_name(); host, hostname
        and env_value start as None and are filled in by init_env / run.
        The port starts at the module-level default_port.
        """
        super().__init__()
        # Connection details: unknown until the server has been installed.
        self.host = None
        self.hostname = None
        self.port = default_port
        # Server configuration snapshot, populated when a test is run.
        self.env_value = None
        self.service_name = utils.get_unique_name()
45
        
46
    def init_env(self, server_config, server_host, deploy_mode, image_type, image_tag):
        """
        Install the server with helm; clean up if the install fails.

        Stores server_host in self.hostname. When server_config is truthy,
        the chart's values.yaml is updated through utils.update_values
        before the install. The value returned by utils.helm_install_server
        is stored in self.host.

        Returns True when the install yields a truthy host. Returns False
        (after calling clean_up) when the install raises or yields a falsy
        host. Raises Exception when the chart's values.yaml does not exist.
        """
        logger.debug("Tests run on server host:")
        logger.debug(server_host)
        self.hostname = server_host

        # The chart directory is resolved relative to the current working directory.
        chart_dir = os.path.join(os.getcwd(), "../milvus-helm/charts/milvus")
        values_yaml = chart_dir + "/values.yaml"
        if not os.path.exists(values_yaml):
            raise Exception("File %s not existed" % values_yaml)
        if server_config:
            utils.update_values(values_yaml, deploy_mode, server_host, server_config)

        try:
            logger.debug("Start install server")
            self.host = utils.helm_install_server(
                chart_dir, deploy_mode, image_tag, image_type, self.service_name, namespace)
        except Exception as err:
            logger.error("Helm install server failed: %s" % (str(err)))
            logger.error(traceback.format_exc())
            logger.debug(server_config)
            self.clean_up()
            return False

        if self.host:
            return True

        # The install returned without raising but produced no usable host.
        logger.error("Helm install server failed")
        self.clean_up()
        return False
77
78
    def clean_up(self):
        """
        Stop the server for this runner by calling utils.helm_del_server
        with the runner's service name and the module-level namespace.
        """
        release = self.service_name
        logger.debug("Start clean up: %s" % release)
        utils.helm_del_server(release, namespace)
85
86
    def report_wrapper(self, milvus_instance, env_value, hostname, collection_info, index_info, search_params, run_params=None):
        """
        Build the Metric object that describes one test run.

        The run id is the module-level timestamp. Environment and hardware
        are built from env_value and hostname; server version, mode and
        commit are queried from milvus_instance. collection_info,
        index_info, search_params and run_params are attached unchanged.

        Returns the populated Metric; its `metrics` field is not set here.
        """
        result = Metric()
        result.set_run_id(timestamp)

        environment = Env(env_value)
        result.env = environment
        result.env.OMP_NUM_THREADS = 0

        result.hardware = Hardware(name=hostname)
        # Keyword arguments are evaluated left to right, so the server is
        # queried for version, then mode, then commit.
        result.server = Server(
            version=milvus_instance.get_server_version(),
            mode=milvus_instance.get_server_mode(),
            build_commit=milvus_instance.get_server_commit(),
        )

        result.collection = collection_info
        result.index = index_info
        result.search = search_params
        result.run_params = run_params
        return result
104
105
    def run(self, run_type, collection):
106
        """
107
        override runner.run
108
        """
109
        logger.debug(run_type)
110
        logger.debug(collection)
111
        collection_name = collection["collection_name"] if "collection_name" in collection else None
112
        milvus_instance = MilvusClient(collection_name=collection_name, host=self.host)
113
        self.env_value = milvus_instance.get_server_config()
114
115
        # ugly implemention
116
        # remove some parts of result before uploading results
117
        self.env_value.pop("logs")
118
        if milvus_instance.get_server_mode() == "CPU":
119
            if "gpu" in self.env_value:
120
                self.env_value.pop("gpu")
121
        elif "cache.enable" in self.env_value["gpu"]:
122
            self.env_value["gpu"].pop("cache.enable")
123
124
        self.env_value.pop("network")
125
126
        if run_type == "insert_performance":
127
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
128
            ni_per = collection["ni_per"]
129
            build_index = collection["build_index"]
130
            if milvus_instance.exists_collection():
131
                milvus_instance.drop()
132
                time.sleep(10)
133
            index_info = {}
134
            search_params = {}
135
            milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
136
            if build_index is True:
137
                index_type = collection["index_type"]
138
                index_param = collection["index_param"]
139
                index_info = {
140
                    "index_type": index_type,
141
                    "index_param": index_param
142
                }
143
                milvus_instance.create_index(index_type, index_param)
144
                logger.debug(milvus_instance.describe_index())
145
            res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per)
146
            logger.info(res)
147
            if "flush" in collection and collection["flush"] == "no":
148
                logger.debug("No manual flush")
149
            else:
150
                milvus_instance.flush()
151
                logger.debug(milvus_instance.count())
152
            collection_info = {
153
                "dimension": dimension,
154
                "metric_type": metric_type,
155
                "dataset_name": collection_name
156
            }
157
            metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params)
158
            metric.metrics = {
159
                "type": run_type,
160
                "value": {
161
                    "total_time": res["total_time"],
162
                    "qps": res["qps"],
163
                    "ni_time": res["ni_time"]
164
                } 
165
            }
166
            report(metric)
167
            if build_index is True:
168
                logger.debug("Start build index for last file")
169
                milvus_instance.create_index(index_type, index_param)
0 ignored issues
show
introduced by
The variable index_param does not seem to be defined when the condition `build_index is True` on line 136 is False. Are you sure this can never be the case?
Loading history...
introduced by
The variable index_type does not seem to be defined when the condition `build_index is True` on line 136 is False. Are you sure this can never be the case?
Loading history...
170
                logger.debug(milvus_instance.describe_index())
171
172
        elif run_type == "insert_debug_performance":
173
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
174
            ni_per = collection["ni_per"]
175
            if milvus_instance.exists_collection():
176
                milvus_instance.drop()
177
                time.sleep(10)
178
            index_info = {}
179
            search_params = {}
180
            milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
181
            insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(ni_per)]
182
            start_time = time.time()
183
            i = 0
184
            while time.time() < start_time + 2 * 24 * 3600:
185
                i = i + 1
186
                logger.debug(i)
187
                logger.debug("Row count: %d" % milvus_instance.count())
188
                milvus_instance.insert(insert_vectors)
189
                time.sleep(0.1)
190
191
        elif run_type == "insert_performance_multi_collections":
192
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
193
            ni_per = collection["ni_per"]
194
            build_index = collection["build_index"]
195
            if milvus_instance.exists_collection():
196
                milvus_instance.drop()
197
                time.sleep(10)
198
            index_info = {}
199
            search_params = {}
200
            milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
201
            if build_index is True:
202
                index_type = collection["index_type"]
203
                index_param = collection["index_param"]
204
                index_info = {
205
                    "index_type": index_type,
206
                    "index_param": index_param
207
                }
208
                milvus_instance.create_index(index_type, index_param)
209
                logger.debug(milvus_instance.describe_index())
210
            res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per)
211
            logger.info(res)
212
            milvus_instance.flush()
213
            collection_info = {
214
                "dimension": dimension,
215
                "metric_type": metric_type,
216
                "dataset_name": collection_name
217
            }
218
            metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params)
219
            metric.metrics = {
220
                "type": run_type,
221
                "value": {
222
                    "total_time": res["total_time"],
223
                    "qps": res["qps"],
224
                    "ni_time": res["ni_time"]
225
                }
226
            }
227
            report(metric)
228
            if build_index is True:
229
                logger.debug("Start build index for last file")
230
                milvus_instance.create_index(index_type, index_param)
231
                logger.debug(milvus_instance.describe_index())
232
233
        elif run_type == "insert_flush_performance":
234
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
235
            ni_per = collection["ni_per"]
236
            if milvus_instance.exists_collection():
237
                milvus_instance.drop()
238
                time.sleep(10)
239
            index_info = {}
240
            search_params = {}
241
            milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
242
            res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per)
243
            logger.info(res)
244
            logger.debug(milvus_instance.count())
245
            start_time = time.time()
246
            milvus_instance.flush()
247
            end_time = time.time()
248
            logger.debug(milvus_instance.count())
249
            collection_info = {
250
                "dimension": dimension,
251
                "metric_type": metric_type,
252
                "dataset_name": collection_name
253
            }
254
            metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params)
255
            metric.metrics = {
256
                "type": run_type,
257
                "value": {
258
                    "flush_time": round(end_time - start_time, 1)
259
                }
260
            }
261
            report(metric)
262
263
        elif run_type == "build_performance":
264
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
265
            index_type = collection["index_type"]
266
            index_param = collection["index_param"]
267
            collection_info = {
268
                "dimension": dimension,
269
                "metric_type": metric_type,
270
                "index_file_size": index_file_size,
271
                "dataset_name": collection_name
272
            }
273
            index_info = {
274
                "index_type": index_type,
275
                "index_param": index_param
276
            }
277
            if not milvus_instance.exists_collection():
278
                logger.error("Table name: %s not existed" % collection_name)
279
                return
280
            search_params = {}
281
            start_time = time.time()
282
            # drop index
283
            logger.debug("Drop index")
284
            milvus_instance.drop_index()
285
            start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
286
            milvus_instance.create_index(index_type, index_param)
287
            logger.debug(milvus_instance.describe_index())
288
            logger.debug(milvus_instance.count())
289
            end_time = time.time()
290
            end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
291
            metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params)
292
            metric.metrics = {
293
                "type": "build_performance",
294
                "value": {
295
                    "build_time": round(end_time - start_time, 1),
296
                    "start_mem_usage": start_mem_usage,
297
                    "end_mem_usage": end_mem_usage,
298
                    "diff_mem": end_mem_usage - start_mem_usage
299
                } 
300
            }
301
            report(metric)
302
303
        elif run_type == "delete_performance":
304
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
305
            ni_per = collection["ni_per"]
306
            search_params = {}
307
            collection_info = {
308
                "dimension": dimension,
309
                "metric_type": metric_type,
310
                "dataset_name": collection_name
311
            }
312
            if not milvus_instance.exists_collection():
313
                logger.error("Table name: %s not existed" % collection_name)
314
                return
315
            length = milvus_instance.count()
316
            logger.info(length)
317
            index_info = milvus_instance.describe_index()
318
            logger.info(index_info)
319
            ids = [i for i in range(length)]
320
            loops = int(length / ni_per)
321
            milvus_instance.preload_collection()
322
            start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
323
            start_time = time.time()
324
            for i in range(loops):
325
                delete_ids = ids[i*ni_per : i*ni_per+ni_per]
326
                logger.debug("Delete %d - %d" % (delete_ids[0], delete_ids[-1]))
327
                milvus_instance.delete(delete_ids)
328
                # milvus_instance.flush()
329
                logger.debug("Table row counts: %d" % milvus_instance.count())
330
            logger.debug("Table row counts: %d" % milvus_instance.count())
331
            milvus_instance.flush()
332
            end_time = time.time()
333
            end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
334
            logger.debug("Table row counts: %d" % milvus_instance.count())
335
            metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params)
336
            metric.metrics = {
337
                "type": "delete_performance",
338
                "value": {
339
                    "delete_time": round(end_time - start_time, 1),
340
                    "start_mem_usage": start_mem_usage,
341
                    "end_mem_usage": end_mem_usage,
342
                    "diff_mem": end_mem_usage - start_mem_usage
343
                }
344
            }
345
            report(metric)
346
347
        elif run_type == "get_ids_performance":
348
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
349
            ids_length_per_segment = collection["ids_length_per_segment"]
350
            if not milvus_instance.exists_collection():
351
                logger.error("Table name: %s not existed" % collection_name)
352
                return
353
            collection_info = {
354
                "dimension": dimension,
355
                "metric_type": metric_type,
356
                "index_file_size": index_file_size,
357
                "dataset_name": collection_name
358
            }
359
            search_params = {}
360
            logger.info(milvus_instance.count())
361
            index_info = milvus_instance.describe_index()
362
            logger.info(index_info)
363
            for ids_num in ids_length_per_segment:
364
                segment_num, get_ids = milvus_instance.get_rand_ids_each_segment(ids_num)
365
                start_time = time.time()
366
                _ = milvus_instance.get_entities(get_ids)
367
                total_time = time.time() - start_time
368
                avg_time = total_time / segment_num
369
                run_params = {"ids_num": ids_num}
370
                logger.info("Segment num: %d, ids num per segment: %d, run_time: %f" % (segment_num, ids_num, total_time))
371
                metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params, run_params=run_params)
372
                metric.metrics = {
373
                    "type": run_type,
374
                    "value": {
375
                        "total_time": round(total_time, 1),
376
                        "avg_time": round(avg_time, 1)
377
                    }
378
                }
379
                report(metric)
380
381
        elif run_type == "search_performance":
382
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
383
            run_count = collection["run_count"]
384
            top_ks = collection["top_ks"]
385
            nqs = collection["nqs"]
386
            search_params = collection["search_params"]
387
            collection_info = {
388
                "dimension": dimension,
389
                "metric_type": metric_type,
390
                "index_file_size": index_file_size,
391
                "dataset_name": collection_name
392
            }
393
            if not milvus_instance.exists_collection():
394
                logger.error("Table name: %s not existed" % collection_name)
395
                return
396
397
            logger.info(milvus_instance.count())
398
            index_info = milvus_instance.describe_index()
399
            logger.info(index_info)
400
            milvus_instance.preload_collection()
401
            logger.info("Start warm up query")
402
            res = self.do_query(milvus_instance, collection_name, [1], [1], 2, search_param=search_params[0])
403
            logger.info("End warm up query")
404
            for search_param in search_params:
405
                logger.info("Search param: %s" % json.dumps(search_param))
406
                res = self.do_query(milvus_instance, collection_name, top_ks, nqs, run_count, search_param)
407
                headers = ["Nq/Top-k"]
408
                headers.extend([str(top_k) for top_k in top_ks])
409
                logger.info("Search param: %s" % json.dumps(search_param))
410
                utils.print_table(headers, nqs, res)
411
                for index_nq, nq in enumerate(nqs):
412
                    for index_top_k, top_k in enumerate(top_ks):
413
                        search_param_group = {
414
                            "nq": nq,
415
                            "topk": top_k,
416
                            "search_param": search_param
417
                        }
418
                        search_time = res[index_nq][index_top_k]
419
                        metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_param_group)
420
                        metric.metrics = {
421
                            "type": "search_performance",
422
                            "value": {
423
                                "search_time": search_time
424
                            } 
425
                        }
426
                        report(metric)
427
428
        elif run_type == "locust_search_performance":
429
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(
430
                collection_name)
431
            ### clear db
432
            ### spawn locust requests
433
            collection_num = collection["collection_num"]
434
            task = collection["task"]
435
            # . generate task code
436
            task_file = utils.get_unique_name()
437
            task_file_script = task_file + '.py'
438
            task_file_csv = task_file + '_stats.csv'
439
            task_type = task["type"]
440
            connection_type = "single"
441
            connection_num = task["connection_num"]
442
            if connection_num > 1:
443
                connection_type = "multi"
444
            clients_num = task["clients_num"]
445
            hatch_rate = task["hatch_rate"]
446
            during_time = task["during_time"]
447
            def_name = task_type
448
            task_params = task["params"]
449
            collection_names = []
450
            for i in range(collection_num):
451
                suffix = "".join(random.choice(string.ascii_letters + string.digits) for _ in range(5))
452
                collection_names.append(collection_name + "_" + suffix)
453
            # #####
454
            ni_per = collection["ni_per"]
455
            build_index = collection["build_index"]
456
            # TODO: debug
457
            for c_name in collection_names:
458
                milvus_instance = MilvusClient(collection_name=c_name, host=self.host, port=self.port)
459
                if milvus_instance.exists_collection(collection_name=c_name):
460
                    milvus_instance.drop(name=c_name)
461
                    time.sleep(10)
462
                milvus_instance.create_collection(c_name, dimension, index_file_size, metric_type)
463
                index_info = {
464
                    "build_index": build_index
465
                }
466
                if build_index is True:
467
                    index_type = collection["index_type"]
468
                    index_param = collection["index_param"]
469
                    index_info.update({
470
                        "index_type": index_type,
471
                        "index_param": index_param
472
                    })
473
                    milvus_instance.create_index(index_type, index_param)
474
                    logger.debug(milvus_instance.describe_index())
475
                res = self.do_insert(milvus_instance, c_name, data_type, dimension, collection_size, ni_per)
476
                logger.info(res)
477
                if "flush" in collection and collection["flush"] == "no":
478
                    logger.debug("No manual flush")
479
                else:
480
                    milvus_instance.flush()
481
                logger.debug("Table row counts: %d" % milvus_instance.count(name=c_name))
482
                if build_index is True:
483
                    logger.debug("Start build index for last file")
484
                    milvus_instance.create_index(index_type, index_param)
485
                    logger.debug(milvus_instance.describe_index())
486
            code_str = """
487
import random
488
import string
489
from locust import User, task, between
490
from locust_task import MilvusTask
491
from client import MilvusClient
492
493
host = '%s'
494
port = %s
495
dim = %s
496
connection_type = '%s'
497
collection_names = %s
498
m = MilvusClient(host=host, port=port)
499
500
501
def get_collection_name():
502
    return random.choice(collection_names)
503
504
505
def get_client(collection_name):
506
    if connection_type == 'single':
507
        return MilvusTask(m=m)
508
    elif connection_type == 'multi':
509
        return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name)
510
        
511
        
512
class QueryTask(User):
513
    wait_time = between(0.001, 0.002)
514
        
515
    @task()
516
    def %s(self):
517
        top_k = %s
518
        X = [[random.random() for i in range(dim)] for i in range(%s)]
519
        search_param = %s
520
        collection_name = get_collection_name()
521
        client = get_client(collection_name)
522
        client.query(X, top_k, search_param, collection_name=collection_name)
523
            """ % (self.host, self.port, dimension, connection_type, collection_names, def_name, task_params["top_k"], task_params["nq"], task_params["search_param"])
524
            with open(task_file_script, 'w+') as fd:
525
                fd.write(code_str)
526
            locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % (
527
                task_file_script,
528
                task_file,
529
                clients_num,
530
                hatch_rate,
531
                during_time)
532
            logger.info(locust_cmd)
533
            try:
534
                res = os.system(locust_cmd)
535
            except Exception as e:
536
                logger.error(str(e))
537
                return
538
539
            # . retrieve and collect test statistics
540
            locust_stats = None
541
            with open(task_file_csv, newline='') as fd:
542
                dr = csv.DictReader(fd)
543
                for row in dr:
544
                    if row["Name"] != "Aggregated":
545
                        continue
546
                    locust_stats = row
547
            logger.info(locust_stats)
548
            # clean up temp files
549
            search_params = {
550
                "top_k": task_params["top_k"],
551
                "nq": task_params["nq"],
552
                "nprobe": task_params["search_param"]["nprobe"]
553
            }
554
            run_params = {
555
                "connection_num": connection_num,
556
                "clients_num": clients_num,
557
                "hatch_rate": hatch_rate,
558
                "during_time": during_time
559
            }
560
            collection_info = {
561
                "dimension": dimension,
562
                "metric_type": metric_type,
563
                "index_file_size": index_file_size,
564
                "dataset_name": collection_name
565
            }
566
            metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params, run_params)
0 ignored issues
show
introduced by
The variable index_info does not seem to be defined in case the for loop on line 457 is not entered. Are you sure this can never be the case?
Loading history...
567
            metric.metrics = {
568
                "type": run_type,
569
                "value": {
570
                    "during_time": during_time,
571
                    "request_count": int(locust_stats["Request Count"]),
572
                    "failure_count": int(locust_stats["Failure Count"]),
573
                    "qps": locust_stats["Requests/s"],
574
                    "min_response_time": int(locust_stats["Min Response Time"]),
575
                    "max_response_time": int(locust_stats["Max Response Time"]),
576
                    "median_response_time": int(locust_stats["Median Response Time"]),
577
                    "avg_response_time": int(locust_stats["Average Response Time"])
578
                }
579
            }
580
            report(metric)
581
582
        elif run_type == "search_ids_stability":
583
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
584
            search_params = collection["search_params"]
585
            during_time = collection["during_time"]
586
            ids_length = collection["ids_length"]
587
            ids = collection["ids"]
588
            collection_info = {
589
                "dimension": dimension,
590
                "metric_type": metric_type,
591
                "index_file_size": index_file_size,
592
                "dataset_name": collection_name
593
            }
594
            if not milvus_instance.exists_collection():
595
                logger.error("Table name: %s not existed" % collection_name)
596
                return
597
            logger.info(milvus_instance.count())
598
            index_info = milvus_instance.describe_index()
599
            logger.info(index_info)
600
            g_top_k = int(collection["top_ks"].split("-")[1])
601
            l_top_k = int(collection["top_ks"].split("-")[0])
602
            # g_id = int(ids.split("-")[1])
603
            # l_id = int(ids.split("-")[0])
604
            g_id_length = int(ids_length.split("-")[1])
605
            l_id_length = int(ids_length.split("-")[0])
606
607
            milvus_instance.preload_collection()
608
            start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
609
            logger.debug(start_mem_usage)
610
            start_time = time.time()
611
            while time.time() < start_time + during_time * 60:
612
                search_param = {}
613
                top_k = random.randint(l_top_k, g_top_k)
614
                ids_num = random.randint(l_id_length, g_id_length)
615
                ids_param = [random.randint(l_id_length, g_id_length) for _ in range(ids_num)]
616
                for k, v in search_params.items():
617
                    search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1]))
618
                logger.debug("Query top-k: %d, ids_num: %d, param: %s" % (top_k, ids_num, json.dumps(search_param)))
619
                result = milvus_instance.query_ids(top_k, ids_param, search_param=search_param)
620
            end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
621
            metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, {})
622
            metric.metrics = {
623
                "type": "search_ids_stability",
624
                "value": {
625
                    "during_time": during_time,
626
                    "start_mem_usage": start_mem_usage,
627
                    "end_mem_usage": end_mem_usage,
628
                    "diff_mem": end_mem_usage - start_mem_usage
629
                }
630
            }
631
            report(metric)
632
633
        # for sift/deep datasets
634
        # TODO: enable
635
        elif run_type == "accuracy":
636
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
637
            search_params = collection["search_params"]
638
            # mapping to search param list
639
            search_params = self.generate_combinations(search_params)
640
641
            top_ks = collection["top_ks"]
642
            nqs = collection["nqs"]
643
            collection_info = {
644
                "dimension": dimension,
645
                "metric_type": metric_type,
646
                "index_file_size": index_file_size,
647
                "dataset_name": collection_name
648
            }
649
            if not milvus_instance.exists_collection():
650
                logger.error("Table name: %s not existed" % collection_name)
651
                return
652
            logger.info(milvus_instance.count())
653
            index_info = milvus_instance.describe_index()
654
            logger.info(index_info)
655
            milvus_instance.preload_collection()
656
            true_ids_all = self.get_groundtruth_ids(collection_size)
657
            for search_param in search_params:
658
                for top_k in top_ks:
659
                    for nq in nqs:
660
                        # total = 0
661
                        search_param_group = {
662
                            "nq": nq,
663
                            "topk": top_k,
664
                            "search_param": search_param
665
                        }
666
                        logger.info("Query params: %s" % json.dumps(search_param_group))
667
                        result_ids, _ = self.do_query_ids(milvus_instance, collection_name, top_k, nq, search_param=search_param)
668
                        acc_value = self.get_recall_value(true_ids_all[:nq, :top_k].tolist(), result_ids)
669
                        logger.info("Query accuracy: %s" % acc_value)
670
                        metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_param_group)
671
                        metric.metrics = {
672
                            "type": "accuracy",
673
                            "value": {
674
                                "acc": acc_value
675
                            } 
676
                        }
677
                        report(metric)
678
679
        elif run_type == "ann_accuracy":
680
            hdf5_source_file = collection["source_file"]
681
            collection_name = collection["collection_name"]
682
            index_file_sizes = collection["index_file_sizes"]
683
            index_types = collection["index_types"]
684
            index_params = collection["index_params"]
685
            top_ks = collection["top_ks"]
686
            nqs = collection["nqs"]
687
            search_params = collection["search_params"]
688
            # mapping to search param list
689
            search_params = self.generate_combinations(search_params)
690
            # mapping to index param list
691
            index_params = self.generate_combinations(index_params)
692
693
            data_type, dimension, metric_type = parser.parse_ann_collection_name(collection_name)
694
            dataset = utils.get_dataset(hdf5_source_file)
695
            true_ids = np.array(dataset["neighbors"])
696
            for index_file_size in index_file_sizes:
697
                collection_info = {
698
                    "dimension": dimension,
699
                    "metric_type": metric_type,
700
                    "index_file_size": index_file_size,
701
                    "dataset_name": collection_name
702
                }
703
                if milvus_instance.exists_collection(collection_name):
704
                    logger.info("Re-create collection: %s" % collection_name)
705
                    milvus_instance.drop()
706
                    time.sleep(DELETE_INTERVAL_TIME)
707
708
                milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
709
                logger.info(milvus_instance.describe())
710
                insert_vectors = self.normalize(metric_type, np.array(dataset["train"]))
711
                # Insert batch once
712
                # milvus_instance.insert(insert_vectors)
713
                loops = len(insert_vectors) // INSERT_INTERVAL + 1
714
                for i in range(loops):
715
                    start = i*INSERT_INTERVAL
716
                    end = min((i+1)*INSERT_INTERVAL, len(insert_vectors))
717
                    tmp_vectors = insert_vectors[start:end]
718
                    if start < end:
719
                        if not isinstance(tmp_vectors, list):
720
                            milvus_instance.insert(tmp_vectors.tolist(), ids=[i for i in range(start, end)])
721
                        else:
722
                            milvus_instance.insert(tmp_vectors, ids=[i for i in range(start, end)])
723
                milvus_instance.flush()
724
                logger.info("Table: %s, row count: %s" % (collection_name, milvus_instance.count()))
725
                if milvus_instance.count() != len(insert_vectors):
726
                    logger.error("Table row count is not equal to insert vectors")
727
                    return
728
                for index_type in index_types:
729
                    for index_param in index_params:
730
                        logger.debug("Building index with param: %s" % json.dumps(index_param))
731
                        milvus_instance.create_index(index_type, index_param=index_param)
732
                        logger.info(milvus_instance.describe_index())
733
                        logger.info("Start preload collection: %s" % collection_name)
734
                        milvus_instance.preload_collection()
735
                        index_info = {
736
                            "index_type": index_type,
737
                            "index_param": index_param
738
                        }
739
                        logger.debug(index_info)
740
                        for search_param in search_params:
741
                            for nq in nqs:
742
                                query_vectors = self.normalize(metric_type, np.array(dataset["test"][:nq]))
743
                                for top_k in top_ks:
744
                                    search_param_group = {
745
                                        "nq": len(query_vectors),
746
                                        "topk": top_k,
747
                                        "search_param": search_param 
748
                                    }
749
                                    logger.debug(search_param_group)
750
                                    if not isinstance(query_vectors, list):
751
                                        result = milvus_instance.query(query_vectors.tolist(), top_k, search_param=search_param)
752
                                    else:
753
                                        result = milvus_instance.query(query_vectors, top_k, search_param=search_param)
754
                                    if len(result):
755
                                        logger.debug(len(result))
756
                                        logger.debug(len(result[0]))
757
                                    result_ids = result.id_array
758
                                    acc_value = self.get_recall_value(true_ids[:nq, :top_k].tolist(), result_ids)
759
                                    logger.info("Query ann_accuracy: %s" % acc_value)
760
                                    metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_param_group)
761
                                    metric.metrics = {
762
                                        "type": "ann_accuracy",
763
                                        "value": {
764
                                            "acc": acc_value
765
                                        } 
766
                                    }
767
                                    report(metric)
768
769
        elif run_type == "search_stability":
770
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
771
            search_params = collection["search_params"]
772
            during_time = collection["during_time"]
773
            collection_info = {
774
                "dimension": dimension,
775
                "metric_type": metric_type,
776
                "dataset_name": collection_name
777
            }
778
            if not milvus_instance.exists_collection():
779
                logger.error("Table name: %s not existed" % collection_name)
780
                return
781
            logger.info(milvus_instance.count())
782
            index_info = milvus_instance.describe_index()
783
            logger.info(index_info)
784
            g_top_k = int(collection["top_ks"].split("-")[1])
785
            g_nq = int(collection["nqs"].split("-")[1])
786
            l_top_k = int(collection["top_ks"].split("-")[0])
787
            l_nq = int(collection["nqs"].split("-")[0])
788
            milvus_instance.preload_collection()
789
            start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
790
            logger.debug(start_mem_usage)
791
            start_row_count = milvus_instance.count()
792
            logger.debug(milvus_instance.describe_index())
793
            logger.info(start_row_count)
794
            start_time = time.time()
795
            while time.time() < start_time + during_time * 60:
796
                search_param = {}
797
                top_k = random.randint(l_top_k, g_top_k)
798
                nq = random.randint(l_nq, g_nq)
799
                for k, v in search_params.items():
800
                    search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1]))
801
                query_vectors = [[random.random() for _ in range(dimension)] for _ in range(nq)]
802
                logger.debug("Query nq: %d, top-k: %d, param: %s" % (nq, top_k, json.dumps(search_param)))
803
                result = milvus_instance.query(query_vectors, top_k, search_param=search_param)
804
            end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
805
            metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, {})
806
            metric.metrics = {
807
                "type": "search_stability",
808
                "value": {
809
                    "during_time": during_time,
810
                    "start_mem_usage": start_mem_usage,
811
                    "end_mem_usage": end_mem_usage,
812
                    "diff_mem": end_mem_usage - start_mem_usage
813
                } 
814
            }
815
            report(metric)
816
817
        elif run_type == "loop_stability":
818
            # init data
819
            milvus_instance.clean_db()
820
            pull_interval = collection["pull_interval"]
821
            collection_num = collection["collection_num"]
822
            concurrent = collection["concurrent"] if "concurrent" in collection else False
823
            concurrent_num = collection_num
824
            dimension = collection["dimension"] if "dimension" in collection else 128
825
            insert_xb = collection["insert_xb"] if "insert_xb" in collection else 100000
826
            index_types = collection["index_types"] if "index_types" in collection else ['ivf_sq8']
827
            index_param = {"nlist": 2048}
828
            collection_names = []
829
            milvus_instances_map = {}
830
            insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)]
831
            for i in range(collection_num):
832
                name = utils.get_unique_name(prefix="collection_")
833
                collection_names.append(name)
834
                metric_type = random.choice(["l2", "ip"])
835
                index_file_size = random.randint(10, 20)
836
                milvus_instance.create_collection(name, dimension, index_file_size, metric_type)
837
                milvus_instance = MilvusClient(collection_name=name, host=self.host)
838
                index_type = random.choice(index_types)
839
                milvus_instance.create_index(index_type, index_param=index_param)
840
                logger.info(milvus_instance.describe_index())
841
                insert_vectors = utils.normalize(metric_type, insert_vectors)
842
                milvus_instance.insert(insert_vectors)
843
                milvus_instance.flush()
844
                milvus_instances_map.update({name: milvus_instance})
845
                logger.info(milvus_instance.describe_index())
846
                logger.info(milvus_instance.describe())
847
848
            # loop time unit: min -> s
849
            pull_interval_seconds = pull_interval * 60
850
            tasks = ["insert_rand", "delete_rand", "query_rand", "flush", "compact"]
851
            i = 1
852
            while True:
853
                logger.info("Loop time: %d" % i)
854
                start_time = time.time()
855
                while time.time() - start_time < pull_interval_seconds:
856
                    if concurrent:
857
                        mp = []
858
                        for _ in range(concurrent_num):
859
                            tmp_collection_name = random.choice(collection_names)
860
                            task_name = random.choice(tasks)
861
                            mp.append((tmp_collection_name, task_name))
862
863
                        with futures.ThreadPoolExecutor(max_workers=concurrent_num) as executor:
864
                            future_results = {executor.submit(getattr(milvus_instances_map[mp[j][0]], mp[j][1])): j for j in range(concurrent_num)}
865
                            for future in futures.as_completed(future_results):
866
                                future.result()
867
868
                    else:
869
                        tmp_collection_name = random.choice(collection_names)
870
                        task_name = random.choice(tasks)
871
                        logger.info(tmp_collection_name)
872
                        logger.info(task_name)
873
                        task_run = getattr(milvus_instances_map[tmp_collection_name], task_name)
874
                        task_run()
875
    
876
                logger.debug("Restart server")
877
                utils.restart_server(self.service_name, namespace)
878
                # new connection
879
                for name in collection_names:
880
                    milvus_instance = MilvusClient(collection_name=name, host=self.host)
881
                    milvus_instances_map.update({name: milvus_instance})
882
                i = i + 1
883
884
        elif run_type == "stability":
885
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(collection_name)
886
            search_params = collection["search_params"]
887
            insert_xb = collection["insert_xb"]
888
            insert_interval = collection["insert_interval"]
889
            delete_xb = collection["delete_xb"]
890
            during_time = collection["during_time"]
891
            collection_info = {
892
                "dimension": dimension,
893
                "metric_type": metric_type,
894
                "dataset_name": collection_name
895
            }
896
            if not milvus_instance.exists_collection():
897
                logger.error("Table name: %s not existed" % collection_name)
898
                return
899
            logger.info(milvus_instance.count())
900
            index_info = milvus_instance.describe_index()
901
            logger.info(index_info)
902
            g_top_k = int(collection["top_ks"].split("-")[1])
903
            g_nq = int(collection["nqs"].split("-")[1])
904
            l_top_k = int(collection["top_ks"].split("-")[0])
905
            l_nq = int(collection["nqs"].split("-")[0])
906
            milvus_instance.preload_collection()
907
            start_mem_usage = milvus_instance.get_mem_info()["memory_used"]
908
            start_row_count = milvus_instance.count()
909
            logger.debug(milvus_instance.describe_index())
910
            logger.info(start_row_count)
911
            start_time = time.time()
912
            i = 0
913
            ids = []
914
            insert_vectors = [[random.random() for _ in range(dimension)] for _ in range(insert_xb)]
915
            query_vectors = [[random.random() for _ in range(dimension)] for _ in range(10000)]
916 View Code Duplication
            while time.time() < start_time + during_time * 60:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
917
                i = i + 1
918
                for j in range(insert_interval):
919
                    top_k = random.randint(l_top_k, g_top_k)
920
                    nq = random.randint(l_nq, g_nq)
921
                    search_param = {}
922
                    for k, v in search_params.items():
923
                        search_param[k] = random.randint(int(v.split("-")[0]), int(v.split("-")[1]))
924
                    logger.debug("Query nq: %d, top-k: %d, param: %s" % (nq, top_k, json.dumps(search_param)))
925
                    result = milvus_instance.query(query_vectors[0:nq], top_k, search_param=search_param)
926
                count = milvus_instance.count()
927
                insert_ids = [(count+x) for x in range(len(insert_vectors))]
928
                ids.extend(insert_ids)
929
                status, res = milvus_instance.insert(insert_vectors, ids=insert_ids)
930
                logger.debug("%d, row_count: %d" % (i, milvus_instance.count()))
931
                milvus_instance.delete(ids[-delete_xb:])
932
                milvus_instance.flush()
933
                milvus_instance.compact()
934
            end_mem_usage = milvus_instance.get_mem_info()["memory_used"]
935
            end_row_count = milvus_instance.count()
936
            metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, {})
937
            metric.metrics = {
938
                "type": "stability",
939
                "value": {
940
                    "during_time": during_time,
941
                    "start_mem_usage": start_mem_usage,
942
                    "end_mem_usage": end_mem_usage,
943
                    "diff_mem": end_mem_usage - start_mem_usage,
944
                    "row_count_increments": end_row_count - start_row_count
945
                } 
946
            }
947
            report(metric)
948
949
        elif run_type == "locust_mix_performance":
950
            (data_type, collection_size, index_file_size, dimension, metric_type) = parser.collection_parser(
951
                collection_name)
952
            ni_per = collection["ni_per"]
953
            build_index = collection["build_index"]
954
            # TODO: debug
955
            if milvus_instance.exists_collection():
956
                milvus_instance.drop()
957
                time.sleep(10)
958
            index_info = {}
959
            search_params = {}
960
            milvus_instance.create_collection(collection_name, dimension, index_file_size, metric_type)
961
            if build_index is True:
962
                index_type = collection["index_type"]
963
                index_param = collection["index_param"]
964
                index_info = {
965
                    "index_tyoe": index_type,
966
                    "index_param": index_param
967
                }
968
                milvus_instance.create_index(index_type, index_param)
969
                logger.debug(milvus_instance.describe_index())
970
            res = self.do_insert(milvus_instance, collection_name, data_type, dimension, collection_size, ni_per)
971
            logger.info(res)
972
            if "flush" in collection and collection["flush"] == "no":
973
                logger.debug("No manual flush")
974
            else:
975
                milvus_instance.flush()
976
            if build_index is True:
977
                logger.debug("Start build index for last file")
978
                milvus_instance.create_index(index_type, index_param)
979
                logger.debug(milvus_instance.describe_index())
980
            ### spawn locust requests
981
            task = collection["tasks"]
982
            # generate task code
983
            task_file = utils.get_unique_name()
984
            task_file_script = task_file + '.py'
985
            task_file_csv = task_file + '_stats.csv'
986
            task_types = task["types"]
987
            connection_type = "single"
988
            connection_num = task["connection_num"]
989
            if connection_num > 1:
990
                connection_type = "multi"
991
            clients_num = task["clients_num"]
992
            hatch_rate = task["hatch_rate"]
993
            during_time = task["during_time"]
994
            def_strs = ""
995
            for task_type in task_types:
996
                _type = task_type["type"]
997
                weight = task_type["weight"]
998
                if _type == "flush":
999
                    def_str = """
1000
    @task(%d)
1001
    def flush(self):
1002
        client = get_client(collection_name)
1003
        client.flush(collection_name=collection_name)
1004
                            """ % weight
1005
                if _type == "compact":
1006
                    def_str = """
1007
    @task(%d)
1008
    def compact(self):
1009
        client = get_client(collection_name)
1010
        client.compact(collection_name)
1011
                            """ % weight
1012
                if _type == "query":
1013
                    def_str = """
1014
    @task(%d)
1015
    def query(self):
1016
        client = get_client(collection_name)
1017
        params = %s
1018
        X = [[random.random() for i in range(dim)] for i in range(params["nq"])]
1019
        client.query(X, params["top_k"], params["search_param"], collection_name=collection_name)
1020
                    """ % (weight, task_type["params"])
1021
                if _type == "insert":
1022
                    def_str = """
1023
    @task(%d)
1024
    def insert(self):
1025
        client = get_client(collection_name)
1026
        params = %s
1027
        ids = [random.randint(10, 1000000) for i in range(params["nb"])]
1028
        X = [[random.random() for i in range(dim)] for i in range(params["nb"])]
1029
        client.insert(X,ids=ids, collection_name=collection_name)
1030
                        """ % (weight, task_type["params"])
1031
                if _type == "delete":
1032
                    def_str = """
1033
    @task(%d)
1034
    def delete(self):
1035
        client = get_client(collection_name)
1036
        ids = [random.randint(1, 1000000) for i in range(1)]
1037
        client.delete(ids, collection_name)
1038
                            """ % weight
1039
                def_strs += def_str
0 ignored issues
show
introduced by
The variable def_str does not seem to be defined for all execution paths.
Loading history...
1040
                code_str = """
1041
import random
1042
import json
1043
from locust import User, task, between
1044
from locust_task import MilvusTask
1045
from client import MilvusClient
1046
1047
host = '%s'
1048
port = %s
1049
collection_name = '%s'
1050
dim = %s
1051
connection_type = '%s'
1052
m = MilvusClient(host=host, port=port)
1053
1054
def get_client(collection_name):
1055
    if connection_type == 'single':
1056
        return MilvusTask(m=m)
1057
    elif connection_type == 'multi':
1058
        return MilvusTask(connection_type='multi', host=host, port=port, collection_name=collection_name)
1059
        
1060
        
1061
class MixTask(User):
1062
    wait_time = between(0.001, 0.002)
1063
    %s
1064
            """ % (self.host, self.port, collection_name, dimension, connection_type, def_strs)
1065
            print(def_strs)
1066
            with open(task_file_script, "w+") as fd:
1067
                fd.write(code_str)
0 ignored issues
show
introduced by
The variable code_str does not seem to be defined in case the for loop on line 995 is not entered. Are you sure this can never be the case?
Loading history...
1068
            locust_cmd = "locust -f %s --headless --csv=%s -u %d -r %d -t %s" % (
1069
                task_file_script,
1070
                task_file,
1071
                clients_num,
1072
                hatch_rate,
1073
                during_time)
1074
            logger.info(locust_cmd)
1075
            try:
1076
                res = os.system(locust_cmd)
1077
            except Exception as e:
1078
                logger.error(str(e))
1079
                return
1080
            # Retrieve and collect test statistics
1081
            locust_stats = None
1082
            with open(task_file_csv, newline='') as fd:
1083
                dr = csv.DictReader(fd)
1084
                for row in dr:
1085
                    if row["Name"] != "Aggregated":
1086
                        continue
1087
                    locust_stats = row
1088
            logger.info(locust_stats)
1089
            collection_info = {
1090
                "dimension": dimension,
1091
                "metric_type": metric_type,
1092
                "dataset_name": collection_name
1093
            }
1094
            metric = self.report_wrapper(milvus_instance, self.env_value, self.hostname, collection_info, index_info, search_params)
1095
            metric.metrics = {
1096
                "type": run_type,
1097
                "value": {
1098
                    "during_time": during_time,
1099
                    "request_count": int(locust_stats["Request Count"]),
1100
                    "failure_count": int(locust_stats["Failure Count"]),
1101
                    "qps": locust_stats["Requests/s"],
1102
                    "min_response_time": int(locust_stats["Min Response Time"]),
1103
                    "max_response_time": int(locust_stats["Max Response Time"]),
1104
                    "median_response_time": int(locust_stats["Median Response Time"]),
1105
                    "avg_response_time": int(locust_stats["Average Response Time"])
1106
                }
1107
            }
1108
            report(metric)
1109
1110
        else:
1111
            logger.warning("Run type: %s not defined" % run_type)
1112
            return
1113
        logger.debug("Test finished")
1114