Completed
Pull Request — master (#1183)
by Lasse
01:39
created

coalib.misc.preserve_sys_path()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 1
dl 0
loc 7
rs 9.4286
1
from contextlib import contextmanager, closing
2
import sys
3
import os
4
from io import StringIO
5
import builtins
6
import signal
7
import threading
8
import platform
9
10
from coalib.misc.MutableValue import MutableValue
11
12
13
@contextmanager
14
def subprocess_timeout(sub_process, seconds, kill_pg=False):
15
    """
16
    Kill subprocess if the sub process takes more the than the timeout.
17
18
    :param sub_process: The sub process to run.
19
    :param seconds:     The number of seconds to allow the test to run for. If
20
                        set to 0 or a negative value, it waits indefinitely.
21
                        Floats can be used to specify units smaller than
22
                        seconds.
23
    :param kill_pg:     Boolean whether to kill the process group or only this
24
                        process. (not applicable for windows)
25
    """
26
    timedout = MutableValue(False)
27
28
    if seconds <= 0:
29
        yield timedout
30
        return
31
32
    finished = threading.Event()
33
34
    if platform.system() == "Windows":  # pragma: no cover
35
        kill_pg = False
36
37
    def kill_it():
38
        finished.wait(seconds)
39
        if not finished.is_set():
40
            timedout.value = True
41
            if kill_pg:
42
                pgid = os.getpgid(sub_process.pid)
43
            os.kill(sub_process.pid, signal.SIGINT)
44
            if kill_pg:
45
                os.killpg(pgid, signal.SIGINT)
46
47
    thread = threading.Thread(name='timeout-killer', target=kill_it)
48
    try:
49
        thread.start()
50
        yield timedout
51
    finally:
52
        finished.set()
53
        thread.join()
54
55
56
@contextmanager
57
def replace_stdout(replacement):
58
    """
59
    Replaces stdout with the replacement, yields back to the caller and then
60
    reverts everything back.
61
    """
62
    _stdout = sys.stdout
63
    sys.stdout = replacement
64
    try:
65
        yield
66
    finally:
67
        sys.stdout = _stdout
68
69
70
@contextmanager
71
def suppress_stdout():
72
    """
73
    Suppresses everything going to stdout.
74
    """
75
    with open(os.devnull, "w") as devnull, replace_stdout(devnull):
76
        yield
77
78
79
@contextmanager
80
def retrieve_stdout():
81
    """
82
    Yields a StringIO object from which one can read everything that was
83
    printed to stdout. (It won't be printed to the real stdout!)
84
85
    Example usage:
86
87
    with retrieve_stdout() as stdout:
88
        print("something")  # Won't print to the console
89
        what_was_printed = stdout.getvalue()  # Save the value
90
    """
91
    with closing(StringIO()) as sio, replace_stdout(sio):
92
        oldprint = builtins.print
93
        try:
94
            # Overriding stdout doesn't work with libraries, this ensures even
95
            # cached variables take this up. Well... it works.
96
            def newprint(*args, **kwargs):
97
                kwargs['file'] = sio
98
                oldprint(*args, **kwargs)
99
100
            builtins.print = newprint
101
            yield sio
102
        finally:
103
            builtins.print = oldprint
104
105
106
@contextmanager
107
def simulate_console_inputs(*inputs):
108
    """
109
    Does some magic to simulate the given inputs to any calls to the `input`
110
    builtin. This yields back an InputGenerator object so you can check
111
    which input was already used and append any additional inputs you want.
112
    Example:
113
114
        with simulate_console_inputs(0, 1, 2) as generator:
115
            assert(input() == 0)
116
            assert(generator.last_input == 0)
117
            generator.inputs.append(3)
118
            assert(input() == 1)
119
            assert(input() == 2)
120
            assert(input() == 3)
121
            assert(generator.last_input == 3)
122
123
    :param inputs:      Any inputs to simulate.
124
    :raises ValueError: Raised when was asked for more input but there's no
125
                        more provided.
126
    """
127
    class InputGenerator:
128
        def __init__(self, inputs):
129
            self.last_input = -1
130
            self.inputs = inputs
131
132
        def generate_input(self, prompt=''):
133
            print(prompt, end="")
134
            self.last_input += 1
135
            try:
136
                return self.inputs[self.last_input]
137
            except IndexError:
138
                raise ValueError("Asked for more input, but no more was "
139
                                 "provided from `simulate_console_inputs`.")
140
141
    input_generator = InputGenerator(list(inputs))
142
    _input = builtins.input
143
    builtins.input = input_generator.generate_input
144
    try:
145
        yield input_generator
146
    finally:
147
        builtins.input = _input
148