Failed Conditions
Pull Request — master (#2076)
by Abdeali
02:11
created

coalib/misc/ContextManagers.py (37 issues)

1
import builtins
2
import os
3
import platform
4
import signal
5
import sys
6
import tempfile
7
import threading
8
from contextlib import closing, contextmanager
9
from io import StringIO
10
11
from coalib.misc.MutableValue import MutableValue
12
13
14
@contextmanager
def subprocess_timeout(sub_process, seconds, kill_pg=False):
    """
    Kill subprocess if the sub process takes more than the timeout.

    A watchdog thread waits up to ``seconds`` for the ``with`` body to
    finish. If the body is still running when the wait ends, the yielded
    value is flagged and ``SIGINT`` is sent to the sub process.

    :param sub_process: The sub process to run.
    :param seconds:     The number of seconds to allow the test to run for. If
                        set to 0 or a negative value, it waits indefinitely.
                        Floats can be used to specify units smaller than
                        seconds.
    :param kill_pg:     Boolean whether to kill the process group or only this
                        process. (not applicable for windows)
    :return:            A contextmanager yielding a ``MutableValue`` whose
                        ``value`` is set to ``True`` once the timeout expired.
    """
    timedout = MutableValue(False)

    # No timeout requested: hand back the (never set) flag, start no thread.
    if seconds <= 0:
        yield timedout
        return

    finished = threading.Event()

    if platform.system() == "Windows":  # pragma: no cover
        kill_pg = False

    def kill_it():
        finished.wait(seconds)
        if not finished.is_set():
            timedout.value = True
            # Resolve the process group *before* signalling the process
            # itself; ``pgid`` stays ``None`` when no group kill is wanted.
            pgid = os.getpgid(sub_process.pid) if kill_pg else None
            os.kill(sub_process.pid, signal.SIGINT)
            if pgid is not None:
                os.killpg(pgid, signal.SIGINT)

    thread = threading.Thread(name='timeout-killer', target=kill_it)
    try:
        thread.start()
        yield timedout
    finally:
        # Wake the watchdog so it exits without killing, then wait for it.
        finished.set()
        thread.join()
55
56
57
@contextmanager
def replace_stdout(replacement):
    """
    Replaces stdout with the replacement, yields back to the caller and then
    reverts everything back.

    :param replacement: The object to install as ``sys.stdout`` for the
                        duration of the ``with`` block.
    """
    _stdout = sys.stdout
    sys.stdout = replacement
    try:
        yield
    finally:
        # Restore the previous stream even if the body raised.
        sys.stdout = _stdout
69
70
71
@contextmanager
def replace_stderr(replacement):
    """
    Replaces stderr with the replacement, yields back to the caller and then
    reverts everything back.

    :param replacement: The object to install as ``sys.stderr`` for the
                        duration of the ``with`` block.
    """
    _stderr = sys.stderr
    sys.stderr = replacement
    try:
        yield
    finally:
        # Restore the previous stream even if the body raised.
        sys.stderr = _stderr
83
84
85
@contextmanager
def suppress_stdout():
    """
    Suppresses everything going to stdout.

    ``os.devnull`` is opened for writing and installed as ``sys.stdout`` via
    ``replace_stdout`` for the duration of the ``with`` block.
    """
    with open(os.devnull, "w") as devnull, replace_stdout(devnull):
        yield
92
93
94
@contextmanager
def retrieve_stdout():
    """
    Yields a StringIO object from which one can read everything that was
    printed to stdout. (It won't be printed to the real stdout!)

    While the context is active, ``builtins.print`` is swapped for a wrapper
    that always sets ``file`` to the StringIO, overriding any ``file``
    argument the caller passed. The StringIO is closed on exit, so read it
    inside the ``with`` block.

    Example usage:

    with retrieve_stdout() as stdout:
        print("something")  # Won't print to the console
        what_was_printed = stdout.getvalue()  # Save the value
    """
    with closing(StringIO()) as sio, replace_stdout(sio):
        oldprint = builtins.print
        try:
            # Overriding stdout doesn't work with libraries, this ensures even
            # cached variables take this up. Well... it works.
            def newprint(*args, **kwargs):
                kwargs['file'] = sio
                oldprint(*args, **kwargs)

            builtins.print = newprint
            yield sio
        finally:
            builtins.print = oldprint
119
120
121
@contextmanager
def retrieve_stderr():
    """
    Yields a StringIO object from which one can read everything that was
    printed to stderr. (It won't be printed to the real stderr!)

    While the context is active, ``builtins.print`` is swapped for a wrapper
    that always sets ``file`` to the StringIO, so every ``print`` call is
    captured here regardless of the ``file`` argument it was given. The
    StringIO is closed on exit, so read it inside the ``with`` block.

    Example usage:

    with retrieve_stderr() as stderr:
        print("something")  # Won't print to the console
        what_was_printed = stderr.getvalue()  # Save the value
    """
    with closing(StringIO()) as sio, replace_stderr(sio):
        oldprint = builtins.print
        try:
            # Overriding stderr doesn't work with libraries, this ensures even
            # cached variables take this up. Well... it works.
            def newprint(*args, **kwargs):
                kwargs['file'] = sio
                oldprint(*args, **kwargs)

            builtins.print = newprint
            yield sio
        finally:
            builtins.print = oldprint
146
147
148
@contextmanager
def simulate_console_inputs(*inputs):
    """
    Does some magic to simulate the given inputs to any calls to the ``input``
    builtin. This yields back an InputGenerator object so you can check
    which input was already used and append any additional inputs you want.
    The inputs are handed back exactly as given (they are not converted to
    strings).

    Example:

        with simulate_console_inputs(0, 1, 2) as generator:
            assert(input() == 0)
            assert(generator.last_input == 0)
            generator.inputs.append(3)
            assert(input() == 1)
            assert(input() == 2)
            assert(input() == 3)
            assert(generator.last_input == 3)

    :param inputs:      Any inputs to simulate.
    :raises ValueError: Raised when was asked for more input but there's no
                        more provided.
    """
    class InputGenerator:

        def __init__(self, inputs):
            # Index of the most recently returned input; -1 means none yet.
            self.last_input = -1
            self.inputs = inputs

        def generate_input(self, prompt=''):
            # Echo the prompt without a newline before returning the value.
            print(prompt, end="")
            self.last_input += 1
            try:
                return self.inputs[self.last_input]
            except IndexError:
                raise ValueError("Asked for more input, but no more was "
                                 "provided from `simulate_console_inputs`.")

    input_generator = InputGenerator(list(inputs))
    _input = builtins.input
    builtins.input = input_generator.generate_input
    try:
        yield input_generator
    finally:
        # Always reinstall the real ``input``.
        builtins.input = _input
191
192
193
@contextmanager
def make_temp(suffix="", prefix="tmp", dir=None):
    """
    Creates a temporary file with a closed stream and deletes it when done.

    :param suffix: Passed to ``tempfile.mkstemp`` as ``suffix``.
    :param prefix: Passed to ``tempfile.mkstemp`` as ``prefix``.
    :param dir:    Passed to ``tempfile.mkstemp`` as ``dir``.
    :return:       A contextmanager retrieving the file path.
    """
    handle, path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
    # Only the path is handed out, so close the OS-level handle right away.
    os.close(handle)
    try:
        yield path
    finally:
        os.remove(path)
206
207
208
@contextmanager
def prepare_file(lines,
                 filename,
                 force_linebreaks=True,
                 create_tempfile=True,
                 tempfile_kwargs=None):
    """
    Can create a temporary file (if filename is not a string) with the lines.
    Can also add a trailing newline to each line specified if needed.

    :param lines:            The lines from the file. (list or tuple of strings)
    :param filename:         The filename to be prepared.
    :param force_linebreaks: Whether to append newlines at each line if needed.
    :param create_tempfile:  Whether to save lines in tempfile if needed.
    :param tempfile_kwargs:  Kwargs passed to tempfile.mkstemp(). ``None``
                             (the default) means no extra kwargs.
    :return:                 A contextmanager retrieving the tuple
                             ``(lines, filename)``.
    """
    if tempfile_kwargs is None:
        tempfile_kwargs = {}

    if force_linebreaks:
        # Rebuild with the same container type that was passed in.
        lines = type(lines)(line if line.endswith('\n') else line+'\n'
                            for line in lines)

    if not create_tempfile and filename is None:
        filename = "dummy_file_name"

    if not isinstance(filename, str) and create_tempfile:
        with make_temp(**tempfile_kwargs) as filename:
            with open(filename, 'w', encoding='utf-8') as file:
                file.writelines(lines)
            # The temp file exists only while the caller is inside the block.
            yield lines, filename
    else:
        yield lines, filename
238
239
240
@contextmanager
def change_directory(path):
    """
    Changes the current working directory to ``path``, yields back to the
    caller and then changes back to the previous working directory.

    :param path: The directory to switch to for the duration of the ``with``
                 block.
    """
    old_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Go back even if the body raised.
        os.chdir(old_dir)
248