Failed Conditions
Pull Request — master (#1182)
by Lasse
01:42
created

coalib.tests.run_tests()   B

Complexity

Conditions 6

Size

Total Lines 42

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 42
rs 7.5385
1
import argparse
2
import importlib
0 ignored issues
show
Unused Code introduced by
The import importlib seems to be unused.
Loading history...
3
import inspect
0 ignored issues
show
Unused Code introduced by
The import inspect seems to be unused.
Loading history...
4
import os
5
import subprocess
6
import sys
7
import shutil
8
import webbrowser
9
import multiprocessing
10
import functools
11
import tempfile
12
from coalib.misc.ContextManagers import (suppress_stdout,
0 ignored issues
show
Unused Code introduced by
Unused preserve_sys_path imported from coalib.misc.ContextManagers
Loading history...
13
                                         preserve_sys_path,
14
                                         subprocess_timeout)
15
from coalib.processes.Processing import create_process_group, get_cpu_count
16
17
18
def create_argparser(**kwargs):
    """
    Creates the argument parser for the test runner command line.

    :param kwargs: Keyword arguments passed on unchanged to
                   argparse.ArgumentParser.
    :return:       The ArgumentParser with the options -t/--test-only,
                   -c/--cover, -H/--html, -v/--verbose, -o/--omit,
                   -T/--timeout and -j/--jobs registered.
    """
    parser = argparse.ArgumentParser(**kwargs)
    parser.add_argument("-t",
                        "--test-only",
                        help="Execute only the tests with the "
                             "given base name",
                        nargs="+")
    parser.add_argument("-c",
                        "--cover",
                        help="Measure code coverage",
                        action="store_true")
    parser.add_argument("-H",
                        "--html",
                        help="Generate html code coverage, implies -c",
                        action="store_true")
    parser.add_argument("-v",
                        "--verbose",
                        help="More verbose output",
                        action="store_true")
    parser.add_argument("-o",
                        "--omit",
                        help="Base names of tests to omit",
                        nargs="+")
    parser.add_argument("-T",
                        "--timeout",
                        default=20,
                        type=int,
                        help="Amount of time to wait for a test to run "
                             "before killing it. To not use any timeout, "
                             "set this to 0")
    parser.add_argument("-j",
                        "--jobs",
                        # Evaluated once, when the parser is built.
                        default=get_cpu_count(),
                        type=int,
                        help="Number of jobs to use in parallel.")

    return parser
55
56
57
def execute_coverage_command(*args):
    """
    Runs the coverage module of the current interpreter as a subprocess.

    :param args: Command line arguments appended after
                 ``<python> -m coverage``.
    :return:     The return code of the subprocess.
    """
    commands = [sys.executable, "-m", "coverage"] + list(args)
    return subprocess.call(commands)
62
63
64
def parse_args(parser):
    """
    Parses the CLI arguments and resolves the implicit ones.

    :param parser: An argparse.ArgumentParser, e.g. one created with
                   create_argparser.
    :return:       The parsed arguments after being passed through
                   resolve_implicit_args.
    """
    args = parser.parse_args()
    return resolve_implicit_args(args, parser)
76
77
78
def resolve_implicit_args(args, parser):
    """
    Fills in argument values that are implied by other arguments.

    --html implies --cover, and unset --omit/--test-only become empty lists.

    :param args:   The parsed arguments; modified in place. The attributes
                   cover, html, omit and test_only are read.
    :param parser: The parser whose error() method is called when --omit and
                   --test-only are given together.
    :return:       The same args object.
    """
    args.cover = args.cover or args.html
    if args.omit is not None and args.test_only is not None:
        # Name the option exactly as it is spelled on the command line.
        parser.error("Incompatible options: --omit and --test-only")
    if args.omit is None:
        args.omit = []
    if args.test_only is None:
        args.test_only = []

    return args
88
89
90
def is_eligible_test(filename, test_only, omit):
    """
    Checks if the filename is a Test or not. The conditions are:
     - Ends with "Test.py"
     - Is not present in `omit`
     - If test_only is not empty, it should be present in test_only

    Names in `omit` and `test_only` are compared against the base name of
    the file without its extension.

    :param filename:  The filename to check eligibility for.
    :param test_only: Only execute files within the filenames in this list.
                      An empty list or None disables this filter.
    :param omit:      The filename should not be in this list. An empty list
                      or None omits nothing.
    :return:          True if the file is eligible to be run as a test,
                      else False.
    """
    if not filename.endswith("Test.py"):
        return False

    name = os.path.splitext(os.path.basename(filename))[0]
    if omit and name in omit:
        return False
    if test_only and name not in test_only:
        return False

    return True
112
113
114
def delete_coverage(silent=False):
    """
    Deletes previous coverage data.

    :param silent: If True, no hint is printed when a coverage command
                   fails.
    :return:       True if both the "combine" and the "erase" coverage
                   commands exited with 0, else False.
    """
    coverage_available = False
    with suppress_stdout():
        coverage_available = (execute_coverage_command("combine") == 0 and
                              execute_coverage_command("erase") == 0)

    if not coverage_available and not silent:
        # Each literal ends with a space so the sentences do not run
        # together when the adjacent literals are concatenated.
        print("Coverage failed. Falling back to standard unit tests. "
              "Install code coverage measurement for python3. Package "
              "name should be something like: python-coverage3/coverage")

    return coverage_available
131
132
133
def execute_command_array(command_array, timeout, verbose):
    """
    Executes the given command array in a subprocess group.

    The output of the subprocess (stderr merged into stdout) is collected in
    a temporary file.

    :param command_array: The command array to execute.
    :param timeout:       Time to wait until killing the process.
    :param verbose:       Return the stdout and stderr of the subprocess even
                          if it succeeded.
    :return:              A tuple of (result, message) where result is the
                          return code of the process (always 1 on timeout)
                          and message gives text information of what
                          happened.
    """
    message = ""
    # sys.stdout may have been replaced by a stream without an encoding;
    # decode() would raise a TypeError for None, so fall back to UTF-8.
    encoding = getattr(sys.stdout, "encoding", None) or "utf-8"

    # The context manager closes the temporary file even if starting or
    # waiting for the process raises.
    with tempfile.TemporaryFile() as stdout_file:
        p = create_process_group(command_array,
                                 stdout=stdout_file,
                                 stderr=subprocess.STDOUT,
                                 universal_newlines=True)
        with subprocess_timeout(p,
                                timeout,
                                kill_pg=True) as timedout:
            retval = p.wait()
            timed_out = timedout.value

        if retval != 0 or verbose:
            stdout_file.seek(0)
            message += stdout_file.read().decode(encoding, errors="replace")

    if timed_out:
        message += ("This test failed because it was taking more than %f sec "
                    "to execute. To change the timeout setting use the `-T` "
                    "or `--timeout` argument.\n" % timeout)
        return 1, message  # Guaranteed fail, especially on race condition

    return retval, message
170
171
172
def show_coverage_results(html):
    """
    Combines the collected coverage data and prints the coverage report.

    :param html: If True, an HTML report is additionally generated into the
                 ".htmlreport" directory (which is removed first) and an
                 attempt is made to open it in a new browser tab.
    """
    execute_coverage_command("combine")
    execute_coverage_command("report", "-m")

    if not html:
        return

    shutil.rmtree(".htmlreport", ignore_errors=True)
    print("Generating HTML report to .htmlreport...")
    execute_coverage_command("html", "-d", ".htmlreport")

    try:
        webbrowser.open_new_tab(os.path.join(".htmlreport", "index.html"))
    except webbrowser.Error:
        # Opening the browser is a convenience only; the report exists
        # on disk regardless.
        pass
186
187
188
def execute_python_file(filename, ignored_files, cover, timeout, verbose):
    """
    Executes a python file with the current interpreter, optionally under
    coverage measurement.

    :param filename:      The python file to execute.
    :param ignored_files: Value handed to the "--omit" option of
                          "coverage run". Only used if cover is set.
    :param cover:         Whether to run the file through "coverage run".
    :param timeout:       Passed on to execute_command_array.
    :param verbose:       Passed on to execute_command_array.
    :return:              The (result, message) tuple returned by
                          execute_command_array.
    """
    if not cover:
        return execute_command_array([sys.executable,
                                      filename],
                                     timeout=timeout,
                                     verbose=verbose)

    return execute_command_array([sys.executable,
                                  "-m",
                                  "coverage",
                                  "run",
                                  "-p",  # make it collectable later
                                  "--branch",
                                  "--omit",
                                  ignored_files,
                                  filename],
                                 timeout=timeout,
                                 verbose=verbose)
206
207
208
def show_nonexistent_tests(test_only, test_file_names):
    """
    Prints a line for every requested test that was not found.

    :param test_only:       The names of the tests that were requested.
    :param test_file_names: The names of the tests that were found.
    :return:                A tuple of (number of requested tests that do
                            not exist, number of requested tests).
    """
    # Build the lookup once instead of scanning the list for every test.
    known_names = set(test_file_names)
    nonexistent_tests = 0
    number = len(test_only)
    digits = str(len(str(number)))
    format_string = (" {:>" + digits + "}/{:<} | {}, Cannot execute: This "
                     "test does not exist.")
    for test in test_only:
        if test not in known_names:
            nonexistent_tests += 1
            print(format_string.format(nonexistent_tests, number, test))

    return nonexistent_tests, number
220
221
222
def execute_test(filename,
                 ignored_files,
                 verbose,
                 cover,
                 timeout):
    """
    Executes the given test file and returns its result.

    This is a thin wrapper around execute_python_file; it does no counting
    itself.

    :param filename:      Filename of test to execute.
    :param ignored_files: Comma separated list of files to ignore for coverage.
    :param verbose:       Boolean to show more information.
    :param cover:         Boolean to calculate coverage information or not.
    :param timeout:       Time in seconds to wait for the test to complete
                          before killing it. Floats are allowed for units
                          smaller than a second.
    :return:              The (result, message) tuple returned by
                          execute_python_file.
    """
    result, stdout = execute_python_file(filename,
                                         ignored_files,
                                         cover=cover,
                                         timeout=timeout,
                                         verbose=verbose)
    return result, stdout
245
246
247
def print_test_results(test_file, test_nr, test_count, message):
    """
    Prints the progress line of a test followed by its message, if any.

    :param test_file:  Path of the executed test file; only its base name
                       without extension is printed.
    :param test_nr:    Number of this test, right aligned to the width of
                       test_count.
    :param test_count: Total number of tests.
    :param message:    Text to print after the progress line. If it is not
                       empty, a separator line of 70 "#" follows it.
    """
    basename = os.path.splitext(os.path.basename(test_file))[0]
    digits = str(len(str(test_count)))

    print((" {:>" + digits + "}/{:<} | {}").format(test_nr,
                                                   test_count,
                                                   basename))

    # sys.stdout may be a stream without an encoding (e.g. io.StringIO),
    # in which case encode(None) would raise a TypeError.
    encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
    # Encode and decode the message string while replacing errors so
    # unmapped characters can be printed too.
    print(message.encode(encoding, errors="replace").decode(encoding),
          end="")
    if message:
        print("#" * 70)
261
262
263
def get_test_files(testdir, test_only, omit):
    """
    Searches within a directory for all files which could contain tests. Uses
    the `is_eligible_test` function internally to get a list of files.

    :param testdir:   The directory to search in (recursively).
    :param test_only: Only accepts tests within the filenames in this list.
    :param omit:      Does not use filenames in this list.
    :return:          A tuple containing a list of file paths which need to be
                      executed and a list of the name of the file (without the
                      extension).
    """
    test_files = []
    test_file_names = []
    for dirpath, _, filenames in os.walk(testdir):
        for filename in filenames:
            if is_eligible_test(filename,
                                test_only=test_only,
                                omit=omit):
                test_files.append(os.path.join(dirpath, filename))
                # os.walk yields bare file names, so only the extension
                # has to be stripped.
                test_file_names.append(os.path.splitext(filename)[0])
    return test_files, test_file_names
287
288
289
def run_tests(ignore_list, args, test_files, test_file_names):
    """
    Runs the given test files in a process pool and prints their results.

    :param ignore_list:     Strings that are joined with commas and handed to
                            execute_test as the files to ignore for coverage.
    :param args:            The parsed arguments. The attributes cover, html,
                            test_only, jobs, verbose and timeout are read;
                            cover is overwritten with the result of
                            delete_coverage if it was set.
    :param test_files:      List of paths of the tests to execute. It is
                            sorted in place by lower-cased file name.
    :param test_file_names: Names of the found tests, used to report
                            requested tests that do not exist.
    :return:                The number of failed tests, i.e. requested tests
                            that do not exist plus test files that returned a
                            non-zero result.
    """
    failed_tests = 0
    if args.cover:
        args.cover = delete_coverage()

    if args.test_only:
        (nonexistent_tests,
         max_nr) = show_nonexistent_tests(args.test_only,
                                          test_file_names)
        failed_tests += nonexistent_tests
    else:
        max_nr = len(test_files)

    # Sort tests alphabetically.
    test_files.sort(key=lambda fl: str.lower(os.path.split(fl)[1]))

    partial_execute_test = functools.partial(
        execute_test,
        ignored_files=",".join(ignore_list),
        verbose=args.verbose,
        cover=args.cover,
        timeout=args.timeout)

    pool = multiprocessing.Pool(args.jobs)
    try:
        # imap yields the results in the order of test_files, so each result
        # can be paired with its file directly.
        pool_outputs = pool.imap(partial_execute_test, test_files)
        for curr_nr, (test_file, (failed, message)) in enumerate(
                zip(test_files, pool_outputs), start=1):
            # Count failing modules instead of summing return codes: a code
            # above 1 must not count twice and a negative code (process
            # killed by a signal) must not cancel out other failures.
            if failed:
                failed_tests += 1
            print_test_results(test_file,
                               curr_nr,
                               max_nr,
                               message)
    finally:
        # On the normal path all results were consumed already; on an error
        # there is no point in finishing the remaining tests.
        pool.terminate()
        pool.join()

    print("\nTests finished: failures in {} of {} test modules.".format(
        failed_tests,
        max_nr))

    if args.cover:
        show_coverage_results(args.html)

    return failed_tests
331