1 | import os |
||
2 | import sys |
||
3 | import time |
||
4 | import pdb |
||
5 | import argparse |
||
6 | import logging |
||
7 | import traceback |
||
8 | from multiprocessing import Process |
||
9 | from queue import Queue |
||
10 | from logging import handlers |
||
11 | from yaml import full_load, dump |
||
12 | from local_runner import LocalRunner |
||
13 | from docker_runner import DockerRunner |
||
14 | from k8s_runner import K8sRunner |
||
15 | import parser |
||
16 | |||
# Module-level defaults.
DEFAULT_IMAGE = "milvusdb/milvus:latest"
LOG_FOLDER = "logs"
NAMESPACE = "milvus"

# Root logging configuration: timestamp with milliseconds, padded level name,
# source file and line number, then the message. Everything from DEBUG up is
# emitted.
_LOG_FORMAT = '%(asctime)s,%(msecs)d %(levelname)-8s [%(filename)s:%(lineno)d] %(message)s'
_LOG_DATE_FORMAT = '%Y-%m-%d:%H:%M:%S'
logging.basicConfig(
    level=logging.DEBUG,
    format=_LOG_FORMAT,
    datefmt=_LOG_DATE_FORMAT,
)

# Named logger used by the functions in this module.
logger = logging.getLogger("milvus_benchmark")
||
25 | |||
26 | |||
def positive_int(s):
    """Convert ``s`` to a strictly positive integer.

    Failures are reported as ``argparse.ArgumentTypeError``, so the function
    can be used as an argparse ``type=`` callable.

    Args:
        s: Value to convert, normally a string taken from the command line.

    Returns:
        The parsed integer, always >= 1.

    Raises:
        argparse.ArgumentTypeError: If ``s`` cannot be parsed as an integer
            or the parsed value is smaller than 1.
    """
    try:
        value = int(s)
    except (TypeError, ValueError):
        # ValueError: text that is not an integer literal.
        # TypeError: a non-string/non-number input such as None; report it
        # the same way instead of leaking a raw TypeError to the caller.
        value = None
    if value is None or value < 1:
        # Wrap ``s`` in a 1-tuple so a tuple argument is formatted as one
        # value rather than being unpacked by the % operator.
        raise argparse.ArgumentTypeError("%r is not a positive integer" % (s,))
    return value
||
36 | |||
37 | |||
def get_image_tag(image_version, image_type):
    """Build the image tag ``<image_version>-<image_type>-centos7-release``.

    Args:
        image_version: Version part of the tag.
        image_type: Image flavour part of the tag.

    Returns:
        The assembled tag string.
    """
    tag_parts = [str(image_version), str(image_type), "centos7-release"]
    return "-".join(tag_parts)
||
42 | |||
43 | |||
def queue_worker(queue):
    """Run every suite queued for one server host, one after another.

    Using a queue makes sure only one test process runs on each host, while
    workers for different hosts can run concurrently.

    Args:
        queue: Queue of task dicts carrying the keys ``suite``,
            ``server_host``, ``deploy_mode``, ``image_type`` and
            ``image_tag``.

    Errors raised while a collection is being run are logged and do not stop
    the remaining collections or tasks.
    """
    # Bind the name up front: the summary log after the loop reads it, and
    # with an empty queue the loop body never runs (previously an
    # UnboundLocalError).
    server_host = None

    while not queue.empty():
        task = queue.get()
        suite = task["suite"]
        server_host = task["server_host"]
        deploy_mode = task["deploy_mode"]
        image_type = task["image_type"]
        image_tag = task["image_tag"]

        # NOTE(review): full_load is not yaml.safe_load; suite files are
        # presumably trusted project files - confirm.
        with open(suite) as f:
            suite_dict = full_load(f)
        logger.debug(suite_dict)

        run_type, run_params = parser.operations_parser(suite_dict)
        for collection in run_params["collections"]:
            # The per-collection server config is optional.
            server_config = collection["server"] if "server" in collection else None
            logger.debug(server_config)

            # A fresh runner is created for every collection.
            runner = K8sRunner()
            if not runner.init_env(server_config, server_host, deploy_mode, image_type, image_tag):
                logger.error("Runner init failed")
                continue

            logger.debug("Start run tests")
            try:
                runner.run(run_type, collection)
            except Exception as e:
                # Best effort: record the failure and carry on with the next
                # collection.
                logger.error(str(e))
                logger.error(traceback.format_exc())
            finally:
                # Pause before cleaning up, whether or not the run succeeded.
                time.sleep(10)
                runner.clean_up()

    if server_host:
        logger.debug("All task finished in queue: %s" % server_host)
