Completed
Pull Request — master (#1431)
by Abdeali
01:31
created

coalib.misc.prepare_file()   F

Complexity

Conditions 10

Size

Total Lines 30

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 10
dl 0
loc 30
rs 3.1304

How to fix   Complexity   

Complexity

Complex classes like coalib.misc.prepare_file() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from contextlib import contextmanager, closing
2
import sys
3
import os
4
from io import StringIO
5
import builtins
6
import signal
7
import threading
8
import platform
9
import tempfile
10
11
from coalib.misc.MutableValue import MutableValue
12
13
14
@contextmanager
def subprocess_timeout(sub_process, seconds, kill_pg=False):
    """
    Kill the sub process if it takes longer than the timeout.

    :param sub_process: The sub process to run.
    :param seconds:     The number of seconds to allow the sub process to run
                        for. If set to 0 or a negative value, it waits
                        indefinitely. Floats can be used to specify units
                        smaller than seconds.
    :param kill_pg:     Boolean whether to kill the process group or only this
                        process. (not applicable for windows)
    :return:            A contextmanager yielding a MutableValue whose
                        ``value`` is set to True once the timeout has elapsed
                        before the ``with`` body finished.
    """
    timedout = MutableValue(False)

    if seconds <= 0:
        # No timeout requested: hand back the flag and never start a thread.
        yield timedout
        return

    finished = threading.Event()

    if platform.system() == "Windows":  # pragma: no cover
        kill_pg = False

    def kill_it():
        # Sleep until either the ``with`` body is done or the timeout elapsed.
        finished.wait(seconds)
        if finished.is_set():
            return

        timedout.value = True
        try:
            # The group id has to be looked up before signalling the process,
            # afterwards the process may already be gone.
            pgid = os.getpgid(sub_process.pid) if kill_pg else None
            os.kill(sub_process.pid, signal.SIGINT)
            if kill_pg:
                os.killpg(pgid, signal.SIGINT)
        except ProcessLookupError:
            # The process (or its group) exited on its own between the
            # timeout and the kill; there is nothing left to signal.
            pass

    thread = threading.Thread(name='timeout-killer', target=kill_it)
    try:
        thread.start()
        yield timedout
    finally:
        # Wake the killer thread up early so leaving the block never blocks
        # for the remaining timeout.
        finished.set()
        thread.join()
55
56
57
@contextmanager
def replace_stdout(replacement):
    """
    Temporarily installs the replacement as ``sys.stdout``.

    The previous stream is put back when the ``with`` block is left, even if
    the block raises.

    :param replacement: The object to use as ``sys.stdout`` meanwhile.
    """
    original_stream, sys.stdout = sys.stdout, replacement
    try:
        yield
    finally:
        sys.stdout = original_stream
69
70
71
@contextmanager
def suppress_stdout():
    """
    Discards everything written to stdout inside the ``with`` block by
    pointing stdout at ``os.devnull``.
    """
    with open(os.devnull, "w") as null_stream:
        with replace_stdout(null_stream):
            yield
78
79
80
@contextmanager
def retrieve_stdout():
    """
    Captures stdout into a StringIO object and yields that object, so the
    caller can read everything that was printed. (Nothing reaches the real
    stdout!)

    Example usage:

    with retrieve_stdout() as stdout:
        print("something")  # Won't print to the console
        what_was_printed = stdout.getvalue()  # Save the value
    """
    with closing(StringIO()) as captured:
        with replace_stdout(captured):
            real_print = builtins.print

            # Swapping sys.stdout alone is not enough for code holding a
            # cached reference to the old stream, so the print builtin is
            # redirected to the capture buffer as well.
            def capturing_print(*args, **kwargs):
                kwargs.update(file=captured)
                real_print(*args, **kwargs)

            builtins.print = capturing_print
            try:
                yield captured
            finally:
                builtins.print = real_print
105
106
107
@contextmanager
def simulate_console_inputs(*inputs):
    """
    Replaces the `input` builtin so that each call returns the next of the
    given inputs. An InputGenerator object is yielded back, which lets you
    check which input was used last and append further inputs.
    Example:

        with simulate_console_inputs(0, 1, 2) as generator:
            assert(input() == 0)
            assert(generator.last_input == 0)
            generator.inputs.append(3)
            assert(input() == 1)
            assert(input() == 2)
            assert(input() == 3)
            assert(generator.last_input == 3)

    :param inputs:      Any inputs to simulate.
    :raises ValueError: Raised when was asked for more input but there's no
                        more provided.
    """
    class InputGenerator:

        def __init__(self, inputs):
            self.inputs = inputs
            # Index of the most recently served input; -1 means none yet.
            self.last_input = -1

        def generate_input(self, prompt=''):
            # Echo the prompt just like the real `input` would.
            print(prompt, end="")
            position = self.last_input + 1
            self.last_input = position
            try:
                answer = self.inputs[position]
            except IndexError:
                raise ValueError("Asked for more input, but no more was "
                                 "provided from `simulate_console_inputs`.")
            else:
                return answer

    original_input = builtins.input
    input_generator = InputGenerator(list(inputs))
    builtins.input = input_generator.generate_input
    try:
        yield input_generator
    finally:
        builtins.input = original_input
150
151
152
@contextmanager
def make_temp(suffix="", prefix="tmp", dir=None):
    """
    Creates a temporary file whose stream is already closed and removes the
    file again once the ``with`` block is left.

    :param suffix: Passed to tempfile.mkstemp() as ``suffix``.
    :param prefix: Passed to tempfile.mkstemp() as ``prefix``.
    :param dir:    Passed to tempfile.mkstemp() as ``dir``.
    :return:       A contextmanager retrieving the file path.
    """
    descriptor, path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
    # Only the path is handed out, so the open descriptor is released first.
    os.close(descriptor)
    try:
        yield path
    finally:
        os.remove(path)
165
166
167
@contextmanager
def prepare_file(lines,
                 filename,
                 force_linebreaks=True,
                 create_tempfile=True,
                 tempfile_kwargs=None):
    """
    Can create a temporary file (if filename is None) with the lines.
    Can also add a trailing newline to each line specified if needed.

    The given ``lines`` object is never modified; when linebreaks are forced,
    a new sequence is yielded instead (a tuple for tuple input, a list
    otherwise).

    :param lines:            The lines from the file. (list of strings)
    :param filename:         The filename to be prepared.
    :param force_linebreaks: Whether to append newlines at each line if needed.
    :param create_tempfile:  Whether to save lines in tempfile if needed.
    :param tempfile_kwargs:  Kwargs passed to tempfile.mkstemp(). None means
                             no extra kwargs.
    :return:                 A contextmanager yielding the tuple
                             ``(lines, filename)``.
    """
    if force_linebreaks:
        # Build a fresh sequence so the caller's object stays untouched and
        # immutable inputs (e.g. tuples) are supported too.
        terminated = [line if line.endswith("\n") else line + "\n"
                      for line in lines]
        lines = tuple(terminated) if isinstance(lines, tuple) else terminated

    if not create_tempfile and filename is None:
        filename = "dummy_file_name"

    if not isinstance(filename, str) and create_tempfile:
        # No usable name was given: back the lines with a real temporary file
        # that lives exactly as long as the ``with`` block.
        with make_temp(**(tempfile_kwargs or {})) as filename:
            with open(filename, 'w') as file:
                file.writelines(lines)
            yield lines, filename
    else:
        yield lines, filename
197