Output   A
last analyzed

Complexity

Total Complexity 0

Size/Duplication

Total Lines 3
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import atexit
4
import errno
5
import logging
6
import os
7
import signal
8
import sys
9
import time
10
from functools import partial
11
12
from process_tests import setup_coverage
13
14
# Base delay (passed to ``time.sleep`` by the scenarios below); can be
# overridden through the MANHOLE_TEST_TIMEOUT environment variable.
TIMEOUT = int(os.environ.get('MANHOLE_TEST_TIMEOUT', 10))

# Fixed socket location handed to ``manhole.install(socket_path=...)``.
SOCKET_PATH = '/tmp/manhole-socket'

# The interpreter's original stdout, so status messages do not depend on
# whatever ``sys.stdout`` has been rebound to.
OUTPUT = sys.__stdout__
17
18
19
def handle_sigterm(signo, _frame):
    """Signal handler that reports termination and exits the interpreter.

    Prints ``Terminated`` to ``OUTPUT`` and then raises ``SystemExit`` with
    the status ``128 + signo``.

    :param signo: Number of the signal that was delivered.
    :param _frame: Current stack frame (unused).
    :raises SystemExit: Always.
    """
    # Simulate real termination
    print("Terminated", file=OUTPUT)
    sys.exit(128 + signo)
23
24
25
# Handling SIGTERM ensures that atexit functions are called, and we do not leave
26
# leftover /tmp/manhole-pid sockets.
27
signal.signal(signal.SIGTERM, handle_sigterm)
28
29
30
@atexit.register
def log_exit():
    """Print a marker line to ``OUTPUT`` when the interpreter runs atexit hooks."""
    print("In atexit handler.", file=OUTPUT)
33
34
35
def setup_greenthreads(patch_threads=False):
    """Apply gevent and eventlet monkey patching when those packages import.

    Each library is handled independently: if importing it fails with
    ``ImportError`` or ``SyntaxError`` it is silently skipped.

    :param patch_threads: Accepted for interface compatibility but currently
        ignored; both libraries are always patched with ``thread=False``.
        NOTE(review): confirm whether this flag was ever meant to be
        forwarded before wiring it up.
    :returns: ``None``.
    """
    try:
        from gevent import monkey
        monkey.patch_all(thread=False)
    except (ImportError, SyntaxError):
        pass

    try:
        import eventlet
        # Workaround for a circular import issue in eventlet,
        # see https://github.com/eventlet/eventlet/issues/401
        eventlet.hubs.get_hub()
        eventlet.monkey_patch(thread=False)
    except (ImportError, SyntaxError):
        pass
49
50
51
def do_fork():
    """Fork the process; the parent waits on the child, the child sleeps.

    Parent: registers an atexit hook that sends SIGINT to the child and,
    0.2 seconds later, SIGTERM, then blocks in ``os.waitpid`` until the
    child exits.

    Child: sleeps for ``TIMEOUT * 10`` seconds.

    :raises OSError: From the atexit hook, when signalling the child fails
        for a reason other than the child no longer existing (``ESRCH``).
    """
    pid = os.fork()
    if pid:
        @atexit.register
        def cleanup():
            try:
                os.kill(pid, signal.SIGINT)
                time.sleep(0.2)
                os.kill(pid, signal.SIGTERM)
            except OSError as e:
                # A child that is already gone is fine; anything else is not.
                if e.errno != errno.ESRCH:
                    raise

        os.waitpid(pid, 0)
    else:
        time.sleep(TIMEOUT * 10)