||
84 | |||
85 | |||
def main():
    """Parse the command line and run the benchmark in one of two modes.

    * Scheduler (helm) mode, selected by ``--schedule-conf``: the schedule
      file is loaded, one task queue is built per schedule entry, and one
      ``queue_worker`` process is started per queue.
    * Local mode, selected by ``--local``: the single collection of the suite
      given by ``--suite`` is run against ``--host``/``--port`` with
      ``LocalRunner``.

    If neither option is given, nothing is run.

    Raises:
        Exception: If ``--schedule-conf`` is combined with ``--local``, if
            ``--image-version`` is missing in scheduler mode, or if the suite
            holds more than one collection in local mode.
    """
    arg_parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    # helm mode with scheduler
    arg_parser.add_argument(
        "--image-version",
        default="",
        help="image version")
    arg_parser.add_argument(
        "--schedule-conf",
        metavar='FILE',
        default='',
        help="load test schedule from FILE")
    arg_parser.add_argument(
        "--deploy-mode",
        default='',
        help="single or shards")

    # local mode
    arg_parser.add_argument(
        '--local',
        action='store_true',
        help='use local milvus server')
    arg_parser.add_argument(
        '--host',
        help='server host ip param for local mode',
        default='127.0.0.1')
    arg_parser.add_argument(
        '--port',
        help='server port param for local mode',
        default='19530')
    arg_parser.add_argument(
        '--suite',
        metavar='FILE',
        help='load test suite from FILE',
        default='')

    args = arg_parser.parse_args()

    if args.schedule_conf:
        if args.local:
            raise Exception("Helm mode with scheduler and other mode are incompatible")
        if not args.image_version:
            raise Exception("Image version not given")
        image_version = args.image_version
        deploy_mode = args.deploy_mode
        with open(args.schedule_conf) as f:
            schedule_config = full_load(f)

        # One queue per schedule entry; each queue is drained by exactly one
        # worker process.
        queues = []
        for item in schedule_config:
            server_host = item["server"] if "server" in item else ""
            suite_params = item["suite_params"]
            task_queue = Queue()
            for suite_param in suite_params:
                image_type = suite_param["image_type"]
                task_queue.put({
                    "suite": "suites/" + suite_param["suite"],
                    "server_host": server_host,
                    "deploy_mode": deploy_mode,
                    "image_tag": get_image_tag(image_version, image_type),
                    "image_type": image_type,
                })
            queues.append(task_queue)
        # Diagnostic dump only: use the module logger at debug level instead
        # of reporting it as an error on the root logger.
        logger.debug(queues)

        # NOTE(review): a queue.Queue is handed to multiprocessing.Process, so
        # each child works on its own copy. This presumably relies on the
        # "fork" start method; under "spawn" the queue would have to be
        # picklable - confirm for the target platforms.
        processes = []
        for task_queue in queues:
            worker = Process(target=queue_worker, args=(task_queue, ))
            processes.append(worker)
            worker.start()
            # Stagger the worker start-up.
            time.sleep(10)
        for worker in processes:
            worker.join()

    elif args.local:
        # for local mode
        host = args.host
        port = args.port
        suite = args.suite
        with open(suite) as f:
            suite_dict = full_load(f)
        logger.debug(suite_dict)
        run_type, run_params = parser.operations_parser(suite_dict)
        collections = run_params["collections"]
        if len(collections) > 1:
            raise Exception("Multi collections not supported in Local Mode")
        collection = collections[0]
        runner = LocalRunner(host, port)
        logger.info("Start run local mode test, test type: %s" % run_type)
        runner.run(run_type, collection)
||
184 | |||
185 | |||
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
||
188 |