TeeingStreamProxy.flush()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python
2
# coding=utf-8
3
from __future__ import division, print_function, unicode_literals
4
import os
5
import sys
6
import subprocess
7
from threading import Timer
8
from contextlib import contextmanager
9
import wrapt
10
from sacred.optional import libc
11
from tempfile import NamedTemporaryFile
12
from sacred.settings import SETTINGS
13
from sacred.utils import FileNotFoundError, StringIO
14
15
16
def flush():
    """Try to flush all stdio buffers, both from python and from C.

    Every flush is best-effort and attempted on its own, so a stream that is
    missing, closed or otherwise cannot be flushed does not prevent the
    remaining buffers from being flushed. Errors of type AttributeError,
    ValueError and IOError are deliberately ignored.
    """
    for stream_name in ('stdout', 'stderr'):
        try:
            # looked up inside the try so a None / replaced stream is tolerated
            getattr(sys, stream_name).flush()
        except (AttributeError, ValueError, IOError):
            pass  # unsupported
    try:
        # NOTE(review): libc is presumably a ctypes handle to the C runtime
        # (or None when unavailable); passing None then means fflush(NULL).
        libc.fflush(None)
    except (AttributeError, ValueError, IOError):
        pass  # unsupported
27
28
29
def get_stdcapturer(mode=None):
    """Look up the output-capturing context manager for a capture mode.

    :param mode: one of "no", "fd" or "sys"; when None the value of
                 SETTINGS.CAPTURE_MODE is used instead.
    :return: tuple of (effective mode, capturing context manager function)
    :raises KeyError: if the effective mode is not a known capture mode
    """
    if mode is None:
        mode = SETTINGS.CAPTURE_MODE
    capturers = {
        "no": no_tee,
        "fd": tee_output_fd,
        "sys": tee_output_python,
    }
    return mode, capturers[mode]
35
36
37
class TeeingStreamProxy(wrapt.ObjectProxy):
    """Proxy for a stream that mirrors every write and flush to ``out``.

    All other attribute access is handled by the wrapt.ObjectProxy base class.
    """

    def __init__(self, wrapped, out):
        """Wrap the stream ``wrapped`` and remember ``out`` as second sink."""
        super(TeeingStreamProxy, self).__init__(wrapped)
        # NOTE(review): the ``_self_`` prefix is presumably what makes wrapt
        # store this on the proxy rather than on the wrapped stream - confirm.
        self._self_out = out

    def write(self, data):
        """Write ``data`` to the wrapped stream first and then to ``out``."""
        for sink in (self.__wrapped__, self._self_out):
            sink.write(data)

    def flush(self):
        """Flush the wrapped stream first and then ``out``."""
        for sink in (self.__wrapped__, self._self_out):
            sink.flush()
51
52
53
class CapturedStdout(object):
    """Incremental reader on top of a seekable buffer holding captured output.

    Each call to get() returns only the text that was added to the buffer
    since the previous call. After finalize() the buffer is closed and the
    last unread chunk can be fetched exactly once through get().
    """

    def __init__(self, buffer):
        self.buffer = buffer
        # offset in the buffer up to which the content was already returned
        self.read_position = 0
        # unread remainder stored by finalize(); None while nothing is stored
        self.final = None

    @property
    def closed(self):
        """Whether the underlying buffer has been closed."""
        return self.buffer.closed

    def flush(self):
        """Flush the underlying buffer and return whatever its flush returns."""
        return self.buffer.flush()

    def get(self):
        """Return the output that has not been handed out yet.

        If finalize() stored a remainder, that remainder is returned and
        cleared. Otherwise the buffer is read from the last read position to
        its end and the read position is advanced.
        """
        if self.final is not None:
            remainder, self.final = self.final, None
            return remainder

        stream = self.buffer
        stream.seek(self.read_position)
        chunk = stream.read()
        self.read_position = stream.tell()
        return chunk

    def finalize(self):
        """Flush, store the unread remainder in ``final`` and close the buffer."""
        self.flush()
        self.final = self.get()
        self.buffer.close()
81
82
83
@contextmanager
def no_tee():
    """Yield a CapturedStdout over a fresh StringIO without redirecting output.

    Nothing is written to the buffer by this function itself; the capture
    object is finalized when the context is left.
    """
    captured = CapturedStdout(StringIO())
    try:
        yield captured
    finally:
        captured.finalize()
