Completed
Pull Request — master (#1431)
by Abdeali
01:46 queued 13s
created

test_prepare_file()   B

Complexity

Conditions 5

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 29
rs 8.0894
1
import unittest
2
import sys
3
import subprocess
4
import os
5
6
from coalib.misc.ContextManagers import (suppress_stdout,
7
                                         retrieve_stdout,
8
                                         simulate_console_inputs,
9
                                         subprocess_timeout,
10
                                         make_temp,
11
                                         prepare_file)
12
from coalib.processes.Processing import create_process_group
13
14
15
process_group_timeout_test_code = """
16
import time, subprocess, sys;
17
p = subprocess.Popen([sys.executable,
18
                     "-c",
19
                     "import time; time.sleep(100)"]);
20
time.sleep(100);
21
"""
22
23
24
class ContextManagersTest(unittest.TestCase):
25
26
    def test_subprocess_timeout(self):
27
        p = subprocess.Popen([sys.executable,
28
                              "-c",
29
                              "import time; time.sleep(0.5);"],
30
                             stderr=subprocess.PIPE)
31
        with subprocess_timeout(p, 0.2) as timedout:
32
            retval = p.wait()
33
            p.stderr.close()
34
            self.assertEqual(timedout.value, True)
35
        self.assertNotEqual(retval, 0)
36
37
        p = create_process_group([sys.executable,
38
                                  "-c",
39
                                  process_group_timeout_test_code])
40
        with subprocess_timeout(p, 0.5, kill_pg=True):
41
            retval = p.wait()
42
            self.assertEqual(timedout.value, True)
43
        self.assertNotEqual(retval, 0)
44
45
        p = subprocess.Popen([sys.executable,
46
                              "-c",
47
                              "import time"])
48
        with subprocess_timeout(p, 0.5) as timedout:
49
            retval = p.wait()
50
            self.assertEqual(timedout.value, False)
51
        self.assertEqual(retval, 0)
52
53
        p = subprocess.Popen([sys.executable,
54
                              "-c",
55
                              "import time"])
56
        with subprocess_timeout(p, 0) as timedout:
57
            retval = p.wait()
58
            self.assertEqual(timedout.value, False)
59
        self.assertEqual(retval, 0)
60
61
    def test_suppress_stdout(self):
62
        def print_func():
63
            print("func")
64
            raise NotImplementedError
65
66
        def no_print_func():
67
            with suppress_stdout():
68
                print("func")
69
                raise NotImplementedError
70
71
        old_stdout = sys.stdout
72
        sys.stdout = False
73
74
        self.assertRaises(AttributeError, print_func)
75
        self.assertRaises(NotImplementedError, no_print_func)
76
77
        sys.stdout = old_stdout
78
79
    def test_retrieve_stdout(self):
80
        with retrieve_stdout() as sio:
81
            print("test")
82
            self.assertEqual(sio.getvalue(), "test\n")
83
84
    def test_simulate_console_inputs(self):
85
        with simulate_console_inputs(0, 1, 2) as generator:
86
            self.assertEqual(input(), 0)
87
            self.assertEqual(generator.last_input, 0)
88
            generator.inputs.append(3)
89
            self.assertEqual(input(), 1)
90
            self.assertEqual(input(), 2)
91
            self.assertEqual(input(), 3)
92
            self.assertEqual(generator.last_input, 3)
93
94
        with simulate_console_inputs("test"), self.assertRaises(ValueError):
95
            self.assertEqual(input(), "test")
96
            input()
97
98
    def test_make_temp(self):
99
        with make_temp() as f_a:
100
            self.assertTrue(os.path.isfile(f_a))
101
            self.assertTrue(os.path.basename(f_a).startswith("tmp"))
102
103
        with make_temp(suffix=".orig", prefix="pre") as f_b:
104
            self.assertTrue(f_b.endswith(".orig"))
105
            self.assertTrue(os.path.basename(f_b).startswith("pre"))
106
107
    def test_prepare_file(self):
108
        with prepare_file(['line1', 'line2\n'],
109
                          "/file/name",
110
                          force_linebreaks=True,
111
                          create_tempfile=True) as (lines, filename):
112
            self.assertEqual(filename, '/file/name')
113
            self.assertEqual(lines, ['line1\n', 'line2\n'])
114
115
        with prepare_file(['line1', 'line2\n'],
116
                          None,
117
                          force_linebreaks=False,
118
                          create_tempfile=True) as (lines, filename):
119
            self.assertTrue(os.path.isfile(filename))
120
            self.assertEqual(lines, ['line1', 'line2\n'])
121
122
        with prepare_file(['line1', 'line2\n'],
123
                          {"suffix": ".test", "prefix": "test_"},
124
                          force_linebreaks=False,
125
                          create_tempfile=True) as (lines, filename):
126
            self.assertTrue(os.path.isfile(filename))
127
            basename = os.path.basename(filename)
128
            self.assertTrue(basename.endswith(".test"))
129
            self.assertTrue(basename.startswith("test_"))
130
131
        with prepare_file(['line1', 'line2\n'],
132
                          None,
133
                          force_linebreaks=False,
134
                          create_tempfile=False) as (lines, filename):
135
            self.assertEqual(filename, "dummy_file_name")
136