prepare_file()   F
last analyzed

Complexity

Conditions 10

Size

Total Lines 30

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
dl 0
loc 30
rs 3.1304
c 0
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like prepare_file() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import builtins
2
import os
3
import platform
4
import signal
5
import sys
6
import tempfile
7
import threading
8
from contextlib import closing, contextmanager
9
from io import StringIO
10
11
from coalib.misc.MutableValue import MutableValue
12
13
14
@contextmanager
15
def subprocess_timeout(sub_process, seconds, kill_pg=False):
16
    """
17
    Kill subprocess if the sub process takes more the than the timeout.
18
19
    :param sub_process: The sub process to run.
20
    :param seconds:     The number of seconds to allow the test to run for. If
21
                        set to 0 or a negative value, it waits indefinitely.
22
                        Floats can be used to specify units smaller than
23
                        seconds.
24
    :param kill_pg:     Boolean whether to kill the process group or only this
25
                        process. (not applicable for windows)
26
    """
27
    timedout = MutableValue(False)
28
29
    if seconds <= 0:
30
        yield timedout
31
        return
32
33
    finished = threading.Event()
34
35
    if platform.system() == "Windows":  # pragma: no cover
36
        kill_pg = False
37
38
    def kill_it():
39
        finished.wait(seconds)
40
        if not finished.is_set():
41
            timedout.value = True
42
            if kill_pg:
43
                pgid = os.getpgid(sub_process.pid)
44
            os.kill(sub_process.pid, signal.SIGINT)
45
            if kill_pg:
46
                os.killpg(pgid, signal.SIGINT)
47
48
    thread = threading.Thread(name='timeout-killer', target=kill_it)
49
    try:
50
        thread.start()
51
        yield timedout
52
    finally:
53
        finished.set()
54
        thread.join()
55
56
57
@contextmanager
58
def replace_stdout(replacement):
59
    """
60
    Replaces stdout with the replacement, yields back to the caller and then
61
    reverts everything back.
62
    """
63
    _stdout = sys.stdout
64
    sys.stdout = replacement
65
    try:
66
        yield
67
    finally:
68
        sys.stdout = _stdout
69
70
71
@contextmanager
72
def replace_stderr(replacement):
73
    """
74
    Replaces stderr with the replacement, yields back to the caller and then
75
    reverts everything back.
76
    """
77
    _stderr = sys.stderr
78
    sys.stderr = replacement
79
    try:
80
        yield
81
    finally:
82
        sys.stderr = _stderr
83
84
85
@contextmanager
86
def suppress_stdout():
87
    """
88
    Suppresses everything going to stdout.
89
    """
90
    with open(os.devnull, "w") as devnull, replace_stdout(devnull):
91
        yield
92
93
94
@contextmanager
95
def retrieve_stdout():
96
    """
97
    Yields a StringIO object from which one can read everything that was
98
    printed to stdout. (It won't be printed to the real stdout!)
99
100
    Example usage:
101
102
    with retrieve_stdout() as stdout:
103
        print("something")  # Won't print to the console
104
        what_was_printed = stdout.getvalue()  # Save the value
105
    """
106
    with closing(StringIO()) as sio, replace_stdout(sio):
107
        oldprint = builtins.print
108
        try:
109
            # Overriding stdout doesn't work with libraries, this ensures even
110
            # cached variables take this up. Well... it works.
111
            def newprint(*args, **kwargs):
112
                kwargs['file'] = sio
113
                oldprint(*args, **kwargs)
114
115
            builtins.print = newprint
116
            yield sio
117
        finally:
118
            builtins.print = oldprint
119
120
121
@contextmanager
122
def retrieve_stderr():
123
    """
124
    Yields a StringIO object from which one can read everything that was
125
    printed to stderr. (It won't be printed to the real stderr!)
126
127
    Example usage:
128
129
    with retrieve_stderr() as stderr:
130
        print("something")  # Won't print to the console
131
        what_was_printed = stderr.getvalue()  # Save the value
132
    """
