Failed Conditions
Pull Request — master (#1990)
by Mischa
01:34
created

TestAction   A

Complexity

Total Complexity 1

Size/Duplication

Total Lines 4
Duplicated Lines 100 %
Metric Value
dl 4
loc 4
rs 10
wmc 1

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
import multiprocessing
2
import os
3
import platform
4
import queue
5
import re
6
import subprocess
7
import sys
8
import unittest
9
10
from pyprint.ConsolePrinter import ConsolePrinter
11
12
from coalib.output.printers.LogPrinter import LogPrinter
13
from coalib.processes.CONTROL_ELEMENT import CONTROL_ELEMENT
14
from coalib.processes.Processing import (
15
    ACTIONS, autoapply_actions, check_result_ignore, create_process_group,
16
    execute_section, filter_raising_callables, get_default_actions,
17
    get_file_dict, print_result, process_queues, simplify_section_result,
18
    yield_ignore_ranges)
19
from coalib.results.HiddenResult import HiddenResult
20
from coalib.results.Result import RESULT_SEVERITY, Result
21
from coalib.results.result_actions.ApplyPatchAction import ApplyPatchAction
22
from coalib.results.result_actions.PrintDebugMessageAction import (
23
    PrintDebugMessageAction)
24
from coalib.results.result_actions.ResultAction import ResultAction
25
from coalib.results.SourceRange import SourceRange
26
from coalib.settings.ConfigurationGathering import gather_configuration
27
from coalib.settings.Section import Section
28
from coalib.settings.Setting import Setting
29
30
31
process_group_test_code = """
32
import time, subprocess, os, platform, sys;
33
p=subprocess.Popen([sys.executable,
34
                  "-c",
35
                  "import time; time.sleep(0.1)"]);
36
pgid = p.pid if platform.system() == "Windows" else os.getpgid(p.pid);
37
print(p.pid, pgid)
38
p.terminate()
39
"""
40
41
42
class DummyProcess(multiprocessing.Process):
43
44
    def __init__(self, control_queue, starts_dead=False):
45
        multiprocessing.Process.__init__(self)
46
        self.control_queue = control_queue
47
        self.starts_dead = starts_dead
48
49
    def is_alive(self):
50
        return not self.control_queue.empty() and not self.starts_dead
51
52
53
class ProcessingTestLogPrinter(LogPrinter):
54
55
    def __init__(self, log_queue):
56
        LogPrinter.__init__(self, self)
57
        self.log_queue = log_queue
58
        self.set_up = False
59
60
    def log_message(self, log_message, timestamp=None, **kwargs):
61
        self.log_queue.put(log_message)
62
63
64
class ProcessingTest(unittest.TestCase):
65
66
    def setUp(self):
67
        config_path = os.path.abspath(os.path.join(
68
            os.path.dirname(__file__),
69
            "section_executor_test_files",
70
            ".coafile"))
71
        self.testcode_c_path = os.path.join(os.path.dirname(config_path),
72
                                            "testcode.c")
73
74
        self.result_queue = queue.Queue()
75
        self.queue = queue.Queue()
76
        self.log_queue = queue.Queue()
77
        log_printer = LogPrinter(ConsolePrinter())
78
        self.log_printer = ProcessingTestLogPrinter(self.log_queue)
79
80
        (self.sections,
81
         self.local_bears,
82
         self.global_bears,
83
         targets) = gather_configuration(lambda *args: True,
84
                                         log_printer,
85
                                         arg_list=["--config",
86
                                                   re.escape(config_path)])
87
        self.assertEqual(len(self.local_bears["default"]), 1)
88
        self.assertEqual(len(self.global_bears["default"]), 1)
89
        self.assertEqual(targets, [])
90
91
    def test_run(self):
92
        self.sections['default'].append(Setting('jobs', "1"))
93
        results = execute_section(self.sections["default"],
94
                                  self.global_bears["default"],
95
                                  self.local_bears["default"],
96
                                  lambda *args: self.result_queue.put(args[2]),
97
                                  self.log_printer)
98
        self.assertTrue(results[0])
99
100
        local_results = self.result_queue.get(timeout=0)
101
        global_results = self.result_queue.get(timeout=0)
102
        self.assertTrue(self.result_queue.empty())
103
104
        self.assertEqual(len(local_results), 1)
105
        self.assertEqual(len(global_results), 1)
106
        # Result dict also returned
107
        # One file
108
        self.assertEqual(len(results[1]), 1)
109
        # One global bear
110
        self.assertEqual(len(results[2]), 1)
111
112
        local_result = local_results[0]
113
        global_result = global_results[0]
114
115
        self.assertRegex(repr(local_result),
116
                         "<Result object\\(id={}, origin='LocalTestBear', "
117
                         "affected_code=\\(\\), severity=NORMAL, message='test"
118
                         " msg'\\) at 0x[0-9a-fA-F]+>".format(
119
                             hex(local_result.id)))
120
        self.assertRegex(repr(global_result),
121
                         "<Result object\\(id={}, origin='GlobalTestBear', "
122
                         "affected_code=\\(.*start=.*file=.*section_executor_"
123
                         "test_files.*line=None.*end=.*\\), severity=NORMAL, "
124
                         "message='test message'\\) at "
125
                         "0x[0-9a-fA-F]+>".format(hex(global_result.id)))
126
127
    def test_empty_run(self):
128
        self.sections['default'].append(Setting('jobs', "bogus!"))
129
        results = execute_section(self.sections["default"],
130
                                  [],
131
                                  [],
132
                                  lambda *args: self.result_queue.put(args[2]),
133
                                  self.log_printer)
134
        # No results
135
        self.assertFalse(results[0])
136
        # One file
137
        self.assertEqual(len(results[1]), 1)
138
        # No global bear
139
        self.assertEqual(len(results[2]), 0)
140
141
    def test_process_queues(self):
142
        ctrlq = queue.Queue()
143
144
        # Append custom controlling sequences.
145
146
        # Simulated process 1
147
        ctrlq.put((CONTROL_ELEMENT.LOCAL, 1))
148
        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))
