1 | import builtins |
||
2 | import os |
||
3 | import platform |
||
4 | import signal |
||
5 | import sys |
||
6 | import tempfile |
||
7 | import threading |
||
8 | from contextlib import closing, contextmanager |
||
9 | from io import StringIO |
||
10 | |||
11 | from coalib.misc.MutableValue import MutableValue |
||
12 | |||
13 | |||
@contextmanager
def subprocess_timeout(sub_process, seconds, kill_pg=False):
    """
    Kill the sub process if it takes longer than the given timeout.

    While the ``with`` body runs, a helper thread waits for at most
    ``seconds``. If the body has not finished by then, the thread marks the
    yielded value as timed out and sends ``SIGINT`` to the sub process (and,
    if requested, to its process group).

    :param sub_process: The sub process to supervise. Only its ``pid``
                        attribute is used.
    :param seconds:     The number of seconds to allow the sub process to run
                        for. If set to 0 or a negative value, it waits
                        indefinitely. Floats can be used to specify units
                        smaller than seconds.
    :param kill_pg:     Boolean whether to kill the process group or only this
                        process. (not applicable for windows)
    :return:            A contextmanager yielding a ``MutableValue`` whose
                        ``value`` is set to ``True`` once the timeout fired.
    """
    timedout = MutableValue(False)

    if seconds <= 0:
        # No timeout requested, so there is nothing to supervise.
        yield timedout
        return

    finished = threading.Event()

    if platform.system() == "Windows":  # pragma: no cover
        kill_pg = False

    def kill_it():
        # Returns early as soon as the ``with`` body sets ``finished``.
        finished.wait(seconds)
        if finished.is_set():
            return

        timedout.value = True
        pid = sub_process.pid
        pgid = None
        try:
            if kill_pg:
                # Look the group up before signalling the process itself.
                pgid = os.getpgid(pid)
            os.kill(pid, signal.SIGINT)
        except ProcessLookupError:
            # The process exited on its own right when the timeout fired;
            # there is nothing left to interrupt.
            pass

        if pgid is not None:
            try:
                os.killpg(pgid, signal.SIGINT)
            except ProcessLookupError:
                # The whole group is already gone.
                pass

    thread = threading.Thread(name='timeout-killer', target=kill_it)
    try:
        thread.start()
        yield timedout
    finally:
        # Wake the killer thread up (if it is still waiting) and collect it.
        finished.set()
        thread.join()
||
55 | |||
56 | |||
@contextmanager
def replace_stdout(replacement):
    """
    Temporarily swaps ``sys.stdout`` for the given replacement.

    The previous ``sys.stdout`` object is put back when the ``with`` block is
    left, no matter whether it finished normally or raised.

    :param replacement: The object to install as ``sys.stdout``.
    """
    previous, sys.stdout = sys.stdout, replacement
    try:
        yield
    finally:
        sys.stdout = previous
||
69 | |||
70 | |||
@contextmanager
def replace_stderr(replacement):
    """
    Temporarily swaps ``sys.stderr`` for the given replacement.

    The previous ``sys.stderr`` object is put back when the ``with`` block is
    left, no matter whether it finished normally or raised.

    :param replacement: The object to install as ``sys.stderr``.
    """
    previous, sys.stderr = sys.stderr, replacement
    try:
        yield
    finally:
        sys.stderr = previous
||
83 | |||
84 | |||
@contextmanager
def suppress_stdout():
    """
    Discards everything written to stdout inside the ``with`` block.

    stdout is pointed at ``os.devnull`` for the duration of the block and
    restored afterwards.
    """
    with open(os.devnull, "w") as null_stream:
        with replace_stdout(null_stream):
            yield
||
92 | |||
93 | |||
@contextmanager
def retrieve_stdout():
    """
    Captures stdout into a StringIO object and yields that object. Nothing
    is written to the real stdout while the block is active.

    Besides replacing ``sys.stdout``, the ``print`` builtin is patched so
    that every call writes to the buffer; any ``file`` argument passed to
    ``print`` is overridden. The buffer is closed when the block is left, so
    its content has to be read inside the block.

    Example usage:

        with retrieve_stdout() as stdout:
            print("something")  # Won't print to the console
            what_was_printed = stdout.getvalue()  # Save the value
    """
    buffer = StringIO()
    with closing(buffer), replace_stdout(buffer):
        original_print = builtins.print

        # Replacing sys.stdout alone is not enough for code that cached the
        # old stream, so print itself is forced to write into the buffer.
        def capturing_print(*args, **kwargs):
            kwargs['file'] = buffer
            original_print(*args, **kwargs)

        try:
            builtins.print = capturing_print
            yield buffer
        finally:
            builtins.print = original_print
