Completed
Pull Request — master (#1431)
by Abdeali
01:31
created

coalib.tests.misc.ContextManagersTest   A

Complexity

Total Complexity 22

Size/Duplication

Total Lines 114
Duplicated Lines 0 %
Metric Value
dl 0
loc 114
rs 10
wmc 22

8 Methods

Rating   Name   Duplication   Size   Complexity  
B test_subprocess_timeout() 0 34 5
A test_simulate_console_inputs() 0 13 3
A no_print_func() 0 4 2
A test_retrieve_stdout() 0 4 2
A print_func() 0 3 1
A test_make_temp() 0 8 3
A test_suppress_stdout() 0 17 4
B test_prepare_file() 0 31 5
1
import unittest
2
import sys
3
import subprocess
4
import os
5
6
from coalib.misc.ContextManagers import (suppress_stdout,
7
                                         retrieve_stdout,
8
                                         simulate_console_inputs,
9
                                         subprocess_timeout,
10
                                         make_temp,
11
                                         prepare_file)
12
from coalib.processes.Processing import create_process_group
13
14
15
# Source of a script that is run in a child interpreter (via ``-c``) by
# ``test_subprocess_timeout``. It starts a second interpreter that sleeps for
# 100 seconds and then sleeps for 100 seconds itself, so both the child and
# the process it spawned are still alive when the timeout under test expires.
process_group_timeout_test_code = """
import time, subprocess, sys;
p = subprocess.Popen([sys.executable,
                     "-c",
                     "import time; time.sleep(100)"]);
time.sleep(100);
"""
22
23
24
class ContextManagersTest(unittest.TestCase):
25
26
    def test_subprocess_timeout(self):
27
        p = subprocess.Popen([sys.executable,
28
                              "-c",
29
                              "import time; time.sleep(0.5);"],
30
                             stderr=subprocess.PIPE)
31
        with subprocess_timeout(p, 0.2) as timedout:
32
            retval = p.wait()
33
            p.stderr.close()
34
            self.assertEqual(timedout.value, True)
35
        self.assertNotEqual(retval, 0)
36
37
        p = create_process_group([sys.executable,
38
                                  "-c",
39
                                  process_group_timeout_test_code])
40
        with subprocess_timeout(p, 0.5, kill_pg=True):
41
            retval = p.wait()
42
            self.assertEqual(timedout.value, True)
43
        self.assertNotEqual(retval, 0)
44
45
        p = subprocess.Popen([sys.executable,
46
                              "-c",
47
                              "import time"])
48
        with subprocess_timeout(p, 0.5) as timedout:
49
            retval = p.wait()
50
            self.assertEqual(timedout.value, False)
51
        self.assertEqual(retval, 0)
52
53
        p = subprocess.Popen([sys.executable,
54
                              "-c",
55
                              "import time"])
56
        with subprocess_timeout(p, 0) as timedout:
57
            retval = p.wait()
58
            self.assertEqual(timedout.value, False)
59
        self.assertEqual(retval, 0)
60
61
    def test_suppress_stdout(self):
62
        def print_func():
63
            print("func")
64
            raise NotImplementedError
65
66
        def no_print_func():
67
            with suppress_stdout():
68
                print("func")
69
                raise NotImplementedError
70
71
        old_stdout = sys.stdout
72
        sys.stdout = False
73
74
        self.assertRaises(AttributeError, print_func)
75
        self.assertRaises(NotImplementedError, no_print_func)
76
77
        sys.stdout = old_stdout
78
79
    def test_retrieve_stdout(self):
80
        with retrieve_stdout() as sio:
81
            print("test")
82
            self.assertEqual(sio.getvalue(), "test\n")
83
84
    def test_simulate_console_inputs(self):
85
        with simulate_console_inputs(0, 1, 2) as generator:
86
            self.assertEqual(input(), 0)
87
            self.assertEqual(generator.last_input, 0)
88
            generator.inputs.append(3)
89
            self.assertEqual(input(), 1)
90
            self.assertEqual(input(), 2)
91
            self.assertEqual(input(), 3)
92
            self.assertEqual(generator.last_input, 3)
93
94
        with simulate_console_inputs("test"), self.assertRaises(ValueError):
95
            self.assertEqual(input(), "test")
96
            input()
97
98
    def test_make_temp(self):
99
        with make_temp() as f_a:
100
            self.assertTrue(os.path.isfile(f_a))
101
            self.assertTrue(os.path.basename(f_a).startswith("tmp"))
102
103
        with make_temp(suffix=".orig", prefix="pre") as f_b:
104
            self.assertTrue(f_b.endswith(".orig"))
105
            self.assertTrue(os.path.basename(f_b).startswith("pre"))
106
107
    def test_prepare_file(self):
108
        with prepare_file(['line1', 'line2\n'],
109
                          "/file/name",
110
                          force_linebreaks=True,
111
                          create_tempfile=True) as (lines, filename):
112
            self.assertEqual(filename, '/file/name')
113
            self.assertEqual(lines, ['line1\n', 'line2\n'])
114
115
        with prepare_file(['line1', 'line2\n'],
116
                          None,
117
                          force_linebreaks=False,
118
                          create_tempfile=True) as (lines, filename):
119
            self.assertTrue(os.path.isfile(filename))
120
            self.assertEqual(lines, ['line1', 'line2\n'])
121
122
        with prepare_file(['line1', 'line2\n'],
123
                          None,
124
                          tempfile_kwargs={"suffix": ".test",
125
                                           "prefix": "test_"},
126
                          force_linebreaks=False,
127
                          create_tempfile=True) as (lines, filename):
128
            self.assertTrue(os.path.isfile(filename))
129
            basename = os.path.basename(filename)
130
            self.assertTrue(basename.endswith(".test"))
131
            self.assertTrue(basename.startswith("test_"))
132
133
        with prepare_file(['line1', 'line2\n'],
134
                          None,
135
                          force_linebreaks=False,
136
                          create_tempfile=False) as (lines, filename):
137
            self.assertEqual(filename, "dummy_file_name")
138