149
        ctrlq.put((CONTROL_ELEMENT.GLOBAL, 1))
150
151
        # Simulated process 2
152
        ctrlq.put((CONTROL_ELEMENT.LOCAL, 2))
153
154
        # Simulated process 1
155
        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))
156
157
        # Simulated process 2
158
        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))
159
        ctrlq.put((CONTROL_ELEMENT.GLOBAL, 1))
160
        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))
161
162
        first_local = Result.from_values("o", "The first result.", file="f")
163
        second_local = Result.from_values("ABear",
164
                                          "The second result.",
165
                                          file="f",
166
                                          line=1)
167
        third_local = Result.from_values("ABear",
168
                                         "The second result.",
169
                                         file="f",
170
                                         line=4)
171
        fourth_local = Result.from_values("ABear",
172
                                          "Another result.",
173
                                          file="f",
174
                                          line=7)
175
        first_global = Result("o", "The one and only global result.")
176
        section = Section("")
177
        section.append(Setting('min_severity', "normal"))
178
        process_queues(
179
            [DummyProcess(control_queue=ctrlq) for i in range(3)],
180
            ctrlq,
181
            {1: [first_local,
182
                 second_local,
183
                 third_local,
184
                 # The following are to be ignored
185
                 Result('o', 'm', severity=RESULT_SEVERITY.INFO),
186
                 Result.from_values("ABear", "u", "f", 2, 1),
187
                 Result.from_values("ABear", "u", "f", 3, 1)],
188
             2: [fourth_local,
189
                 # The following are to be ignored
190
                 HiddenResult("t", "c"),
191
                 Result.from_values("ABear", "u", "f", 5, 1),
192
                 Result.from_values("ABear", "u", "f", 6, 1)]},
193
            {1: [first_global]},
194
            {"f": ["first line  # stop ignoring, invalid ignore range\n",
195
                   "second line  # ignore all\n",
196
                   "third line\n",
197
                   "fourth line\n",
198
                   "# Start ignoring ABear, BBear and CBear\n",
199
                   "# Stop ignoring\n",
200
                   "seventh"]},
201
            lambda *args: self.queue.put(args[2]),
202
            section,
203
            self.log_printer)
