import multiprocessing
import os
import platform
import queue
import re
import subprocess
import sys
import unittest
from unittest import mock

from pyprint.ConsolePrinter import ConsolePrinter

from coalib.output.printers.LogPrinter import LogPrinter
from coalib.processes.CONTROL_ELEMENT import CONTROL_ELEMENT
from coalib.processes.Processing import (
    ACTIONS, autoapply_actions, check_result_ignore, create_process_group,
    execute_section, filter_raising_callables, get_default_actions,
    get_file_dict, print_result, process_queues, simplify_section_result,
    yield_ignore_ranges)
from coalib.results.HiddenResult import HiddenResult
from coalib.results.Result import RESULT_SEVERITY, Result
from coalib.results.result_actions.ApplyPatchAction import ApplyPatchAction
from coalib.results.result_actions.PrintDebugMessageAction import (
    PrintDebugMessageAction)
from coalib.results.result_actions.ResultAction import ResultAction
from coalib.results.SourceRange import SourceRange
from coalib.settings.ConfigurationGathering import gather_configuration
from coalib.settings.Section import Section
from coalib.settings.Setting import Setting
from coalib.misc.Caching import FileCache
||
30 | |||
31 | |||
# Source of a helper script meant to be run through ``python -c``. It spawns a
# short-lived child interpreter, prints the child's pid followed by the
# child's process group id (the pid itself on Windows) and then terminates
# the child.
process_group_test_code = """
import time, subprocess, os, platform, sys;
p=subprocess.Popen([sys.executable,
                  "-c",
                  "import time; time.sleep(0.1)"]);
pgid = p.pid if platform.system() == "Windows" else os.getpgid(p.pid);
print(p.pid, pgid)
p.terminate()
"""
||
41 | |||
42 | |||
class DummyProcess(multiprocessing.Process):
    """
    Process stand-in whose liveness is derived from a control queue instead
    of from a real operating system process.
    """

    def __init__(self, control_queue, starts_dead=False):
        """
        :param control_queue: Queue that is inspected by ``is_alive``.
        :param starts_dead:   If True, ``is_alive`` always returns False.
        """
        multiprocessing.Process.__init__(self)
        self.control_queue = control_queue
        self.starts_dead = starts_dead

    def is_alive(self):
        """
        :return: True while the control queue still holds elements, unless
                 the process was created with ``starts_dead=True``.
        """
        return not self.control_queue.empty() and not self.starts_dead
||
52 | |||
53 | |||
class ProcessingTestLogPrinter(LogPrinter):
    """
    Log printer that stores every log message object it receives in a queue
    so that tests can inspect what was logged.
    """

    def __init__(self, log_queue):
        """
        :param log_queue: Queue that receives the log message objects.
        """
        # The instance is handed to the base class as its own printer.
        LogPrinter.__init__(self, self)
        self.log_queue = log_queue
        self.set_up = False

    def log_message(self, log_message, timestamp=None, **kwargs):
        """
        Puts the given log message object into the log queue; ``timestamp``
        and any further keyword arguments are ignored.
        """
        self.log_queue.put(log_message)