||
119 | |||
120 | |||
@contextmanager
def retrieve_stderr():
    """
    Captures stderr into a StringIO object and yields that object. Nothing
    is written to the real stderr while the block is active.

    Besides replacing ``sys.stderr``, the ``print`` builtin is patched so
    that every call writes to the buffer; any ``file`` argument passed to
    ``print`` is overridden. The buffer is closed when the block is left, so
    its content has to be read inside the block.

    Example usage:

        with retrieve_stderr() as stderr:
            print("something")  # Won't print to the console
            what_was_printed = stderr.getvalue()  # Save the value
    """
    buffer = StringIO()
    with closing(buffer), replace_stderr(buffer):
        original_print = builtins.print

        # Replacing sys.stderr alone is not enough for code that cached the
        # old stream, so print itself is forced to write into the buffer.
        def capturing_print(*args, **kwargs):
            kwargs['file'] = buffer
            original_print(*args, **kwargs)

        try:
            builtins.print = capturing_print
            yield buffer
        finally:
            builtins.print = original_print
||
146 | |||
147 | |||
@contextmanager
def simulate_console_inputs(*inputs):
    """
    Feeds the given values to every call of the ``input`` builtin made inside
    the ``with`` block. The yielded InputGenerator object exposes
    ``last_input`` (index of the most recently consumed value, ``-1`` before
    the first call) and ``inputs`` (the list of values, which may be extended
    while the block runs). The real ``input`` is restored afterwards.

    Example:

        with simulate_console_inputs(0, 1, 2) as generator:
            assert(input() == 0)
            assert(generator.last_input == 0)
            generator.inputs.append(3)
            assert(input() == 1)
            assert(input() == 2)
            assert(input() == 3)
            assert(generator.last_input == 3)

    :param inputs:      Any inputs to simulate.
    :raises ValueError: Raised by the simulated ``input`` when more input is
                        requested than was provided.
    """
    class InputGenerator:

        def __init__(self, inputs):
            self.inputs = inputs
            self.last_input = -1

        def generate_input(self, prompt=''):
            # Echo the prompt without a trailing newline, then hand out the
            # next prepared value.
            print(prompt, end="")
            self.last_input += 1
            try:
                return self.inputs[self.last_input]
            except IndexError:
                raise ValueError("Asked for more input, but no more was "
                                 "provided from `simulate_console_inputs`.")

    generator = InputGenerator(list(inputs))
    real_input, builtins.input = builtins.input, generator.generate_input
    try:
        yield generator
    finally:
        builtins.input = real_input
||
191 | |||
192 | |||
@contextmanager
def make_temp(suffix="", prefix="tmp", dir=None):
    """
    Creates a temporary file, closes its stream right away and removes the
    file when the ``with`` block is left.

    :param suffix: Passed on to ``tempfile.mkstemp()``.
    :param prefix: Passed on to ``tempfile.mkstemp()``.
    :param dir:    Passed on to ``tempfile.mkstemp()``.
    :return:       A contextmanager retrieving the file path.
    """
    descriptor, path = tempfile.mkstemp(suffix=suffix,
                                        prefix=prefix,
                                        dir=dir)
    # Only the path is handed out, so the open descriptor is not needed.
    os.close(descriptor)
    try:
        yield path
    finally:
        os.remove(path)
||
206 | |||
207 | |||
@contextmanager
def prepare_file(lines,
                 filename,
                 force_linebreaks=True,
                 create_tempfile=True,
                 tempfile_kwargs=None):
    """
    Can create a temporary file (if filename is None) with the lines.
    Can also add a trailing newline to each line specified if needed.

    A temporary file is only created when ``filename`` is not a string and
    ``create_tempfile`` is set; it is written with UTF-8 encoding and removed
    again when the ``with`` block is left. If ``filename`` is None and no
    temporary file may be created, the placeholder name ``dummy_file_name``
    is yielded instead.

    :param lines:            The lines from the file. (list or tuple of
                             strings)
    :param filename:         The filename to be prepared.
    :param force_linebreaks: Whether to append newlines at each line if needed.
    :param create_tempfile:  Whether to save lines in tempfile if needed.
    :param tempfile_kwargs:  Kwargs passed to tempfile.mkstemp(). ``None``
                             (the default) means no extra arguments.
    :return:                 A contextmanager yielding a ``(lines, filename)``
                             tuple.
    """
    # A None sentinel avoids sharing one mutable dict between all calls.
    if tempfile_kwargs is None:
        tempfile_kwargs = {}

    if force_linebreaks:
        # Rebuild the container with its own type so lists stay lists and
        # tuples stay tuples.
        lines = type(lines)(line if line.endswith('\n') else line + '\n'
                            for line in lines)

    if not create_tempfile and filename is None:
        filename = "dummy_file_name"

    if not isinstance(filename, str) and create_tempfile:
        with make_temp(**tempfile_kwargs) as filename:
            with open(filename, 'w', encoding='utf-8') as file:
                file.writelines(lines)
            # The file is closed (and flushed) before the caller sees it.
            yield lines, filename
    else:
        yield lines, filename
||
238 | |||
239 | |||
@contextmanager
def change_directory(path):
    """
    Switches the current working directory to ``path`` for the duration of
    the ``with`` block and switches back to the previous one afterwards.

    :param path: The directory to change into.
    """
    previous_dir = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        # Restore the directory that was active before entering the block.
        os.chdir(previous_dir)
||
248 |