204
205
        self.assertEqual(self.queue.get(timeout=0), ([first_local,
206
                                                      second_local,
207
                                                      third_local]))
208
        self.assertEqual(self.queue.get(timeout=0), ([fourth_local]))
209
        self.assertEqual(self.queue.get(timeout=0), ([first_global]))
210
        self.assertEqual(self.queue.get(timeout=0), ([first_global]))
211
212
    def test_dead_processes(self):
213
        ctrlq = queue.Queue()
214
        # Not enough FINISH elements in the queue, processes start already dead
215
        # Also queue elements are reversed
216
        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))
217
        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))
218
219
        process_queues(
220
            [DummyProcess(ctrlq, starts_dead=True) for i in range(3)],
221
            ctrlq, {}, {}, {},
222
            lambda *args: self.queue.put(args[2]),
223
            Section(""),
224
            self.log_printer)
225
        with self.assertRaises(queue.Empty):
226
            self.queue.get(timeout=0)
227
228
        # Not enough FINISH elements in the queue, processes start already dead
229
        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))
230
        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))
231
232
        process_queues(
233
            [DummyProcess(ctrlq, starts_dead=True) for i in range(3)],
234
            ctrlq, {}, {}, {},
235
            lambda *args: self.queue.put(args[2]),
236
            Section(""),
237
            self.log_printer)
238
        with self.assertRaises(queue.Empty):
239
            self.queue.get(timeout=0)
240
241
    def test_create_process_group(self):
242
        p = create_process_group([sys.executable,
243
                                  "-c",
244
                                  process_group_test_code],
245
                                 stdout=subprocess.PIPE,
246
                                 stderr=subprocess.PIPE)
247
        retval = p.wait()
248
        if retval != 0:
249
            for line in p.stderr:
250
                print(line, end='')
251
            raise Exception("Subprocess did not exit correctly")
252
        output = [i for i in p.stdout]
253
        p.stderr.close()
254
        p.stdout.close()
255
        pid, pgid = [int(i.strip()) for i_out in output for i in i_out.split()]
256
        if platform.system() != "Windows":
257
            # There is no way of testing this on windows with the current
258
            # python modules subprocess and os
259
            self.assertEqual(p.pid, pgid)
260
261
    def test_filter_raising_callables(self):
262
        class A(Exception):
263
            pass
264
265
        class B(Exception):
266
            pass
267
268
        class C(Exception):
269
            pass
270
271
        def create_exception_raiser(exception):
272
            def raiser(exc):
273
                if exception in exc:
274
                    raise exception
275
                return exception
276
            return raiser
277
278
        raiseA, raiseB, raiseC = (create_exception_raiser(exc)
279
                                  for exc in [A, B, C])
280
281
        test_list = [raiseA, raiseC, raiseB, raiseC]
282
        self.assertEqual(list(filter_raising_callables(test_list, A, (A,))),
283
                         [C, B, C])
284
285
        self.assertEqual(list(filter_raising_callables(test_list,
286
                                                       (B, C),
287
                                                       exc=(B, C))),
288
                         [A])
289
290
        # Test whether non filtered exceptions bubble up.
291
        with self.assertRaises(B):
292
            list(filter_raising_callables(test_list, C, exc=(B, C)))
293
294
    def test_get_file_dict(self):
295
        file_dict = get_file_dict([self.testcode_c_path], self.log_printer)
296
        self.assertEqual(len(file_dict), 1)
297
        self.assertEqual(type(file_dict[self.testcode_c_path]),
298
                         tuple,
299
                         msg="files in file_dict should not be editable")
300
301
        # Non existent file
302
        file_dict = get_file_dict(["non_existent_file"], self.log_printer)
