Completed
Pull Request — master (#1183)
by Lasse
02:01 queued 10s
created

coalib.tests.misc.ContextManagersTest.test_preserve_sys_path()   A

Complexity

Conditions 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 2
dl 0
loc 6
rs 9.4286
1
import unittest
2
import sys
3
import subprocess
4
5
sys.path.insert(0, ".")
6
from coalib.misc.ContextManagers import (suppress_stdout,
7
                                         retrieve_stdout,
8
                                         simulate_console_inputs,
9
                                         subprocess_timeout)
10
from coalib.processes.Processing import create_process_group
11
12
13
# Script handed to ``python -c`` in test_subprocess_timeout: the child
# spawns a further Python process that sleeps for 100 s and then sleeps for
# 100 s itself, so both processes outlive the timeout used by the test.
process_group_timeout_test_code = """
import time, subprocess, sys;
p = subprocess.Popen([sys.executable,
                     "-c",
                     "import time; time.sleep(100)"]);
time.sleep(100);
"""
20
21
22
class ContextManagersTest(unittest.TestCase):
23
    def test_subprocess_timeout(self):
24
        p = subprocess.Popen([sys.executable,
25
                              "-c",
26
                              "import time; time.sleep(0.5);"],
27
                             stderr=subprocess.PIPE)
28
        with subprocess_timeout(p, 0.2) as timedout:
29
            retval = p.wait()
30
            p.stderr.close()
31
            self.assertEqual(timedout.value, True)
32
        self.assertNotEqual(retval, 0)
33
34
        p = create_process_group([sys.executable,
35
                                  "-c",
36
                                  process_group_timeout_test_code])
37
        with subprocess_timeout(p, 0.5, kill_pg=True):
38
            retval = p.wait()
39
            self.assertEqual(timedout.value, True)
40
        self.assertNotEqual(retval, 0)
41
42
        p = subprocess.Popen([sys.executable,
43
                              "-c",
44
                              "import time"])
45
        with subprocess_timeout(p, 0.5) as timedout:
46
            retval = p.wait()
47
            self.assertEqual(timedout.value, False)
48
        self.assertEqual(retval, 0)
49
50
        p = subprocess.Popen([sys.executable,
51
                              "-c",
52
                              "import time"])
53
        with subprocess_timeout(p, 0) as timedout:
54
            retval = p.wait()
55
            self.assertEqual(timedout.value, False)
56
        self.assertEqual(retval, 0)
57
58
    def test_suppress_stdout(self):
59
        def print_func():
60
            print("func")
61
            raise NotImplementedError
62
63
        def no_print_func():
64
            with suppress_stdout():
65
                print("func")
66
                raise NotImplementedError
67
68
        old_stdout = sys.stdout
69
        sys.stdout = False
70
71
        self.assertRaises(AttributeError, print_func)
72
        self.assertRaises(NotImplementedError, no_print_func)
73
74
        sys.stdout = old_stdout
75
76
    def test_retrieve_stdout(self):
77
        with retrieve_stdout() as sio:
78
            print("test")
79
            self.assertEqual(sio.getvalue(), "test\n")
80
81
    def test_simulate_console_inputs(self):
82
        with simulate_console_inputs(0, 1, 2) as generator:
83
            self.assertEqual(input(), 0)
84
            self.assertEqual(generator.last_input, 0)
85
            generator.inputs.append(3)
86
            self.assertEqual(input(), 1)
87
            self.assertEqual(input(), 2)
88
            self.assertEqual(input(), 3)
89
            self.assertEqual(generator.last_input, 3)
90
91
        with simulate_console_inputs("test"), self.assertRaises(ValueError):
92
            self.assertEqual(input(), "test")
93
            input()
94
95
96
if __name__ == "__main__":
    # Executed as a script: run this module's tests with per-test output.
    unittest.main(verbosity=2)
98