133
    with closing(StringIO()) as sio, replace_stderr(sio):
134
        oldprint = builtins.print
135
        try:
136
            # Overriding stderr doesn't work with libraries, this ensures even
137
            # cached variables take this up. Well... it works.
138
            def newprint(*args, **kwargs):
139
                kwargs['file'] = sio
140
                oldprint(*args, **kwargs)
141
142
            builtins.print = newprint
143
            yield sio
144
        finally:
145
            builtins.print = oldprint
146
147
148
@contextmanager
149
def simulate_console_inputs(*inputs):
150
    """
151
    Does some magic to simulate the given inputs to any calls to the ``input``
152
    builtin. This yields back an InputGenerator object so you can check
153
    which input was already used and append any additional inputs you want.
154
    Example:
155
156
        with simulate_console_inputs(0, 1, 2) as generator:
157
            assert(input() == 0)
158
            assert(generator.last_input == 0)
159
            generator.inputs.append(3)
160
            assert(input() == 1)
161
            assert(input() == 2)
162
            assert(input() == 3)
163
            assert(generator.last_input == 3)
164
165
    :param inputs:      Any inputs to simulate.
166
    :raises ValueError: Raised when was asked for more input but there's no
167
                        more provided.
168
    """
169
    class InputGenerator:
170
171
        def __init__(self, inputs):
172
            self.last_input = -1
173
            self.inputs = inputs
174
175
        def generate_input(self, prompt=''):
176
            print(prompt, end="")
177
            self.last_input += 1
178
            try:
179
                return self.inputs[self.last_input]
180
            except IndexError:
181
                raise ValueError("Asked for more input, but no more was "
182
                                 "provided from `simulate_console_inputs`.")
183
184
    input_generator = InputGenerator(list(inputs))
185
    _input = builtins.input
186
    builtins.input = input_generator.generate_input
187
    try:
188
        yield input_generator
189
    finally:
190
        builtins.input = _input
191
192
193
@contextmanager
194
def make_temp(suffix="", prefix="tmp", dir=None):
195
    """
196
    Creates a temporary file with a closed stream and deletes it when done.
197
198
    :return: A contextmanager retrieving the file path.
199
    """
200
    temporary = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
201
    os.close(temporary[0])
202
    try:
203
        yield temporary[1]
204
    finally:
205
        os.remove(temporary[1])
206
207
208
@contextmanager
209
def prepare_file(lines,
210
                 filename,
211
                 force_linebreaks=True,
212
                 create_tempfile=True,
213
                 tempfile_kwargs={}):
214
    """
215
    Can create a temporary file (if filename is None) with the lines.
216
    Can also add a trailing newline to each line specified if needed.
217
218
    :param lines:            The lines from the file. (list or tuple of strings)
219
    :param filename:         The filename to be prepared.
220
    :param force_linebreaks: Whether to append newlines at each line if needed.
221
    :param create_tempfile:  Whether to save lines in tempfile if needed.
222
    :param tempfile_kwargs:  Kwargs passed to tempfile.mkstemp().
223
    """
224
    if force_linebreaks:
225
        lines = type(lines)(line if line.endswith('\n') else line+'\n'
226
                            for line in lines)
227
228
    if not create_tempfile and filename is None:
229
        filename = "dummy_file_name"
230
231
    if not isinstance(filename, str) and create_tempfile:
232
        with make_temp(**tempfile_kwargs) as filename:
233
            with open(filename, 'w', encoding='utf-8') as file:
234
                file.writelines(lines)
235
            yield lines, filename
236
    else:
237
        yield lines, filename
238
239
240
@contextmanager
241
def change_directory(path):
242
    old_dir = os.getcwd()
243
    os.chdir(path)
244
    try:
245
        yield
246
    finally:
247
        os.chdir(old_dir)
248