303
        self.assertEqual(file_dict, {})
304
        self.assertIn(("Failed to read file 'non_existent_file' because of "
305
                       "an unknown error."),
306
                      self.log_printer.log_queue.get().message)
307
308
    def test_simplify_section_result(self):
309
        results = (True,
310
                   {"file1": [Result("a", "b")], "file2": None},
311
                   {"file3": [Result("a", "c")]},
312
                   None)
313
        yielded, yielded_unfixed, all_results = simplify_section_result(results)
314
        self.assertEqual(yielded, True)
315
        self.assertEqual(yielded_unfixed, True)
316
        self.assertEqual(len(all_results), 2)
317
318
    def test_ignore_results(self):
319
        ranges = [([], SourceRange.from_values("f", 1, 1, 2, 2))]
320
        result = Result.from_values("origin",
321
                                    "message",
322
                                    file="e",
323
                                    line=1,
324
                                    column=1,
325
                                    end_line=2,
326
                                    end_column=2)
327
328
        self.assertFalse(check_result_ignore(result, ranges))
329
330
        ranges.append(([], SourceRange.from_values("e", 2, 3, 3, 3)))
331
        self.assertFalse(check_result_ignore(result, ranges))
332
333
        ranges.append(([], SourceRange.from_values("e", 1, 1, 2, 2)))
334
        self.assertTrue(check_result_ignore(result, ranges))
335
336
        result1 = Result.from_values("origin", "message", file="e")
337
        self.assertFalse(check_result_ignore(result1, ranges))
338
339
        ranges = [(['something', 'else', 'not origin'],
340
                   SourceRange.from_values("e", 1, 1, 2, 2))]
341
        self.assertFalse(check_result_ignore(result, ranges))
342
343
        ranges = [(['something', 'else', 'origin'],
344
                   SourceRange.from_values("e", 1, 1, 2, 2))]
345
        self.assertTrue(check_result_ignore(result, ranges))
346
347
    def test_ignore_glob(self):
348
        result = Result.from_values("LineLengthBear",
349
                                    "message",
350
                                    file="d",
351
                                    line=1,
352
                                    column=1,
353
                                    end_line=2,
354
                                    end_column=2)
355
        ranges = [(["(line*|space*)", "py*"],
356
                   SourceRange.from_values("d", 1, 1, 2, 2))]
357
        self.assertTrue(check_result_ignore(result, ranges))
358
359
        result = Result.from_values("SpaceConsistencyBear",
360
                                    "message",
361
                                    file="d",
362
                                    line=1,
363
                                    column=1,
364
                                    end_line=2,
365
                                    end_column=2)
366
        ranges = [(["(line*|space*)", "py*"],
367
                   SourceRange.from_values("d", 1, 1, 2, 2))]
368
        self.assertTrue(check_result_ignore(result, ranges))
369
370
        result = Result.from_values("XMLBear",
371
                                    "message",
372
                                    file="d",
373
                                    line=1,
374
                                    column=1,
375
                                    end_line=2,
376
                                    end_column=2)
377
        ranges = [(["(line*|space*)", "py*"],
378
                   SourceRange.from_values("d", 1, 1, 2, 2))]
379
        self.assertFalse(check_result_ignore(result, ranges))
380
381
    def test_yield_ignore_ranges(self):
382
        test_file_dict_a = {'f':
383
                            ('# Ignore aBear\n',
384
                             'a_string = "This string should be ignored"\n')}
385
        test_ignore_range_a = list(yield_ignore_ranges(test_file_dict_a))
386
        for test_bears, test_source_range in test_ignore_range_a:
387
            self.assertEqual(test_bears, ['abear'])
388
            self.assertEqual(test_source_range.start.line, 1)
389
            self.assertEqual(test_source_range.start.column, 1)
390
            self.assertEqual(test_source_range.end.line, 2)
391
            self.assertEqual(test_source_range.end.column, 43)
392
393
        test_file_dict_b = {'f':
394
                            ('# start Ignoring bBear\n',
395
                             'b_string = "This string should be ignored"\n',
396
                             '# stop ignoring\n')}
