Failed Conditions
Pull Request — master (#1152)
by Lasse
03:36
created

coalib.tests.run_tests()   C

Complexity

Conditions 7

Size

Total Lines 49

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 7
dl 0
loc 49
rs 5.5
1
import argparse
2
import importlib
3
import inspect
4
import os
5
import subprocess
6
import sys
7
import shutil
8
import webbrowser
9
import multiprocessing
10
import functools
11
import tempfile
12
from coalib.misc.ContextManagers import (suppress_stdout,
13
                                         preserve_sys_path,
14
                                         subprocess_timeout)
15
from coalib.processes.Processing import create_process_group, get_cpu_count
16
17
18
def create_argparser(**kwargs):
19
    parser = argparse.ArgumentParser(**kwargs)
20
    parser.add_argument("-t",
21
                        "--test-only",
22
                        help="Execute only the tests with the "
23
                             "given base name",
24
                        nargs="+")
25
    parser.add_argument("-c",
26
                        "--cover",
27
                        help="Measure code coverage",
28
                        action="store_true")
29
    parser.add_argument("-H",
30
                        "--html",
31
                        help="Generate html code coverage, implies -c",
32
                        action="store_true")
33
    parser.add_argument("-v",
34
                        "--verbose",
35
                        help="More verbose output",
36
                        action="store_true")
37
    parser.add_argument("-o",
38
                        "--omit",
39
                        help="Base names of tests to omit",
40
                        nargs="+")
41
    parser.add_argument("-s",
42
                        "--disallow-test-skipping",
43
                        help="Return nonzero if any tests are skipped "
44
                             "or fail",
45
                        action="store_true")
46
    parser.add_argument("-T",
47
                        "--timeout",
48
                        default=20,
49
                        type=int,
50
                        help="Amount of time to wait for a test to run "
51
                             "before killing it. To not use any timeout, "
52
                             "set this to 0")
53
    parser.add_argument("-j",
54
                        "--jobs",
55
                        default=get_cpu_count(),
56
                        type=int,
57
                        help="Number of jobs to use in parallel.")
58
59
    return parser
60
61
62
def execute_coverage_command(*args):
63
    commands = [sys.executable,
64
                "-m",
65
                "coverage"] + list(args)
66
    return subprocess.call(commands)
67
68
69
def parse_args(parser):
70
    """
71
    Parses the CLI arguments.
72
73
    :param parser: A argparse.ArgumentParser created with the
74
                   create_argparser method of TestHelper module.
75
    :return args:  The parsed arguments.
76
    """
77
    args = parser.parse_args()
78
    args = resolve_implicit_args(args, parser)
79
80
    return args
81
82
83
def resolve_implicit_args(args, parser):
84
    args.cover = args.cover or args.html
85
    if args.omit is not None and args.test_only is not None:
86
        parser.error("Incompatible options: --omit and --test_only")
87
    if args.omit is None:
88
        args.omit = []
89
    if args.test_only is None:
90
        args.test_only = []
91
92
    return args
93
94
95
def is_eligible_test(filename, test_only, omit):
96
    """
97
    Checks if the filename is a Test or not. The conditions are:
98
     - Ends with "Test.py"
99
     - Is not present in `omit`
100
     - If test_only is not empty, it should be present in test_only
101
102
    :param filename:  The filename to check eligibility for.
103
    :param test_only: Only execute files within the filenames in this list.
104
    :param omit:      The filename should not be in this list.
105
    return:           True if the file is eligible to be run as a test,
106
                      else False.
107
    """
108
    if not filename.endswith("Test.py"):
109
        return False
110
    name = os.path.splitext(os.path.basename(filename))[0]
111
    if name in omit:
112
        return False
113
    if (len(test_only) > 0) and (name not in test_only):
114
        return False
115
116
    return True
117
118
119
def delete_coverage(silent=False):
120
    """
121
    Deletes previous coverage data.
122
123
    :return: False if coverage3 cannot be executed.
124
    """
125
    coverage_available = False
126
    with suppress_stdout():
127
        coverage_available = (execute_coverage_command("combine") == 0 and
128
                              execute_coverage_command("erase") == 0)
129
130
    if not coverage_available and not silent:
131
        print("Coverage failed. Falling back to standard unit tests."
132
              "Install code coverage measurement for python3. Package"
133
              "name should be something like: python-coverage3/coverage")
134
135
    return coverage_available
136
137
138
def execute_command_array(command_array, timeout, verbose):
139
    """
140
    Executes the given command array in a subprocess group.
141
142
    :param command_array: The command array to execute.
143
    :param timeout:       Time to wait until killing the process.
144
    :param verbose:       Return the stdout and stderr of the subprocess or
145
                          not.
146
    :return:              A tuple of (result, message) where message gives
147
                          text information of what happened.
148
    """
149
    message = ""
150
    stdout_file = tempfile.TemporaryFile()
151
    p = create_process_group(command_array,
152
                             stdout=stdout_file,
153
                             stderr=subprocess.STDOUT,
154
                             universal_newlines=True)
155
    with subprocess_timeout(p,
156
                            timeout,
157
                            kill_pg=True) as timedout:
158
        retval = p.wait()
159
        timed_out = timedout.value
160
161
    if retval != 0 or verbose:
162
        stdout_file.seek(0)
163
        message += stdout_file.read().decode(sys.stdout.encoding,
164
                                             errors="replace")
165
166
    stdout_file.close()
167
168
    if timed_out:
169
        message += ("This test failed because it was taking more than %f sec "
170
                    "to execute. To change the timeout setting use the `-T` "
171
                    "or `--timeout` argument.\n" % timeout)
172
        return 1, message  # Guaranteed fail, especially on race condition
173
174
    return retval, message
175
176
177
def check_module_skip(filename):
178
    with preserve_sys_path(), suppress_stdout():
179
        module_dir = os.path.dirname(filename)
180
        if module_dir not in sys.path:
181
            sys.path.insert(0, module_dir)
182
183
        try:
184
            module = importlib.import_module(
185
                os.path.basename(os.path.splitext(filename)[0]))
186
187
            for name, obj in inspect.getmembers(module):
188
                if inspect.isfunction(obj) and name == "skip_test":
189
                    return obj()
190
        except ImportError as exception:
191
            return str(exception)
192
193
        return False
194
195
196
def show_coverage_results(html):
197
    execute_coverage_command("combine")
198
    execute_coverage_command("report", "-m")
199
200
    if html:
201
        shutil.rmtree(".htmlreport", ignore_errors=True)
202
        print("Generating HTML report to .htmlreport...")
203
        execute_coverage_command("html", "-d", ".htmlreport")
204
205
        try:
206
            webbrowser.open_new_tab(os.path.join(".htmlreport",
207
                                                 "index.html"))
208
        except webbrowser.Error:
209
            pass
210
211
212
def execute_python_file(filename, ignored_files, cover, timeout, verbose):
213
    if not cover:
214
        return execute_command_array([sys.executable,
215
                                      filename],
216
                                     timeout=timeout,
217
                                     verbose=verbose)
218
219
    return execute_command_array([sys.executable,
220
                                  "-m",
221
                                  "coverage",
222
                                  "run",
223
                                  "-p",  # make it collectable later
224
                                  "--branch",
225
                                  "--omit",
226
                                  ignored_files,
227
                                  filename],
228
                                 timeout=timeout,
229
                                 verbose=verbose)
230
231
232
def show_nonexistent_tests(test_only, test_file_names):
233
    nonexistent_tests = 0
234
    number = len(test_only)
235
    digits = str(len(str(number)))
236
    format_string = (" {:>" + digits + "}/{:<} | {}, Cannot execute: This "
237
                     "test does not exist.")
238
    for test in test_only:
239
        if test not in test_file_names:
240
            nonexistent_tests += 1
241
            print(format_string.format(nonexistent_tests, number, test))
242
243
    return nonexistent_tests, number
244
245
246
def execute_test(filename,
247
                 ignored_files,
248
                 verbose,
249
                 cover,
250
                 timeout):
251
    """
252
    Executes the given test and counts up failed_tests or skipped_tests if
253
    needed.
254
255
    :param filename:      Filename of test to execute.
256
    :param ignored_files: Comma separated list of files to ignore for coverage.
257
    :param verbose:       Boolean to show more information.
258
    :param cover:         Boolean to calculate coverage information or not.
259
    :param timeout:       Time in seconds to wait for the test to complete
260
                          before killing it. Floats are allowed for units
261
                          smaller than a second.
262
    :return:              Returns a tuple with (failed_tests, skipped_tests,
263
                          message).
264
    """
265
    reason = check_module_skip(filename)
266
    if reason is not False:
267
        return 0, 1, reason
268
    else:
269
        result, stdout = execute_python_file(filename,
270
                                             ignored_files,
271
                                             cover=cover,
272
                                             timeout=timeout,
273
                                             verbose=verbose)
274
        return result, 0, stdout
275
276
277
def print_test_results(test_file, test_nr, test_count, skipped, message):
278
    basename = os.path.splitext(os.path.basename(test_file))[0]
279
    digits = str(len(str(test_count)))
280
    if skipped:
281
        print((" {:>" + digits + "}/{:<} | {}, Skipping: {}").format(
282
            test_nr,
283
            test_count,
284
            basename,
285
            message))
286
    else:
287
        print((" {:>" + digits + "}/{:<} | {}").format(test_nr,
288
                                                       test_count,
289
                                                       basename))
290
        # Decode and encode the message string while replacing errors so
291
        # unmapped characters can be printed too.
292
        print(message.encode(sys.stdout.encoding, errors="replace")
293
                  .decode(sys.stdout.encoding),
294
              end="")
295
        if message:
296
            print("#" * 70)
297
298
299
def get_test_files(testdir, test_only, omit):
300
    """
301
    Searches within a directory for all files which could contain tests. Uses
302
    the `is_eligible_test` function internally to get a list of files.
303
304
    :param testdir:   The directory to search in.
305
    :param test_only: Only accepts tests within the filenames in this list.
306
    :param omit:      Does not use filenames in this list.
307
    :return:          A tuple containing a list of file paths which need to be
308
                      executed and a list of the name of the file (without the
309
                      extension).
310
311
    """
312
    test_files = []
313
    test_file_names = []
314
    for (dirpath, dirnames, filenames) in os.walk(testdir):
315
        for filename in filenames:
316
            if is_eligible_test(filename,
317
                                test_only=test_only,
318
                                omit=omit):
319
                test_files.append(os.path.join(dirpath, filename))
320
                test_file_names.append(
321
                    os.path.splitext(os.path.basename(filename))[0])
322
    return test_files, test_file_names
323
324
325
def run_tests(ignore_list, args, test_files, test_file_names):
326
    failed_tests = 0
327
    skipped_tests = 0
328
    if args.cover:
329
        args.cover = delete_coverage()
330
331
    if len(args.test_only) > 0:
332
        (nonexistent_tests,
333
         max_nr) = show_nonexistent_tests(args.test_only,
334
                                          test_file_names)
335
        failed_tests += nonexistent_tests
336
    else:
337
        max_nr = len(test_files)
338
339
    # Sort tests alphabetically.
340
    test_files.sort(key=lambda fl: str.lower(os.path.split(fl)[1]))
341
342
    pool = multiprocessing.Pool(args.jobs)
343
    partial_execute_test = functools.partial(
344
        execute_test,
345
        ignored_files=",".join(ignore_list),
346
        verbose=args.verbose,
347
        cover=args.cover,
348
        timeout=args.timeout)
349
350
    pool_outputs = pool.imap(partial_execute_test, test_files)
351
    curr_nr = 0
352
    for failed, skipped, message in pool_outputs:
353
        curr_nr += 1
354
        failed_tests += failed
355
        skipped_tests += skipped
356
        print_test_results(test_files[curr_nr-1],
357
                           curr_nr,
358
                           max_nr,
359
                           skipped,
360
                           message)
361
362
    print("\nTests finished: failures in {} of {} test modules, skipped "
363
          "{} test modules.".format(failed_tests,
364
                                    max_nr,
365
                                    skipped_tests))
366
367
    if args.cover:
368
        show_coverage_results(args.html)
369
370
    if not args.disallow_test_skipping:
371
        return failed_tests
372
    else:
373
        return failed_tests + skipped_tests
374