||
63 | |||
64 | |||
class ProcessingTest(unittest.TestCase):
    """
    Tests for the helpers imported from ``coalib.processes.Processing``.
    """

    def setUp(self):
        """
        Gathers the configuration from the ``.coafile`` located in the
        ``section_executor_test_files`` directory next to this file and
        creates the queues the tests collect results and log messages in.
        """
        config_path = os.path.abspath(os.path.join(
            os.path.dirname(__file__),
            "section_executor_test_files",
            ".coafile"))
        self.testcode_c_path = os.path.join(os.path.dirname(config_path),
                                            "testcode.c")

        self.result_queue = queue.Queue()
        self.queue = queue.Queue()
        self.log_queue = queue.Queue()
        log_printer = LogPrinter(ConsolePrinter())
        self.log_printer = ProcessingTestLogPrinter(self.log_queue)

        (self.sections,
         self.local_bears,
         self.global_bears,
         targets) = gather_configuration(lambda *args: True,
                                         log_printer,
                                         arg_list=["--config",
                                                   re.escape(config_path)])
        self.assertEqual(len(self.local_bears["default"]), 1)
        self.assertEqual(len(self.global_bears["default"]), 1)
        self.assertEqual(targets, [])

    def test_run(self):
        self.sections['default'].append(Setting('jobs', "1"))
        cache = FileCache(self.log_printer, "coala_test", flush_cache=True)
        results = execute_section(self.sections["default"],
                                  self.global_bears["default"],
                                  self.local_bears["default"],
                                  lambda *args: self.result_queue.put(args[2]),
                                  cache,
                                  self.log_printer)
        self.assertTrue(results[0])

        local_results = self.result_queue.get(timeout=0)
        global_results = self.result_queue.get(timeout=0)
        self.assertTrue(self.result_queue.empty())

        self.assertEqual(len(local_results), 1)
        self.assertEqual(len(global_results), 1)
        # Result dict also returned
        # One file
        self.assertEqual(len(results[1]), 1)
        # One global bear
        self.assertEqual(len(results[2]), 1)

        local_result = local_results[0]
        global_result = global_results[0]

        self.assertRegex(repr(local_result),
                         "<Result object\\(id={}, origin='LocalTestBear', aff"
                         "ected_code=\\(\\), severity=NORMAL, confidence=100"
                         ", message='test msg'\\) at 0x[0-9a-fA-F]+>".format(
                             hex(local_result.id)))
        self.assertRegex(repr(global_result),
                         "<Result object\\(id={}, origin='GlobalTestBear', "
                         "affected_code=\\(.*start=.*file=.*section_executor_"
                         "test_files.*line=None.*end=.*\\), severity=NORMAL, "
                         "confidence=100, message='test message'\\) at "
                         "0x[0-9a-fA-F]+>".format(hex(global_result.id)))

    def test_empty_run(self):
        self.sections['default'].append(Setting('jobs', "bogus!"))
        results = execute_section(self.sections["default"],
                                  [],
                                  [],
                                  lambda *args: self.result_queue.put(args[2]),
                                  None,
                                  self.log_printer)
        # No results
        self.assertFalse(results[0])
        # One file
        self.assertEqual(len(results[1]), 1)
        # No global bear
        self.assertEqual(len(results[2]), 0)

    def test_process_queues(self):
        ctrlq = queue.Queue()

        # Append custom controlling sequences.

        # Simulated process 1
        ctrlq.put((CONTROL_ELEMENT.LOCAL, 1))
        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))
        ctrlq.put((CONTROL_ELEMENT.GLOBAL, 1))

        # Simulated process 2
        ctrlq.put((CONTROL_ELEMENT.LOCAL, 2))

        # Simulated process 1
        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))

        # Simulated process 2
        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))
        ctrlq.put((CONTROL_ELEMENT.GLOBAL, 1))
        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))

        first_local = Result.from_values("o", "The first result.", file="f")
        second_local = Result.from_values("ABear",
                                          "The second result.",
                                          file="f",
                                          line=1)
        third_local = Result.from_values("ABear",
                                         "The second result.",
                                         file="f",
                                         line=4)
        fourth_local = Result.from_values("ABear",
                                          "Another result.",
                                          file="f",
                                          line=7)
        first_global = Result("o", "The one and only global result.")
        section = Section("")
        section.append(Setting('min_severity', "normal"))
        process_queues(
            [DummyProcess(control_queue=ctrlq) for i in range(3)],
            ctrlq,
            {1: [first_local,
                 second_local,
                 third_local,
                 # The following are to be ignored
                 Result('o', 'm', severity=RESULT_SEVERITY.INFO),
                 Result.from_values("ABear", "u", "f", 2, 1),
                 Result.from_values("ABear", "u", "f", 3, 1)],
             2: [fourth_local,
                 # The following are to be ignored
                 HiddenResult("t", "c"),
                 Result.from_values("ABear", "u", "f", 5, 1),
                 Result.from_values("ABear", "u", "f", 6, 1)]},
            {1: [first_global]},
            {"f": ["first line  # stop ignoring, invalid ignore range\n",
                   "second line  # ignore all\n",
                   "third line\n",
                   "fourth line  # gnore shouldn't trigger without i!\n",
                   "# Start ignoring ABear, BBear and CBear\n",
                   "# Stop ignoring\n",
                   "seventh"]},
            lambda *args: self.queue.put(args[2]),
            section,
            None,
            self.log_printer)

        self.assertEqual(self.queue.get(timeout=0), ([first_local,
                                                      second_local,
                                                      third_local]))
        self.assertEqual(self.queue.get(timeout=0), ([fourth_local]))
        self.assertEqual(self.queue.get(timeout=0), ([first_global]))
        self.assertEqual(self.queue.get(timeout=0), ([first_global]))

    def test_dead_processes(self):
        ctrlq = queue.Queue()
        # Not enough FINISH elements in the queue, processes start already dead
        # Also queue elements are reversed
        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))
        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))

        process_queues(
            [DummyProcess(ctrlq, starts_dead=True) for i in range(3)],
            ctrlq, {}, {}, {},
            lambda *args: self.queue.put(args[2]),
            Section(""),
            None,
            self.log_printer)
        with self.assertRaises(queue.Empty):
            self.queue.get(timeout=0)

        # Not enough FINISH elements in the queue, processes start already dead
        ctrlq.put((CONTROL_ELEMENT.LOCAL_FINISHED, None))
        ctrlq.put((CONTROL_ELEMENT.GLOBAL_FINISHED, None))

        process_queues(
            [DummyProcess(ctrlq, starts_dead=True) for i in range(3)],
            ctrlq, {}, {}, {},
            lambda *args: self.queue.put(args[2]),
            Section(""),
            None,
            self.log_printer)
        with self.assertRaises(queue.Empty):
            self.queue.get(timeout=0)

    def test_create_process_group(self):
        """
        Runs the helper script in a new process group and checks that the
        group id reported for the script's child equals the script's pid.
        """
        p = create_process_group([sys.executable,
                                  "-c",
                                  process_group_test_code],
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        # communicate() drains both pipes while waiting and closes them
        # afterwards. A bare wait() can deadlock once a pipe buffer fills up.
        stdout, stderr = p.communicate()
        if p.returncode != 0:
            if isinstance(stderr, bytes):
                stderr = stderr.decode(errors="replace")
            print(stderr, end='')
            raise Exception("Subprocess did not exit correctly")

        # The script prints exactly two integers: child pid and child pgid.
        _child_pid, pgid = (int(token) for token in stdout.split())
        if platform.system() != "Windows":
            # There is no way of testing this on windows with the current
            # python modules subprocess and os
            self.assertEqual(p.pid, pgid)

    def test_filter_raising_callables(self):
        class A(Exception):
            pass

        class B(Exception):
            pass

        class C(Exception):
            pass

        def create_exception_raiser(exception):
            def raiser(exc):
                if exception in exc:
                    raise exception
                return exception
            return raiser

        raiseA, raiseB, raiseC = (create_exception_raiser(exc)
                                  for exc in [A, B, C])

        test_list = [raiseA, raiseC, raiseB, raiseC]
        self.assertEqual(list(filter_raising_callables(test_list, A, (A,))),
                         [C, B, C])

        self.assertEqual(list(filter_raising_callables(test_list,
                                                       (B, C),
                                                       exc=(B, C))),
                         [A])

        # Test whether non filtered exceptions bubble up.
        with self.assertRaises(B):
            list(filter_raising_callables(test_list, C, exc=(B, C)))

    def test_get_file_dict(self):
        file_dict = get_file_dict([self.testcode_c_path], self.log_printer)
        self.assertEqual(len(file_dict), 1)
        self.assertEqual(type(file_dict[self.testcode_c_path]),
                         tuple,
                         msg="files in file_dict should not be editable")
        # timeout=0 makes a missing log message fail the test instead of
        # blocking forever.
        self.assertEqual("Files that will be checked:\n" + self.testcode_c_path,
                         self.log_printer.log_queue.get(timeout=0).message)

    def test_get_file_dict_non_existent_file(self):
        file_dict = get_file_dict(["non_existent_file"], self.log_printer)
        self.assertEqual(file_dict, {})
        self.assertIn(("Failed to read file 'non_existent_file' because of "
                       "an unknown error."),
                      self.log_printer.log_queue.get(timeout=0).message)

    def test_simplify_section_result(self):
        results = (True,
                   {"file1": [Result("a", "b")], "file2": None},
                   {"file3": [Result("a", "c")]},
                   None)
        yielded, yielded_unfixed, all_results = simplify_section_result(results)
        self.assertEqual(yielded, True)
        self.assertEqual(yielded_unfixed, True)
        self.assertEqual(len(all_results), 2)

    def test_ignore_results(self):
        ranges = [([], SourceRange.from_values("f", 1, 1, 2, 2))]
        result = Result.from_values("origin",
                                    "message",
                                    file="e",
                                    line=1,
                                    column=1,
                                    end_line=2,
                                    end_column=2)

        self.assertFalse(check_result_ignore(result, ranges))

        ranges.append(([], SourceRange.from_values("e", 2, 3, 3, 3)))
        self.assertFalse(check_result_ignore(result, ranges))

        ranges.append(([], SourceRange.from_values("e", 1, 1, 2, 2)))
        self.assertTrue(check_result_ignore(result, ranges))

        result1 = Result.from_values("origin", "message", file="e")
        self.assertFalse(check_result_ignore(result1, ranges))

        ranges = [(['something', 'else', 'not origin'],
                   SourceRange.from_values("e", 1, 1, 2, 2))]
        self.assertFalse(check_result_ignore(result, ranges))

        ranges = [(['something', 'else', 'origin'],
                   SourceRange.from_values("e", 1, 1, 2, 2))]
        self.assertTrue(check_result_ignore(result, ranges))

    def test_ignore_glob(self):
        result = Result.from_values("LineLengthBear",
                                    "message",
                                    file="d",
                                    line=1,
                                    column=1,
                                    end_line=2,
                                    end_column=2)
        ranges = [(["(line*|space*)", "py*"],
                   SourceRange.from_values("d", 1, 1, 2, 2))]
        self.assertTrue(check_result_ignore(result, ranges))

        result = Result.from_values("SpaceConsistencyBear",
                                    "message",
                                    file="d",
                                    line=1,
                                    column=1,
                                    end_line=2,
                                    end_column=2)
        ranges = [(["(line*|space*)", "py*"],
                   SourceRange.from_values("d", 1, 1, 2, 2))]
        self.assertTrue(check_result_ignore(result, ranges))

        result = Result.from_values("XMLBear",
                                    "message",
                                    file="d",
                                    line=1,
                                    column=1,
                                    end_line=2,
                                    end_column=2)
        ranges = [(["(line*|space*)", "py*"],
                   SourceRange.from_values("d", 1, 1, 2, 2))]
        self.assertFalse(check_result_ignore(result, ranges))

    def _check_ignore_ranges(self, lines, bears, end_line, end_column):
        """
        Checks every ignore range yielded for a file ``f`` consisting of
        ``lines``: it has to cover exactly ``bears`` and span from line 1,
        column 1 to ``end_line``, ``end_column``.

        :param lines:      Tuple of lines making up the file ``f``.
        :param bears:      Expected list of (lowercased) bear names.
        :param end_line:   Expected end line of each range.
        :param end_column: Expected end column of each range.
        """
        # NOTE(review): nothing is asserted when no range is yielded at all,
        # exactly like the loops this helper replaces - consider asserting
        # the number of ranges once the expected count is confirmed.
        for found_bears, source_range in yield_ignore_ranges({'f': lines}):
            self.assertEqual(found_bears, bears)
            self.assertEqual(source_range.start.line, 1)
            self.assertEqual(source_range.start.column, 1)
            self.assertEqual(source_range.end.line, end_line)
            self.assertEqual(source_range.end.column, end_column)

    def test_yield_ignore_ranges(self):
        self._check_ignore_ranges(
            ('# Ignore aBear\n',
             'a_string = "This string should be ignored"\n'),
            ['abear'], 2, 43)

        self._check_ignore_ranges(
            ('# start Ignoring bBear\n',
             'b_string = "This string should be ignored"\n',
             '# stop ignoring\n'),
            ['bbear'], 3, 16)

        self._check_ignore_ranges(
            ('# Start ignoring cBear\n',
             '# Stop ignoring cBear This & prev ignored\n'),
            ['cbear'], 2, 42)

        self._check_ignore_ranges(
            ('# Start ignoring cBear\n',
             'All of this ignored\n'),
            ['cbear'], 2, 20)

        # This case was a bug.
        test_file_dict_single_line = {'f': ('# ignore XBEAR',)}
        test_ignore_range_single_line = list(yield_ignore_ranges(
            test_file_dict_single_line))

        self.assertEqual(len(test_ignore_range_single_line), 1)
        bears, source_range = test_ignore_range_single_line[0]
        self.assertEqual(bears, ['xbear'])
        self.assertEqual(source_range.start.line, 1)
        self.assertEqual(source_range.start.column, 1)
        self.assertEqual(source_range.end.line, 1)
        self.assertEqual(source_range.end.column, 14)
||
448 | |||
449 | |||
class ProcessingTest_GetDefaultActions(unittest.TestCase):
    """
    Tests ``get_default_actions`` against differently filled
    ``default_actions`` settings of a section.
    """

    def setUp(self):
        self.section = Section("X")

    def test_no_key(self):
        # The section has no ``default_actions`` setting at all.
        self.assertEqual(get_default_actions(self.section), ({}, {}))

    def test_no_value(self):
        self.section.append(Setting("default_actions", ""))
        self.assertEqual(get_default_actions(self.section), ({}, {}))

    def test_only_valid_actions(self):
        self.section.append(Setting(
            "default_actions",
            "MyBear: PrintDebugMessageAction, ValidBear: ApplyPatchAction"))
        self.assertEqual(
            get_default_actions(self.section),
            ({"MyBear": PrintDebugMessageAction,
              "ValidBear": ApplyPatchAction},
             {}))

    def test_valid_and_invalid_actions(self):
        self.section.append(Setting(
            "default_actions",
            "MyBear: INVALID_action, ValidBear: ApplyPatchAction, XBear: ABC"))
        # Unknown action names are expected in the second dictionary.
        self.assertEqual(get_default_actions(self.section),
                         ({"ValidBear": ApplyPatchAction},
                          {"MyBear": "INVALID_action", "XBear": "ABC"}))
||
479 | |||
480 | |||
class ProcessingTest_AutoapplyActions(unittest.TestCase):
    """
    Tests ``autoapply_actions`` with two results (from ``YBear`` and
    ``ZBear``) and differently configured ``default_actions``.
    """

    def setUp(self):
        self.log_queue = queue.Queue()
        self.log_printer = ProcessingTestLogPrinter(self.log_queue)

        self.resultY = Result("YBear", "msg1")
        self.resultZ = Result("ZBear", "msg2")
        self.results = [self.resultY, self.resultZ]
        self.section = Section("A")

    def _autoapply(self):
        """
        Calls ``autoapply_actions`` for the prepared results with empty file
        dictionaries, the test section and the queue backed log printer.

        :return: Whatever ``autoapply_actions`` returns.
        """
        return autoapply_actions(self.results,
                                 {},
                                 {},
                                 self.section,
                                 self.log_printer)

    def _next_log_message(self):
        """
        :return: Message text of the next queued log message.
        :raises queue.Empty: If nothing was logged; ``timeout=0`` makes the
                             test fail instead of blocking forever.
        """
        return self.log_queue.get(timeout=0).message

    def test_no_default_actions(self):
        ret = self._autoapply()
        self.assertEqual(ret, self.results)
        self.assertTrue(self.log_queue.empty())

    def test_with_invalid_action(self):
        self.section.append(Setting("default_actions",
                                    "XBear: nonSENSE_action"))
        ret = self._autoapply()
        self.assertEqual(ret, self.results)
        self.assertEqual(self._next_log_message(),
                         "Selected default action 'nonSENSE_action' for bear "
                         "'XBear' does not exist. Ignoring action.")
        self.assertTrue(self.log_queue.empty())

    def test_without_default_action_and_unapplicable(self):
        # Use a result where no default action is supplied for and another one
        # where the action is not applicable.
        self.section.append(Setting(
            "default_actions",
            "NoBear: ApplyPatchAction, YBear: ApplyPatchAction"))

        # patch.object restores the original attribute even if the call
        # raises, and puts back the original descriptor as it was defined.
        with mock.patch.object(ApplyPatchAction,
                               "is_applicable",
                               new=lambda *args: False):
            ret = self._autoapply()

        self.assertEqual(ret, self.results)
        self.assertEqual(self._next_log_message(),
                         "Selected default action 'ApplyPatchAction' for bear "
                         "'YBear' is not applicable. Action not applied.")
        self.assertTrue(self.log_queue.empty())

    def test_applicable_action(self):
        # Use a result whose action can be successfully applied.
        log_printer = self.log_printer

        class TestAction(ResultAction):

            def apply(self, *args, **kwargs):
                log_printer.debug("ACTION APPLIED SUCCESSFULLY.")

        ACTIONS.append(TestAction)
        # Unregister the action again, even when an assertion below fails.
        self.addCleanup(ACTIONS.pop)

        self.section.append(Setting("default_actions", "Z*: TestAction"))
        ret = self._autoapply()
        self.assertEqual(ret, [self.resultY])
        self.assertEqual(self._next_log_message(),
                         "ACTION APPLIED SUCCESSFULLY.")
        self.assertEqual(self._next_log_message(),
                         "Applied 'TestAction' "
                         "on the whole project from 'ZBear'.")
        self.assertTrue(self.log_queue.empty())

    def test_failing_action(self):
        class FailingTestAction(ResultAction):

            def apply(self, *args, **kwargs):
                raise RuntimeError("YEAH THAT'S A FAILING BEAR")

        ACTIONS.append(FailingTestAction)
        # Unregister the action again, even when an assertion below fails.
        self.addCleanup(ACTIONS.pop)

        self.section.append(Setting("default_actions",
                                    "YBear: FailingTestAction"))
        ret = self._autoapply()
        self.assertEqual(ret, self.results)
        self.assertEqual(self._next_log_message(),
                         "Failed to execute action 'FailingTestAction'"
                         " with error: YEAH THAT'S A FAILING BEAR.")
        self.assertIn("YEAH THAT'S A FAILING BEAR",
                      self._next_log_message())
        self.assertEqual(self._next_log_message(),
                         "-> for result " + repr(self.resultY) + ".")
        self.assertTrue(self.log_queue.empty())
||
590 | |||
591 | |||
class ProcessingTest_PrintResult(unittest.TestCase):
    """
    Tests how ``print_result`` treats configured default actions.
    """

    def setUp(self):
        self.section = Section('name')
        self.log_printer = LogPrinter(ConsolePrinter(), log_level=0)

    def test_autoapply_override(self):
        """
        Tests that the default_actions aren't automatically applied when the
        autoapply setting overrides that.
        """
        self.section.append(Setting('default_actions',
                                    'somebear: PrintDebugMessageAction'))

        # Verify that it would apply the action, i.e. remove the result
        results = [5, HiddenResult('origin', []),
                   Result('somebear', 'message', debug_msg='debug')]
        _, newres = print_result(results, {}, 0, lambda *args: None,
                                 self.section, self.log_printer, {}, [])
        self.assertEqual(newres, [])

        # Override and verify that result is unprocessed, i.e. not gone
        self.section.append(Setting('autoapply', 'false'))
        _, newres = print_result(results, {}, 0, lambda *args: None,
                                 self.section, self.log_printer, {}, [])
        self.assertNotEqual(newres, [])
||
618 |