397
        test_ignore_range_b = list(yield_ignore_ranges(test_file_dict_b))
398
        for test_bears, test_source_range in test_ignore_range_b:
399
            self.assertEqual(test_bears, ['bbear'])
400
            self.assertEqual(test_source_range.start.line, 1)
401
            self.assertEqual(test_source_range.start.column, 1)
402
            self.assertEqual(test_source_range.end.line, 3)
403
            self.assertEqual(test_source_range.end.column, 16)
404
405
        test_file_dict_c = {'f':
406
                            ('# Start ignoring cBear\n',
407
                             '# Stop ignoring cBear This & prev ignored\n')}
408
        test_ignore_range_c = list(yield_ignore_ranges(test_file_dict_c))
409
        for test_bears, test_source_range in test_ignore_range_c:
410
            self.assertEqual(test_bears, ['cbear'])
411
            self.assertEqual(test_source_range.start.line, 1)
412
            self.assertEqual(test_source_range.start.column, 1)
413
            self.assertEqual(test_source_range.end.line, 2)
414
            self.assertEqual(test_source_range.end.column, 42)
415
416
417
class ProcessingTest_GetDefaultActions(unittest.TestCase):
418
419
    def setUp(self):
420
        self.section = Section("X")
421
422
    def test_no_key(self):
423
        self.assertEqual(get_default_actions(self.section), ({}, {}))
424
425
    def test_no_value(self):
426
        self.section.append(Setting("default_actions", ""))
427
        self.assertEqual(get_default_actions(self.section), ({}, {}))
428
429
    def test_only_valid_actions(self):
430
        self.section.append(Setting(
431
            "default_actions",
432
            "MyBear: PrintDebugMessageAction, ValidBear: ApplyPatchAction"))
433
        self.assertEqual(
434
            get_default_actions(self.section),
435
            ({"MyBear": PrintDebugMessageAction,
436
              "ValidBear": ApplyPatchAction},
437
             {}))
438
439
    def test_valid_and_invalid_actions(self):
440
        self.section.append(Setting(
441
            "default_actions",
442
            "MyBear: INVALID_action, ValidBear: ApplyPatchAction, XBear: ABC"))
443
        self.assertEqual(get_default_actions(self.section),
444
                         ({"ValidBear": ApplyPatchAction},
445
                          {"MyBear": "INVALID_action", "XBear": "ABC"}))
446
447
448
class ProcessingTest_AutoapplyActions(unittest.TestCase):
449
450
    def setUp(self):
451
        self.log_queue = queue.Queue()
452
        self.log_printer = ProcessingTestLogPrinter(self.log_queue)
453
454
        self.resultY = Result("YBear", "msg1")
455
        self.resultZ = Result("ZBear", "msg2")
456
        self.results = [self.resultY, self.resultZ]
457
        self.section = Section("A")
458
459
    def test_no_default_actions(self):
460
        ret = autoapply_actions(self.results,
461
                                {},
462
                                {},
463
                                self.section,
464
                                self.log_printer)
465
        self.assertEqual(ret, self.results)
466
        self.assertTrue(self.log_queue.empty())
467
468
    def test_with_invalid_action(self):
469
        self.section.append(Setting("default_actions",
470
                                    "XBear: nonSENSE_action"))
471
        ret = autoapply_actions(self.results,
472
                                {},
473
                                {},
474
                                self.section,
475
                                self.log_printer)
476
        self.assertEqual(ret, self.results)
477
        self.assertEqual(self.log_queue.get().message,
478
                         "Selected default action 'nonSENSE_action' for bear "
479
                         "'XBear' does not exist. Ignoring action.")
480
        self.assertTrue(self.log_queue.empty())
481
482
    def test_without_default_action_and_unapplicable(self):
483
        # Use a result where no default action is supplied for and another one
484
        # where the action is not applicable.
485
        old_is_applicable = ApplyPatchAction.is_applicable
