1 | import os |
||
2 | import subprocess |
||
3 | import sys |
||
4 | from tempfile import TemporaryDirectory |
||
5 | import unittest |
||
6 | |||
7 | from coalib.misc.ContextManagers import ( |
||
8 | change_directory, make_temp, prepare_file, retrieve_stdout, |
||
9 | retrieve_stderr, simulate_console_inputs, subprocess_timeout, |
||
10 | suppress_stdout) |
||
11 | from coalib.processes.Processing import create_process_group |
||
12 | |||
13 | |||
# Source text handed to a child interpreter via ``-c`` in
# ``test_subprocess_timeout``. The child spawns a further interpreter that
# sleeps for 100 seconds and then sleeps for 100 seconds itself, so both
# processes outlive the 0.5 second timeout used by that test.
process_group_timeout_test_code = """
import time, subprocess, sys;
p = subprocess.Popen([sys.executable,
"-c",
"import time; time.sleep(100)"]);
time.sleep(100);
"""
||
21 | |||
22 | |||
class ContextManagersTest(unittest.TestCase):
    """Tests for the context managers imported from
    ``coalib.misc.ContextManagers``."""

    def test_subprocess_timeout(self):
        """Check the flag yielded by ``subprocess_timeout`` and exit codes."""
        # Child sleeps 0.5s but the timeout is 0.2s: the flag is expected to
        # be set and the exit status to be non-zero.
        p = subprocess.Popen([sys.executable,
                              "-c",
                              "import time; time.sleep(0.5);"],
                             stderr=subprocess.PIPE)
        with subprocess_timeout(p, 0.2) as timedout:
            retval = p.wait()
            p.stderr.close()
            self.assertEqual(timedout.value, True)
        self.assertNotEqual(retval, 0)

        # Child that itself spawns a long-sleeping child, run with
        # ``kill_pg=True``. The flag must be bound with ``as timedout`` here;
        # otherwise the assertion would silently re-check the flag object
        # left over from the previous ``with`` block.
        p = create_process_group([sys.executable,
                                  "-c",
                                  process_group_timeout_test_code])
        with subprocess_timeout(p, 0.5, kill_pg=True) as timedout:
            retval = p.wait()
            self.assertEqual(timedout.value, True)
        self.assertNotEqual(retval, 0)

        # Child only imports ``time`` and exits: no timeout is expected and
        # the exit status should be zero.
        p = subprocess.Popen([sys.executable,
                              "-c",
                              "import time"])
        with subprocess_timeout(p, 0.5) as timedout:
            retval = p.wait()
            self.assertEqual(timedout.value, False)
        self.assertEqual(retval, 0)

        # Same child with a timeout argument of 0: the test expects the flag
        # to stay unset and the child to exit normally.
        p = subprocess.Popen([sys.executable,
                              "-c",
                              "import time"])
        with subprocess_timeout(p, 0) as timedout:
            retval = p.wait()
            self.assertEqual(timedout.value, False)
        self.assertEqual(retval, 0)

    def test_suppress_stdout(self):
        """Check that ``suppress_stdout`` shields ``print`` from a broken
        ``sys.stdout``."""
        def print_func():
            print("func")
            raise NotImplementedError

        def no_print_func():
            with suppress_stdout():
                print("func")
                raise NotImplementedError

        old_stdout = sys.stdout
        # ``False`` has no ``write`` attribute, so an unprotected ``print``
        # fails with AttributeError before NotImplementedError is reached.
        sys.stdout = False
        try:
            self.assertRaises(AttributeError, print_func)
            self.assertRaises(NotImplementedError, no_print_func)
        finally:
            # Restore the real stream even when an assertion fails, so later
            # tests and the test runner can still write to stdout.
            sys.stdout = old_stdout

    def test_retrieve_stdout(self):
        """Check that text printed to ``sys.stdout`` is readable from the
        object yielded by ``retrieve_stdout``."""
        with retrieve_stdout() as sio:
            print("test", file=sys.stdout)
            self.assertEqual(sio.getvalue(), "test\n")

    def test_retrieve_stderr(self):
        """Check that text printed to ``sys.stderr`` is readable from the
        object yielded by ``retrieve_stderr``."""
        with retrieve_stderr() as sio:
            print("test", file=sys.stderr)
            self.assertEqual(sio.getvalue(), "test\n")

    def test_simulate_console_inputs(self):
        """Check that ``input()`` returns the simulated values in order."""
        with simulate_console_inputs(0, 1, 2) as generator:
            self.assertEqual(input(), 0)
            self.assertEqual(generator.last_input, 0)
            # Values appended while the context is active are served too.
            generator.inputs.append(3)
            self.assertEqual(input(), 1)
            self.assertEqual(input(), 2)
            self.assertEqual(input(), 3)
            self.assertEqual(generator.last_input, 3)

        # Asking for more input than was provided is expected to raise
        # ValueError.
        with simulate_console_inputs("test"), self.assertRaises(ValueError):
            self.assertEqual(input(), "test")
            input()

    def test_make_temp(self):
        """Check that ``make_temp`` yields a path to an existing file and
        honours ``suffix``/``prefix``."""
        with make_temp() as f_a:
            self.assertTrue(os.path.isfile(f_a))
            self.assertTrue(os.path.basename(f_a).startswith("tmp"))

        with make_temp(suffix=".orig", prefix="pre") as f_b:
            self.assertTrue(f_b.endswith(".orig"))
            self.assertTrue(os.path.basename(f_b).startswith("pre"))

    def test_prepare_file(self):
        """Check the ``(lines, filename)`` pair yielded by ``prepare_file``
        for several argument combinations."""
        # An explicit filename is passed through; with
        # ``force_linebreaks=True`` every line ends with a newline.
        with prepare_file(['line1', 'line2\n'],
                          "/file/name",
                          force_linebreaks=True,
                          create_tempfile=True) as (lines, filename):
            self.assertEqual(filename, '/file/name')
            self.assertEqual(lines, ['line1\n', 'line2\n'])

        # No filename and ``create_tempfile=True``: a real file is expected,
        # and the lines are left untouched without ``force_linebreaks``.
        with prepare_file(['line1', 'line2\n'],
                          None,
                          force_linebreaks=False,
                          create_tempfile=True) as (lines, filename):
            self.assertTrue(os.path.isfile(filename))
            self.assertEqual(lines, ['line1', 'line2\n'])

        # ``tempfile_kwargs`` is expected to shape the temporary file's name.
        with prepare_file(['line1', 'line2\n'],
                          None,
                          tempfile_kwargs={"suffix": ".test",
                                           "prefix": "test_"},
                          force_linebreaks=False,
                          create_tempfile=True) as (lines, filename):
            self.assertTrue(os.path.isfile(filename))
            basename = os.path.basename(filename)
            self.assertTrue(basename.endswith(".test"))
            self.assertTrue(basename.startswith("test_"))

        # No filename and no temporary file: a placeholder name is expected.
        with prepare_file(['line1', 'line2\n'],
                          None,
                          force_linebreaks=False,
                          create_tempfile=False) as (lines, filename):
            self.assertEqual(filename, "dummy_file_name")

    def test_change_directory(self):
        """Check that ``change_directory`` switches the working directory
        inside the block and restores it afterwards."""
        old_dir = os.getcwd()
        with TemporaryDirectory("temp") as tempdir:
            # Resolve symlinks so the comparison with ``os.getcwd()`` holds.
            tempdir = os.path.realpath(tempdir)
            with change_directory(tempdir):
                self.assertEqual(os.getcwd(), tempdir)
        self.assertEqual(os.getcwd(), old_dir)
150 |