90
91
92
@contextmanager
def tee_output_python():
    """Duplicate sys.stdout and sys.stderr to new StringIO.

    While the context is active sys.stdout and sys.stderr are replaced by
    TeeingStreamProxy objects that forward to the original stream and to a
    shared StringIO buffer. The yielded CapturedStdout reads from that buffer.

    On exit the original streams are restored before the capture buffer is
    finalized (and thereby closed), so sys.stdout / sys.stderr never remain
    bound to a proxy over a closed buffer, even if finalizing raises.
    """
    buffer = StringIO()
    out = CapturedStdout(buffer)
    orig_stdout, orig_stderr = sys.stdout, sys.stderr
    flush()
    sys.stdout = TeeingStreamProxy(sys.stdout, buffer)
    sys.stderr = TeeingStreamProxy(sys.stderr, buffer)
    try:
        yield out
    finally:
        try:
            # push pending output through the proxies into the buffer
            flush()
        finally:
            # restore first: finalize() closes the buffer the proxies write to
            sys.stdout, sys.stderr = orig_stdout, orig_stderr
            out.finalize()
107
108
109
# Duplicate stdout and stderr to a file. Inspired by:
110
# http://eli.thegreenplace.net/2015/redirecting-all-kinds-of-stdout-in-python/
111
# http://stackoverflow.com/a/651718/1388435
112
# http://stackoverflow.com/a/22434262/1388435
113
@contextmanager
def tee_output_fd():
    """Duplicate stdout and stderr to a file on the file descriptor level.

    File descriptors 1 and 2 are redirected into the stdin pipes of two tee
    subprocesses that append to a NamedTemporaryFile. The yielded
    CapturedStdout reads from that temporary file. On exit the original
    descriptors are restored, the tee processes are awaited (and killed by a
    timer after one second) and the capture object is finalized.
    """
    with NamedTemporaryFile(mode='w+') as target:
        stdout_fd, stderr_fd = 1, 2
        target_fd = target.fileno()

        # Keep duplicates of the real descriptors so they can be restored
        stdout_backup = os.dup(stdout_fd)
        stderr_backup = os.dup(stderr_fd)

        try:
            # we call os.setsid to move process to a new process group
            # this is done to avoid receiving KeyboardInterrupts (see #149)
            # in Python 3 we could just pass start_new_session=True
            tee_cmd = ['tee', '-a', target.name]
            tee_stdout = subprocess.Popen(
                tee_cmd, preexec_fn=os.setsid,
                stdin=subprocess.PIPE, stdout=1)
            tee_stderr = subprocess.Popen(
                tee_cmd, preexec_fn=os.setsid,
                stdin=subprocess.PIPE, stdout=2)
        except (FileNotFoundError, OSError, AttributeError):
            # No tee found in this operating system. Trying to use a python
            # implementation of tee. However this is slow and error-prone.
            pytee_cmd = [sys.executable, "-m", "sacred.pytee"]
            tee_stdout = subprocess.Popen(
                pytee_cmd, stdin=subprocess.PIPE, stderr=target_fd)
            tee_stderr = subprocess.Popen(
                pytee_cmd, stdin=subprocess.PIPE, stdout=target_fd)

        # empty all buffers before the descriptors change their target
        flush()
        os.dup2(tee_stdout.stdin.fileno(), stdout_fd)
        os.dup2(tee_stderr.stdin.fileno(), stderr_fd)
        out = CapturedStdout(target)

        try:
            yield out  # let the caller do their printing
        finally:
            flush()

            # close our handles on the pipes feeding the tee processes
            for tee in (tee_stdout, tee_stderr):
                tee.stdin.close()

            # point descriptors 1 and 2 back at the saved originals
            for backup_fd, std_fd in ((stdout_backup, stdout_fd),
                                      (stderr_backup, stderr_fd)):
                os.dup2(backup_fd, std_fd)

            # wait for completion of the tee processes with timeout
            # implemented using a timer because timeout support is py3 only
            def kill_tees():
                for tee in (tee_stdout, tee_stderr):
                    tee.kill()

            tee_timer = Timer(1, kill_tees)
            try:
                tee_timer.start()
                for tee in (tee_stdout, tee_stderr):
                    tee.wait()
            finally:
                tee_timer.cancel()

            for backup_fd in (stdout_backup, stderr_backup):
                os.close(backup_fd)
            out.finalize()
180