Completed
Pull Request — master (#1127)
by Mischa
01:51
created

coalib.tests.processes.ProcessingTestLogPrinter   A

Complexity

Total Complexity 2

Size/Duplication

Total Lines 8
Duplicated Lines 0 %
Metric Value
dl 0
loc 8
rs 10
wmc 2
1
import os
2
import queue
3
import unittest
4
import sys
5
import multiprocessing
6
import platform
7
import re
8
import subprocess
9
from pyprint.ConsolePrinter import ConsolePrinter
10
11
sys.path.insert(0, ".")
12
from coalib.results.HiddenResult import HiddenResult
13
from coalib.results.Result import Result, RESULT_SEVERITY
14
from coalib.settings.ConfigurationGathering import gather_configuration
15
from coalib.output.printers.LogPrinter import LogPrinter
16
from coalib.processes.CONTROL_ELEMENT import CONTROL_ELEMENT
17
from coalib.processes.Processing import (execute_section,
18
                                         process_queues,
19
                                         create_process_group,
20
                                         filter_raising_callables)
21
from coalib.settings.Section import Section
22
from coalib.settings.Setting import Setting
23
24
25
# Script run in a child interpreter by test_create_process_group. It spawns a
# short-lived grandchild, prints "<grandchild pid> <pgid>" and terminates it.
# On Windows the grandchild's pid is printed in place of a process group id;
# elsewhere the value comes from os.getpgid.
process_group_test_code = """
import time, subprocess, os, platform, sys;
p=subprocess.Popen([sys.executable,
                  "-c",
                  "import time; time.sleep(0.1)"]);
pgid = p.pid if platform.system() == "Windows" else os.getpgid(p.pid);
print(p.pid, pgid)
p.terminate()
"""
34
35
36
class DummyProcess(multiprocessing.Process):
    """
    Process subclass whose liveness is derived from a control queue instead
    of from the state of a real operating system process.
    """

    def __init__(self, control_queue):
        """
        :param control_queue: Queue object consulted by ``is_alive``; it must
                              provide an ``empty()`` method.
        """
        multiprocessing.Process.__init__(self)
        self.control_queue = control_queue

    def is_alive(self):
        """
        :return: True while the control queue still holds elements, False
                 once it is empty.
        """
        return not self.control_queue.empty()
43
44
45
class ProcessingTestLogPrinter(LogPrinter):
    """
    LogPrinter subclass that stores every message passed to ``log_message``
    in a queue so that tests can inspect what was logged.
    """

    def __init__(self, log_queue):
        """
        :param log_queue: Queue that receives each logged message object.
        """
        # The instance passes itself as the single argument to the
        # LogPrinter constructor.
        LogPrinter.__init__(self, self)
        self.log_queue = log_queue
        self.set_up = False

    def log_message(self, log_message, timestamp=None, **kwargs):
        """
        Puts the given message object into the log queue.

        :param log_message: The message object to record.
        :param timestamp:   Accepted for interface compatibility; unused.
        :param kwargs:      Accepted for interface compatibility; unused.
        """
        self.log_queue.put(log_message)
53
54
55
class ProcessingTest(unittest.TestCase):
    """
    Tests for execute_section, process_queues, create_process_group and
    filter_raising_callables from coalib.processes.Processing.
    """

    def setUp(self):
        """
        Gathers the configuration from the ``.coafile`` in
        ``section_executor_test_files`` and prepares fresh queues and a
        queue-backed log printer for every test.
        """
        config_path = os.path.abspath(os.path.join(
            os.path.dirname(__file__),
            "section_executor_test_files",
            ".coafile"))
        self.testcode_c_path = os.path.join(os.path.dirname(config_path),
                                            "testcode.c")

        self.result_queue = queue.Queue()
        self.queue = queue.Queue()
        self.log_queue = queue.Queue()
        # Configuration gathering logs to the console; the tests themselves
        # use the queue-backed printer stored on the instance.
        log_printer = LogPrinter(ConsolePrinter())
        self.log_printer = ProcessingTestLogPrinter(self.log_queue)

        (self.sections,
         self.local_bears,
         self.global_bears,
         targets) = gather_configuration(lambda *args: True,
                                         log_printer,
                                         ["--config", re.escape(config_path)])
        self.assertEqual(len(self.local_bears["default"]), 1)
        self.assertEqual(len(self.global_bears["default"]), 1)
        self.assertEqual(targets, [])

    def test_run(self):
        """
        Executes the default section with one job and checks the results
        handed to the callback as well as the returned result structures.
        """
        self.sections['default'].append(Setting('jobs', "1"))
        results = execute_section(self.sections["default"],
                                  self.global_bears["default"],
                                  self.local_bears["default"],
                                  lambda *args: self.result_queue.put(args[2]),
                                  self.log_printer)
        self.assertTrue(results[0])

        local_results = self.result_queue.get(timeout=0)
        global_results = self.result_queue.get(timeout=0)
        self.assertTrue(self.result_queue.empty())

        self.assertEqual(len(local_results), 1)
        self.assertEqual(len(global_results), 1)
        # Result dict also returned
        # One file
        self.assertEqual(len(results[1]), 1)
        # One global bear
        self.assertEqual(len(results[2]), 1)

        local_result = local_results[0]
        global_result = global_results[0]

        self.assertRegex(repr(local_result),
                         "<Result object\\(id={}, origin='LocalTestBear', "
                         "affected_code=\\(\\), severity=NORMAL, message='test"
                         " msg'\\) at 0x[0-9a-fA-F]+>".format(local_result.id))
        self.assertRegex(repr(global_result),
                         "<Result object\\(id={}, origin='GlobalTestBear', "
                         "affected_code=\\(.*start=.*file=.*section_executor_"
                         "test_files.*line=None.*end=.*\\), severity=NORMAL, "
                         "message='test message'\\) at "
                         "0x[0-9a-fA-F]+>".format(global_result.id))

    def test_empty_run(self):
        """
        Executes the default section without any bears and with a
        non-numeric ``jobs`` setting; no results are expected.
        """
        self.sections['default'].append(Setting('jobs', "bogus!"))
        results = execute_section(self.sections["default"],
                                  [],
                                  [],
                                  lambda *args: self.result_queue.put(args[2]),
                                  self.log_printer)
        # No results
        self.assertFalse(results[0])
        # One file
        self.assertEqual(len(results[1]), 1)
        # No global bear
        self.assertEqual(len(results[2]), 0)

    def test_process_queues(self):
        """
        Feeds process_queues a hand-built control sequence and checks which
        result lists reach the callback, then checks that nothing reaches it
        when the control queue holds only a GLOBAL_FINISHED element.
        """
        ctrlq = queue.Queue()

        # Append custom controlling sequences.

        # Simulated process 1
        ctrlq.put((CONTROL_ELEMENT.LOCAL, 1))
        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))
        ctrlq.put((CONTROL_ELEMENT.GLOBAL, 1))

        # Simulated process 2
        ctrlq.put((CONTROL_ELEMENT.LOCAL, 2))

        # Simulated process 1
        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))

        # Simulated process 2
        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))
        ctrlq.put((CONTROL_ELEMENT.GLOBAL, 1))
        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))

        first_local = Result.from_values("o", "The first result.", file="f")
        second_local = Result.from_values("ABear",
                                          "The second result.",
                                          file="f",
                                          line=1)
        third_local = Result.from_values("ABear",
                                         "The second result.",
                                         file="f",
                                         line=4)
        fourth_local = Result.from_values("ABear",
                                          "Another result.",
                                          file="f",
                                          line=7)
        first_global = Result("o", "The one and only global result.")
        section = Section("")
        section.append(Setting('min_severity', "normal"))
        process_queues(
            [DummyProcess(control_queue=ctrlq) for _ in range(3)],
            ctrlq,
            {1: [first_local,
                 second_local,
                 third_local,
                 # The following are to be ignored
                 Result('o', 'm', severity=RESULT_SEVERITY.INFO),
                 Result.from_values("ABear", "u", file="f", line=2),
                 Result.from_values("ABear", "u", file="f", line=3)],
             2: [fourth_local,
                 # The following are to be ignored
                 HiddenResult("t", "c"),
                 Result.from_values("ABear", "u", file="f", line=5),
                 Result.from_values("ABear", "u", file="f", line=6)]},
            {1: [first_global]},
            {"f": ["first line  # stop ignoring, invalid ignore range\n",
                   "second line  # ignore all\n",
                   "third line\n",
                   "fourth line\n",
                   "# Start ignoring ABear, BBear and CBear\n",
                   "# Stop ignoring\n",
                   "seventh"]},
            lambda *args: self.queue.put(args[2]),
            section,
            self.log_printer)

        self.assertEqual(self.queue.get(timeout=0), ([first_local,
                                                      second_local,
                                                      third_local]))
        self.assertEqual(self.queue.get(timeout=0), ([fourth_local]))
        self.assertEqual(self.queue.get(timeout=0), ([first_global]))
        self.assertEqual(self.queue.get(timeout=0), ([first_global]))

        # No valid FINISH element in the queue
        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))

        process_queues(
            [DummyProcess(control_queue=ctrlq) for _ in range(3)],
            ctrlq,
            {1: "The first result.", 2: "The second result."},
            {1: "The one and only global result."},
            {},
            lambda *args: self.queue.put(args[2]),
            Section(""),
            self.log_printer)
        with self.assertRaises(queue.Empty):
            self.queue.get(timeout=0)

    def test_create_process_group(self):
        """
        Runs process_group_test_code through create_process_group and checks
        (except on Windows) that the reported process group id equals the pid
        of the process that create_process_group started.
        """
        p = create_process_group([sys.executable,
                                  "-c",
                                  process_group_test_code],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        # NOTE(review): assumes create_process_group returns a
        # subprocess.Popen-like object - confirm. communicate() reads both
        # pipes while waiting, so a chatty child cannot block on a full pipe,
        # and it closes the pipes on every path.
        stdout, stderr = p.communicate()
        if p.returncode != 0:
            self.fail("Subprocess did not exit correctly:\n" +
                      stderr.decode(errors="replace"))
        # The script prints exactly two integers: "<pid> <pgid>".
        _child_pid, pgid = (int(token) for token in stdout.split())
        if platform.system() != "Windows":
            # There is no way of testing this on windows with the current
            # python modules subprocess and os
            self.assertEqual(p.pid, pgid)

    def test_filter_raising_callables(self):
        """
        Checks that filter_raising_callables drops callables raising the
        given exceptions and lets other exceptions propagate.
        """
        class A(Exception):
            pass

        class B(Exception):
            pass

        class C(Exception):
            pass

        def create_exception_raiser(exception):
            # The returned callable raises ``exception`` when it is contained
            # in the argument passed to it and returns it otherwise.
            def raiser(exc):
                if exception in exc:
                    raise exception
                return exception
            return raiser

        raiseA, raiseB, raiseC = (create_exception_raiser(exc)
                                  for exc in [A, B, C])

        test_list = [raiseA, raiseC, raiseB, raiseC]
        self.assertEqual(list(filter_raising_callables(test_list, A, (A,))),
                         [C, B, C])

        self.assertEqual(list(filter_raising_callables(test_list,
                                                       (B, C),
                                                       exc=(B, C))),
                         [A])

        # Test whether non filtered exceptions bubble up.
        with self.assertRaises(B):
            list(filter_raising_callables(test_list, C, exc=(B, C)))
265
266
267
# Run the tests verbosely when this file is executed as a script.
if __name__ == '__main__':
    unittest.main(verbosity=2)
269