Failed Conditions
Pull Request — master (#1182)
by Lasse
01:53
created

coalib.tests.check_module_skip()   B

Complexity

Conditions 7

Size

Total Lines 17

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 7
dl 0
loc 17
rs 7.3333
1
import argparse
2
import os
3
import subprocess
4
import sys
5
import shutil
6
import webbrowser
7
import multiprocessing
8
import functools
9
import tempfile
10
11
from coalib.misc.ContextManagers import suppress_stdout, subprocess_timeout
12
from coalib.processes.Processing import create_process_group, get_cpu_count
13
14
15
def create_argparser(**kwargs):
    """
    Builds the command line parser for the test runner.

    :param kwargs: Keyword arguments forwarded unchanged to
                   argparse.ArgumentParser.
    :return:       The parser with the options -t, -c, -H, -v, -o, -T and -j
                   registered, in that order.
    """
    # (flags, add_argument keyword arguments), listed in the order in which
    # the options are registered and therefore shown in the help output.
    option_specs = (
        (("-t", "--test-only"),
         {"help": "Execute only the tests with the given base name",
          "nargs": "+"}),
        (("-c", "--cover"),
         {"help": "Measure code coverage",
          "action": "store_true"}),
        (("-H", "--html"),
         {"help": "Generate html code coverage, implies -c",
          "action": "store_true"}),
        (("-v", "--verbose"),
         {"help": "More verbose output",
          "action": "store_true"}),
        (("-o", "--omit"),
         {"help": "Base names of tests to omit",
          "nargs": "+"}),
        (("-T", "--timeout"),
         {"default": 20,
          "type": int,
          "help": "Amount of time to wait for a test to run before "
                  "killing it. To not use any timeout, set this to 0"}),
        (("-j", "--jobs"),
         {"default": get_cpu_count(),
          "type": int,
          "help": "Number of jobs to use in parallel."}),
    )

    argparser = argparse.ArgumentParser(**kwargs)
    for flags, settings in option_specs:
        argparser.add_argument(*flags, **settings)

    return argparser
52
53
54
def execute_coverage_command(*args):
    """
    Runs the coverage module with the current interpreter.

    :param args: Arguments appended after "<interpreter> -m coverage".
    :return:     Whatever subprocess.call returns for that command line.
    """
    coverage_call = [sys.executable, "-m", "coverage"]
    coverage_call.extend(args)
    return subprocess.call(coverage_call)
59
60
61
def parse_args(parser):
    """
    Parses the CLI arguments and resolves the implied ones.

    :param parser: An argparse.ArgumentParser, e.g. one created with
                   create_argparser.
    :return:       The parsed arguments after they went through
                   resolve_implicit_args.
    """
    parsed = parser.parse_args()
    return resolve_implicit_args(parsed, parser)
73
74
75
def resolve_implicit_args(args, parser):
    """
    Fills in argument values that are implied by other arguments.

     - `html` implies `cover`.
     - `omit` and `test_only` may not both be given.
     - `omit` and `test_only` are turned into empty lists if they are None.

    :param args:   The parsed arguments; needs the attributes cover, html,
                   omit and test_only. The object is modified in place.
    :param parser: Object whose `error` method is called with a message if
                   incompatible options were given.
    :return:       The same `args` object that was passed in.
    """
    args.cover = args.cover or args.html
    if args.omit is not None and args.test_only is not None:
        # Spell the flag exactly as it is registered ("--test-only"), so the
        # user can relate the message to what was typed.
        parser.error("Incompatible options: --omit and --test-only")
    if args.omit is None:
        args.omit = []
    if args.test_only is None:
        args.test_only = []

    return args
85
86
87
def is_eligible_test(filename, test_only, omit):
    """
    Checks if the filename is a Test or not. The conditions are:

     - Ends with "Test.py"
     - Is not present in `omit`
     - If test_only is not empty, it should be present in test_only

    Names are compared by the base name of the file without its extension.

    :param filename:  The filename to check eligibility for.
    :param test_only: Only execute files within the filenames in this list.
                      An empty list or None means no restriction.
    :param omit:      The filename should not be in this list. May be empty
                      or None.
    :return:          True if the file is eligible to be run as a test,
                      else False.
    """
    if not filename.endswith("Test.py"):
        return False

    name = os.path.splitext(os.path.basename(filename))[0]
    # Truthiness checks also cover None, so callers need not normalise
    # unset options to empty lists first.
    if omit and name in omit:
        return False
    if test_only and name not in test_only:
        return False

    return True
109
110
111
def delete_coverage(silent=False):
    """
    Deletes previous coverage data.

    Runs the coverage commands "combine" and "erase" inside the
    suppress_stdout context manager; "erase" is only run if "combine"
    returned 0.

    :param silent: If True, no notice is printed when the commands fail.
    :return:       True if both commands returned 0, False otherwise (e.g.
                   if coverage cannot be executed).
    """
    # Kept as a default in case the context manager leaves the block without
    # the assignment below having happened.
    coverage_available = False
    with suppress_stdout():
        coverage_available = (execute_coverage_command("combine") == 0 and
                              execute_coverage_command("erase") == 0)

    if not coverage_available and not silent:
        # Each fragment ends with a space so the sentences do not run into
        # each other once the literals are concatenated.
        print("Coverage failed. Falling back to standard unit tests. "
              "Install code coverage measurement for python3. Package "
              "name should be something like: python-coverage3/coverage")

    return coverage_available
128
129
130
def execute_command_array(command_array, timeout, verbose):
    """
    Executes the given command array in a subprocess group.

    stdout and stderr of the subprocess are collected together in a
    temporary file.

    :param command_array: The command array to execute.
    :param timeout:       Time to wait until killing the process.
    :param verbose:       If True, the collected output is returned even if
                          the command returned 0. On a non-zero return value
                          the output is always returned.
    :return:              A tuple of (result, message) where message gives
                          text information of what happened. result is 1 if
                          the timeout was hit, otherwise the return value of
                          the process.
    """
    message = ""
    # The with statement closes the temporary file even if creating or
    # waiting for the process raises.
    with tempfile.TemporaryFile() as stdout_file:
        p = create_process_group(command_array,
                                 stdout=stdout_file,
                                 stderr=subprocess.STDOUT,
                                 universal_newlines=True)
        with subprocess_timeout(p,
                                timeout,
                                kill_pg=True) as timedout:
            retval = p.wait()
            timed_out = timedout.value

        if retval != 0 or verbose:
            stdout_file.seek(0)
            # sys.stdout may be missing or have no encoding (e.g. when it was
            # replaced by a stream object); decode() does not accept None.
            encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
            message += stdout_file.read().decode(encoding,
                                                 errors="replace")

    if timed_out:
        message += ("This test failed because it was taking more than %f sec "
                    "to execute. To change the timeout setting use the `-T` "
                    "or `--timeout` argument.\n" % timeout)
        return 1, message  # Guaranteed fail, especially on race condition

    return retval, message
167
168
169
def show_coverage_results(html):
    """
    Combines the collected coverage data and prints the coverage report.

    :param html: If true, the directory .htmlreport is removed, a HTML
                 report is generated into it and its index.html is opened in
                 a new browser tab. A webbrowser.Error while opening is
                 ignored.
    """
    for coverage_args in (("combine",), ("report", "-m")):
        execute_coverage_command(*coverage_args)

    if not html:
        return

    report_dir = ".htmlreport"
    shutil.rmtree(report_dir, ignore_errors=True)
    print("Generating HTML report to .htmlreport...")
    execute_coverage_command("html", "-d", report_dir)

    index_page = os.path.join(report_dir, "index.html")
    try:
        webbrowser.open_new_tab(index_page)
    except webbrowser.Error:
        # Showing the report is a convenience only.
        pass
183
184
185
def execute_python_file(filename, ignored_files, cover, timeout, verbose):
    """
    Runs a python file with the current interpreter, optionally under
    coverage.

    :param filename:      The file to execute.
    :param ignored_files: Value passed to the --omit option of "coverage
                          run"; only used if cover is true.
    :param cover:         Whether to run the file through "coverage run".
    :param timeout:       Passed on to execute_command_array.
    :param verbose:       Passed on to execute_command_array.
    :return:              The return value of execute_command_array.
    """
    if cover:
        command = [sys.executable,
                   "-m", "coverage", "run",
                   "-p",  # make it collectable later
                   "--branch",
                   "--omit", ignored_files,
                   filename]
    else:
        command = [sys.executable, filename]

    return execute_command_array(command, timeout=timeout, verbose=verbose)
203
204
205
def show_nonexistent_tests(test_only, test_file_names):
    """
    Prints a notice for every requested test that was not found.

    :param test_only:       The base names of the tests that were requested.
    :param test_file_names: The base names of the tests that were found.
    :return:                A tuple of (number of requested tests that do
                            not exist, number of requested tests).
    """
    number = len(test_only)
    digits = str(len(str(number)))
    format_string = (" {:>" + digits + "}/{:<} | {}, Cannot execute: This "
                     "test does not exist.")
    # Build the lookup once instead of scanning the list of found names for
    # every requested test.
    known_names = frozenset(test_file_names)

    nonexistent_tests = 0
    for test in test_only:
        if test not in known_names:
            nonexistent_tests += 1
            print(format_string.format(nonexistent_tests, number, test))

    return nonexistent_tests, number
217
218
219
def execute_test(filename,
                 ignored_files,
                 verbose,
                 cover,
                 timeout):
    """
    Executes the given test file through execute_python_file.

    :param filename:      Filename of test to execute.
    :param ignored_files: Comma separated list of files to ignore for coverage.
    :param verbose:       Boolean to show more information.
    :param cover:         Boolean to calculate coverage information or not.
    :param timeout:       Time in seconds to wait for the test to complete
                          before killing it. Floats are allowed for units
                          smaller than a second.
    :return:              A tuple with the two values delivered by
                          execute_python_file: the result of the run and the
                          message belonging to it.
    """
    run_options = {"cover": cover, "timeout": timeout, "verbose": verbose}
    outcome, output = execute_python_file(filename,
                                          ignored_files,
                                          **run_options)
    return outcome, output
242
243
244
def print_test_results(test_file, test_nr, test_count, message):
    """
    Prints the progress line for one test followed by its message.

    The progress line has the form " <nr>/<count> | <base name>", with the
    number right-aligned to the width of the count. If the message is not
    empty, it is followed by a separator line of 70 "#" characters.

    :param test_file:  Path of the executed test file; only its base name
                       without extension is shown.
    :param test_nr:    The number of this test.
    :param test_count: The total number of tests.
    :param message:    The text to print for this test, may be empty.
    """
    basename = os.path.splitext(os.path.basename(test_file))[0]
    digits = str(len(str(test_count)))

    print((" {:>" + digits + "}/{:<} | {}").format(test_nr,
                                                   test_count,
                                                   basename))
    # sys.stdout may be missing or have no encoding (e.g. when it was
    # replaced by a stream object); encode() does not accept None.
    encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
    # Decode and encode the message string while replacing errors so
    # unmapped characters can be printed too.
    print(message.encode(encoding, errors="replace").decode(encoding),
          end="")
    if message:
        print("#" * 70)
258
259
260
def get_test_files(testdir, test_only, omit):
    """
    Walks a directory tree and collects every file that `is_eligible_test`
    accepts.

    :param testdir:   The directory to search in.
    :param test_only: Only accepts tests within the filenames in this list.
    :param omit:      Does not use filenames in this list.
    :return:          A tuple of two lists with matching order: the paths of
                      the files to execute and their names without the
                      extension.
    """
    paths = []
    base_names = []
    for current_dir, _subdirs, entries in os.walk(testdir):
        for entry in entries:
            if not is_eligible_test(entry, test_only=test_only, omit=omit):
                continue
            paths.append(os.path.join(current_dir, entry))
            stem, _extension = os.path.splitext(os.path.basename(entry))
            base_names.append(stem)
    return paths, base_names
284
285
286
def run_tests(ignore_list, args, test_files, test_file_names):
    """
    Executes all given test files in parallel and prints their results.

    :param ignore_list:     Files to ignore for coverage; joined with commas
                            and handed to execute_test.
    :param args:            The parsed arguments; cover, html, test_only,
                            jobs, verbose and timeout are read. args.cover is
                            overwritten with the result of delete_coverage if
                            it was set.
    :param test_files:      Paths of the test files to execute. The list is
                            sorted in place by lower-cased file name.
    :param test_file_names: Base names of the found tests, used to report
                            requested tests that do not exist.
    :return:                The number of failed test modules, including
                            requested tests that do not exist.
    """
    failed_tests = 0
    if args.cover:
        args.cover = delete_coverage()

    if args.test_only:
        (nonexistent_tests,
         max_nr) = show_nonexistent_tests(args.test_only,
                                          test_file_names)
        failed_tests += nonexistent_tests
    else:
        max_nr = len(test_files)

    # Sort tests alphabetically.
    test_files.sort(key=lambda fl: str.lower(os.path.split(fl)[1]))

    partial_execute_test = functools.partial(
        execute_test,
        ignored_files=",".join(ignore_list),
        verbose=args.verbose,
        cover=args.cover,
        timeout=args.timeout)

    pool = multiprocessing.Pool(args.jobs)
    try:
        # imap yields the results in the order of test_files, so both can be
        # walked in lockstep.
        pool_outputs = pool.imap(partial_execute_test, test_files)
        for curr_nr, (test_file, (failed, message)) in enumerate(
                zip(test_files, pool_outputs), start=1):
            # `failed` is the exit status of the test run. Count the module
            # once instead of adding the raw status, which may be larger
            # than 1 or negative.
            if failed:
                failed_tests += 1
            print_test_results(test_file, curr_nr, max_nr, message)
    finally:
        # Release the worker processes. After a complete run all results
        # have been consumed; on an error remaining work is abandoned.
        pool.terminate()
        pool.join()

    print("\nTests finished: failures in {} of {} test modules.".format(
        failed_tests,
        max_nr))

    if args.cover:
        show_coverage_results(args.html)

    return failed_tests
328