Issues (718)

tests/milvus_benchmark/main.py (1 issue)

Severity
1
import os
2
import sys
3
import time
4
import pdb
5
import argparse
6
import logging
7
import traceback
8
from multiprocessing import Process
9
from queue import Queue
10
from logging import handlers
11
from yaml import full_load, dump
12
from local_runner import LocalRunner
13
from docker_runner import DockerRunner
14
from k8s_runner import K8sRunner
15
import parser
16
17
# Module-level defaults. None of these three constants is referenced by the
# functions visible in this file -- presumably kept for other modules/tools.
DEFAULT_IMAGE = "milvusdb/milvus:latest"
LOG_FOLDER = "logs"
NAMESPACE = "milvus"

# Configure the root logger once at import time: DEBUG level, with the
# timestamp, level, source file and line number in front of every message.
logging.basicConfig(
    level=logging.DEBUG,
    datefmt='%Y-%m-%d:%H:%M:%S',
    format='%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s',
)
# Named logger used by the functions in this module.
logger = logging.getLogger("milvus_benchmark")
25
26
27
def positive_int(s):
    """Convert *s* to a strictly positive integer (argparse ``type`` helper).

    Args:
        s: Value to convert, normally the raw command-line string.

    Returns:
        The parsed integer, always >= 1.

    Raises:
        argparse.ArgumentTypeError: If ``int(s)`` fails or the result is
            smaller than 1.
    """
    try:
        value = int(s)
    except (TypeError, ValueError):
        # ValueError: unparsable text; TypeError: non-string input such as
        # None. Both are reported uniformly as an argument error below.
        value = None
    if value is None or value < 1:
        # Wrap in a 1-tuple so a tuple-valued *s* cannot break the formatting.
        raise argparse.ArgumentTypeError("%r is not a positive integer" % (s,))
    return value
36
37
38
def get_image_tag(image_version, image_type):
    """Build the image tag string for a version and an image type.

    The result has the form ``<image_version>-<image_type>-centos7-release``.
    """
    # The version used to be pinned by hand here for ad-hoc runs
    # (e.g. "0.7.1" or "PR-2780"); it is always taken from the caller now.
    tag_fields = (image_version, image_type)
    return "%s-%s-centos7-release" % tag_fields
42
43
44
def queue_worker(queue):
    """
    Run every task waiting in *queue*, one after another.

    Using a queue makes sure only one test process runs on each host.
    Workers can be run concurrently on different hosts.

    Each queued task is a dict with the keys "suite" (path of the suite YAML
    file), "server_host", "deploy_mode", "image_type" and "image_tag".
    For every collection listed in the suite a fresh ``K8sRunner`` is
    initialised and run; the runner is always cleaned up afterwards, even
    when the run raises. A failing run is logged and does not stop the
    remaining tasks.

    An empty queue is a no-op.
    """
    # Stays None when the queue is empty, so the final check below never
    # touches an unbound name.
    server_host = None
    while not queue.empty():
        task = queue.get()
        suite = task["suite"]
        server_host = task["server_host"]
        deploy_mode = task["deploy_mode"]
        image_type = task["image_type"]
        image_tag = task["image_tag"]

        # The with-statement closes the file; no explicit close() is needed.
        with open(suite) as f:
            suite_dict = full_load(f)
        logger.debug(suite_dict)

        run_type, run_params = parser.operations_parser(suite_dict)
        for collection in run_params["collections"]:
            # The per-collection server config is optional.
            server_config = collection["server"] if "server" in collection else None
            logger.debug(server_config)
            runner = K8sRunner()
            if not runner.init_env(server_config, server_host, deploy_mode, image_type, image_tag):
                logger.error("Runner init failed")
                continue
            logger.debug("Start run tests")
            try:
                runner.run(run_type, collection)
            except Exception as e:
                # Best effort: log the failure and carry on with the next
                # collection instead of aborting the whole queue.
                logger.error(str(e))
                logger.error(traceback.format_exc())
            finally:
                # Pause before tearing the environment down.
                time.sleep(10)
                runner.clean_up()
    if server_host:
        logger.debug("All task finished in queue: %s" % server_host)
84
85
86
def _build_arg_parser():
    """Create the command-line parser for the benchmark entry point."""
    arg_parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # helm mode with scheduler
    arg_parser.add_argument(
        "--image-version",
        default="",
        help="image version")
    arg_parser.add_argument(
        "--schedule-conf",
        metavar='FILE',
        default='',
        help="load test schedule from FILE")
    arg_parser.add_argument(
        "--deploy-mode",
        default='',
        help="single or shards")

    # local mode
    arg_parser.add_argument(
        '--local',
        action='store_true',
        help='use local milvus server')
    arg_parser.add_argument(
        '--host',
        help='server host ip param for local mode',
        default='127.0.0.1')
    arg_parser.add_argument(
        '--port',
        help='server port param for local mode',
        default='19530')
    arg_parser.add_argument(
        '--suite',
        metavar='FILE',
        help='load test suite from FILE',
        default='')
    return arg_parser


def _run_scheduled(args):
    """Run helm mode: one worker process per entry of the schedule file.

    Raises:
        Exception: If ``--local`` is also set, or no image version is given.
    """
    if args.local:
        raise Exception("Helm mode with scheduler and other mode are incompatible")
    if not args.image_version:
        raise Exception("Image version not given")
    image_version = args.image_version
    deploy_mode = args.deploy_mode
    # The with-statement closes the file; no explicit close() is needed.
    with open(args.schedule_conf) as f:
        schedule_config = full_load(f)

    # One queue per schedule entry; each queue is drained by its own process.
    queues = []
    for item in schedule_config:
        server_host = item["server"] if "server" in item else ""
        q = Queue()
        for suite_param in item["suite_params"]:
            image_type = suite_param["image_type"]
            q.put({
                "suite": "suites/" + suite_param["suite"],
                "server_host": server_host,
                "deploy_mode": deploy_mode,
                "image_tag": get_image_tag(image_version, image_type),
                "image_type": image_type
            })
        queues.append(q)
    # Diagnostic dump only, so it goes through the module logger at DEBUG.
    logger.debug(queues)

    processes = []
    for q in queues:
        worker = Process(target=queue_worker, args=(q, ))
        processes.append(worker)
        worker.start()
        # Pause between launches -- presumably to stagger worker start-up.
        time.sleep(10)
    for worker in processes:
        worker.join()


def _run_local(args):
    """Run local mode: a single suite against the server at host/port.

    Raises:
        Exception: If the suite does not define exactly one collection.
    """
    with open(args.suite) as f:
        suite_dict = full_load(f)
    logger.debug(suite_dict)
    run_type, run_params = parser.operations_parser(suite_dict)
    collections = run_params["collections"]
    if len(collections) > 1:
        raise Exception("Multi collections not supported in Local Mode")
    if not collections:
        # Explicit error instead of an IndexError on collections[0].
        raise Exception("No collection found in suite: %s" % args.suite)
    runner = LocalRunner(args.host, args.port)
    logger.info("Start run local mode test, test type: %s" % run_type)
    runner.run(run_type, collections[0])


def main():
    """Parse the command line and run the selected benchmark mode.

    ``--schedule-conf FILE`` selects helm mode with the scheduler and
    ``--local`` selects local mode; the two cannot be combined. When neither
    option is given nothing is run.
    """
    args = _build_arg_parser().parse_args()
    if args.schedule_conf:
        _run_scheduled(args)
    elif args.local:
        _run_local(args)
184
185
186
# Run the benchmark only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
188