67
68
69
if __name__ == '__main__':
    # Script entry point. ``sys.argv[1]`` names the scenario to run; each
    # branch below installs manhole with the options that scenario needs and
    # then keeps the process alive (or performs its checks and prints a
    # marker such as ``SUCCESS``). Whatever happens, ``DIED.`` is printed to
    # OUTPUT as the last line.
    logging.basicConfig(
        level=logging.DEBUG,
        format='[pid=%(process)d - %(asctime)s]: %(name)s - %(levelname)s - %(message)s',
    )
    test_name = sys.argv[1]
    try:

        setup_coverage()

        # The PATCH_THREAD environment variable only selects whether manhole
        # is imported before or after the green-thread monkey patching.
        if os.getenv('PATCH_THREAD', False):
            import manhole

            setup_greenthreads(True)
        else:
            setup_greenthreads(True)
            import manhole

        if test_name == 'test_environ_variable_activation':
            time.sleep(TIMEOUT)
        elif test_name == 'test_install_twice_not_strict':
            manhole.install(oneshot_on='USR2')
            manhole.install(strict=False)
            time.sleep(TIMEOUT)
        elif test_name == 'test_log_fd':
            manhole.install(verbose=True, verbose_destination=2)
            manhole._LOG("whatever-1")
            manhole._LOG("whatever-2")
        elif test_name == 'test_log_fh':
            class Output(object):
                # Minimal file-like object: every ``write`` call appends the
                # written chunk to ``data``.
                data = []
                write = data.append

            manhole.install(verbose=True, verbose_destination=Output)
            manhole._LOG("whatever")
            if Output.data and "]: whatever" in Output.data[-1]:
                print("SUCCESS")
        elif test_name == 'test_activate_on_usr2':
            manhole.install(activate_on='USR2')
            for _ in range(TIMEOUT * 100):
                time.sleep(0.1)
        elif test_name == 'test_install_once':
            manhole.install()
            try:
                manhole.install()
            except manhole.AlreadyInstalled:
                print('ALREADY_INSTALLED')
            else:
                raise AssertionError("Did not raise AlreadyInstalled")
        elif test_name == 'test_stderr_doesnt_deadlock':
            import subprocess

            manhole.install()

            for i in range(50):
                print('running iteration', i)
                p = subprocess.Popen(['true'])
                print('waiting for process', p.pid)
                p.wait()
                print('process ended')
                # The short-lived subprocess must not leave a socket behind.
                path = '/tmp/manhole-%d' % p.pid
                if os.path.exists(path):
                    os.unlink(path)
                    raise AssertionError(path + ' exists !')
            print('SUCCESS')
        elif test_name == 'test_fork_exec':
            manhole.install(reinstall_delay=5)
            print("Installed.")
            time.sleep(0.2)
            pid = os.fork()
            print("Forked, pid =", pid)
            if pid:
                os.waitpid(pid, 0)
                # The exec-ed child must not leave a socket behind.
                path = '/tmp/manhole-%d' % pid
                if os.path.exists(path):
                    os.unlink(path)
                    raise AssertionError(path + ' exists !')
            else:
                try:
                    time.sleep(1)
                    print("Exec-ing `true`")
                    os.execvp('true', ['true'])
                finally:
                    # Only reached when the exec itself failed.
                    os._exit(1)
            print('SUCCESS')
        elif test_name == 'test_activate_on_with_oneshot_on':
            manhole.install(activate_on='USR2', oneshot_on='USR2')
            for _ in range(TIMEOUT * 100):
                time.sleep(0.1)
        elif test_name == 'test_interrupt_on_accept':
            def handle_usr2(_sig, _frame):
                print('Got USR2')

            signal.signal(signal.SIGUSR2, handle_usr2)

            import ctypes
            import ctypes.util

            libpthread_path = ctypes.util.find_library("pthread")
            if not libpthread_path:
                raise ImportError
            libpthread = ctypes.CDLL(libpthread_path)
            # Check for the symbol that is actually used below.
            if not hasattr(libpthread, "pthread_kill"):
                raise ImportError
            pthread_kill = libpthread.pthread_kill
            pthread_kill.argtypes = [ctypes.c_void_p, ctypes.c_int]
            pthread_kill.restype = ctypes.c_int
            manhole.install(sigmask=None)
            for _ in range(15):
                time.sleep(0.1)
            print("Sending signal to manhole thread ...")
            pthread_kill(manhole._MANHOLE.thread.ident, signal.SIGUSR2)
            for _ in range(TIMEOUT * 100):
                time.sleep(0.1)
        elif test_name == 'test_oneshot_on_usr2':
            manhole.install(oneshot_on='USR2')
            for _ in range(TIMEOUT * 100):
                time.sleep(0.1)
        elif test_name.startswith('test_signalfd_weirdness'):
            signalled = False

            @partial(signal.signal, signal.SIGUSR1)
            def signal_handler(sig, _):
                global signalled
                print('Received signal %s' % sig)
                signalled = True

            if 'negative' in test_name:
                manhole.install(sigmask=None)
            else:
                manhole.install(sigmask=[signal.SIGUSR1])

            time.sleep(0.3)  # give the manhole a bit enough time to start
            print('Starting ...')
            import signalfd

            signalfd.sigprocmask(signalfd.SIG_BLOCK, [signal.SIGUSR1])
            # ``sys.setcheckinterval`` no longer exists on Python 3.9+, so
            # only call it on interpreters that still provide it.
            set_check_interval = getattr(sys, 'setcheckinterval', None)
            if set_check_interval is not None:
                set_check_interval(1)
            for _ in range(100000):
                os.kill(os.getpid(), signal.SIGUSR1)
            print('signalled=%s' % signalled)
            time.sleep(TIMEOUT * 10)
        elif test_name == 'test_auth_fail':
            # Replace the credential lookup with one that always returns
            # (-1, -1, -1).
            manhole.get_peercred = lambda _: (-1, -1, -1)
            manhole.install()
            time.sleep(TIMEOUT * 10)
        elif test_name == 'test_socket_path':
            manhole.install(socket_path=SOCKET_PATH)
            time.sleep(TIMEOUT * 10)
        elif test_name == 'test_daemon_connection':
            manhole.install(daemon_connection=True)
            time.sleep(TIMEOUT)
        elif test_name == 'test_socket_path_with_fork':
            manhole.install(socket_path=SOCKET_PATH)
            time.sleep(TIMEOUT)
            do_fork()
        elif test_name == 'test_locals':
            manhole.install(socket_path=SOCKET_PATH,
                            locals={'k1': 'v1', 'k2': 'v2'})
            time.sleep(TIMEOUT)
        elif test_name == 'test_locals_after_fork':
            manhole.install(locals={'k1': 'v1', 'k2': 'v2'})
            do_fork()
        elif test_name == 'test_redirect_stderr_default':
            manhole.install(socket_path=SOCKET_PATH)
            time.sleep(TIMEOUT)
        elif test_name == 'test_redirect_stderr_disabled':
            manhole.install(socket_path=SOCKET_PATH, redirect_stderr=False)
            time.sleep(TIMEOUT)
        elif test_name == 'test_sigmask':
            manhole.install(socket_path=SOCKET_PATH, sigmask=[signal.SIGUSR1])
            time.sleep(TIMEOUT)
        elif test_name == 'test_connection_handler_exec_func':
            manhole.install(connection_handler=manhole.handle_connection_exec, locals={'tete': lambda: print('TETE')})
            time.sleep(TIMEOUT * 10)
        elif test_name == 'test_connection_handler_exec_str':
            manhole.install(connection_handler='exec', locals={'tete': lambda: print('TETE')})
            time.sleep(TIMEOUT * 10)
        else:
            manhole.install()
            time.sleep(0.3)  # give the manhole a bit enough time to start
            if test_name == 'test_simple':
                time.sleep(TIMEOUT * 10)
            elif test_name == 'test_with_forkpty':
                time.sleep(1)
                pid, masterfd = os.forkpty()
                if pid:
                    @atexit.register
                    def cleanup():
                        try:
                            os.kill(pid, signal.SIGINT)
                            time.sleep(0.2)
                            os.kill(pid, signal.SIGTERM)
                        except OSError as e:
                            # A child that is already gone is fine.
                            if e.errno != errno.ESRCH:
                                raise

                    # Relay the child's pty output to fd 2 until it exits.
                    while not os.waitpid(pid, os.WNOHANG)[0]:
                        try:
                            os.write(2, os.read(masterfd, 1024))
                        except OSError as e:
                            print("Error while reading from masterfd:", e)
                else:
                    time.sleep(TIMEOUT * 10)
            elif test_name == 'test_with_fork':
                time.sleep(1)
                do_fork()
            else:
                raise RuntimeError('Invalid test spec.')
    # Deliberately bare: SystemExit (raised by the SIGTERM handler through
    # ``sys.exit``) must be reported here as well.
    except:  # pylint: disable=W0702
        print('Died with %s.' % sys.exc_info()[0].__name__, file=OUTPUT)
        import traceback

        traceback.print_exc(file=OUTPUT)
    print('DIED.', file=OUTPUT)
284