486
        ApplyPatchAction.is_applicable = lambda *args: False
487
488
        self.section.append(Setting(
489
            "default_actions",
490
            "NoBear: ApplyPatchAction, YBear: ApplyPatchAction"))
491
        ret = autoapply_actions(self.results,
492
                                {},
493
                                {},
494
                                self.section,
495
                                self.log_printer)
496
        self.assertEqual(ret, self.results)
497
        self.assertEqual(self.log_queue.get().message,
498
                         "Selected default action 'ApplyPatchAction' for bear "
499
                         "'YBear' is not applicable. Action not applied.")
500
        self.assertTrue(self.log_queue.empty())
501
502
        ApplyPatchAction.is_applicable = old_is_applicable
503
504 View Code Duplication
    def test_applicable_action(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
505
        # Use a result whose action can be successfully applied.
506
        log_printer = self.log_printer
507
508
        class TestAction(ResultAction):
509
510
            def apply(self, *args, **kwargs):
511
                log_printer.debug("ACTION APPLIED SUCCESSFULLY.")
512
513
        ACTIONS.append(TestAction)
514
515
        self.section.append(Setting("default_actions", "ZBear: TestAction"))
516
        ret = autoapply_actions(self.results,
517
                                {},
518
                                {},
519
                                self.section,
520
                                log_printer)
521
        self.assertEqual(ret, [self.resultY])
522
        self.assertEqual(self.log_queue.get().message,
523
                         "ACTION APPLIED SUCCESSFULLY.")
524
        self.assertEqual(self.log_queue.get().message,
525
                         "Applied 'TestAction' "
526
                         "on the whole project from 'ZBear'.")
527
        self.assertTrue(self.log_queue.empty())
528
529
        ACTIONS.pop()
530
531 View Code Duplication
    def test_failing_action(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
532
        class FailingTestAction(ResultAction):
533
534
            def apply(self, *args, **kwargs):
535
                raise RuntimeError("YEAH THAT'S A FAILING BEAR")
536
537
        ACTIONS.append(FailingTestAction)
538
539
        self.section.append(Setting("default_actions",
540
                                    "YBear: FailingTestAction"))
541
        ret = autoapply_actions(self.results,
542
                                {},
543
                                {},
544
                                self.section,
545
                                self.log_printer)
546
        self.assertEqual(ret, self.results)
547
        self.assertEqual(self.log_queue.get().message,
548
                         "Failed to execute action 'FailingTestAction'"
549
                         " with error: YEAH THAT'S A FAILING BEAR.")
550
        self.assertIn("YEAH THAT'S A FAILING BEAR",
551
                      self.log_queue.get().message)
552
        self.assertEqual(self.log_queue.get().message,
553
                         "-> for result " + repr(self.resultY) + ".")
554
        self.assertTrue(self.log_queue.empty())
555
556
        ACTIONS.pop()
557
558
559
class ProcessingTest_PrintResult(unittest.TestCase):
560
561
    def setUp(self):
562
        self.section = Section('name')
563
        self.log_printer = LogPrinter(ConsolePrinter(), log_level=0)
564
565
    def test_autoapply_override(self):
566
        """
567
        Tests that the default_actions aren't automatically applied when the
568
        autoapply setting overrides that.
569
        """
570
        self.section.append(Setting('default_actions',
571
                                    'somebear: PrintDebugMessageAction'))
572
573
        # Verify that it would apply the action, i.e. remove the result
574
        results = [5, HiddenResult('origin', []),
575
                   Result('somebear', 'message', debug_msg='debug')]
576
        retval, newres = print_result(results, {}, 0, lambda *args: None,
577
                                      self.section, self.log_printer, {}, [])
578
        self.assertEqual(newres, [])
579
580
        # Override and verify that result is unprocessed, i.e. not gone
581
        self.section.append(Setting('autoapply', 'false'))
582
        retval, newres = print_result(results, {}, 0, lambda *args: None,
583
                                      self.section, self.log_printer, {}, [])
584
        self.assertNotEqual(newres, [])
585