Completed
Push — master ( fa9d02...0de63f )
by Ionel Cristian
49s
created

handle_connection_exec()   A

Complexity

Conditions 2

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 10
rs 9.4285
1
from __future__ import print_function
2
3
import atexit
4
import code
5
import errno
6
import os
7
import signal
8
import socket
9
import struct
10
import sys
11
import traceback
12
from contextlib import closing
13
14
__version__ = '1.3.0'

# ``signalfd`` is optional; the name is bound to None when it cannot be imported.
try:
    import signalfd
except ImportError:
    signalfd = None

# Base type for text: ``basestring`` where that builtin exists, ``str`` otherwise.
try:
    string = basestring
except NameError:  # python 3
    string = str

# Make ``InterruptedError`` usable in ``except`` clauses even where the builtin is missing.
try:
    InterruptedError = InterruptedError
except NameError:  # python <= 3.2
    InterruptedError = OSError

# Interpreter thread-switching knob: use the "switch interval" API when the
# interpreter has it, otherwise the older "check interval" one.
if not hasattr(sys, 'setswitchinterval'):
    setinterval = sys.setcheckinterval
    getinterval = sys.getcheckinterval
else:
    setinterval = sys.setswitchinterval
    getinterval = sys.getswitchinterval
34
35
# Choose how module attributes are looked up: through eventlet's
# ``patcher.original`` or gevent's ``monkey.get_original`` when one of them is
# importable (presumably so that monkey-patching does not affect the manhole),
# and through a plain ``__import__`` otherwise.
try:
    from eventlet.patcher import original as _original
except ImportError:
    try:
        from gevent.monkey import get_original as _get_original
    except ImportError:
        def _get_original(mod, name):
            return getattr(__import__(mod), name)
else:
    def _get_original(mod, name):
        return getattr(_original(mod), name)

# Primitives used by the manhole itself, resolved once at import time.
_ORIGINAL_SOCKET = _get_original('socket', 'socket')
_ORIGINAL_FROMFD = _get_original('socket', 'fromfd')
_ORIGINAL_FDOPEN = _get_original('os', 'fdopen')
try:
    _ORIGINAL_ALLOCATE_LOCK = _get_original('thread', 'allocate_lock')
except ImportError:  # python 3
    _ORIGINAL_ALLOCATE_LOCK = _get_original('_thread', 'allocate_lock')
_ORIGINAL_THREAD = _get_original('threading', 'Thread')
_ORIGINAL_EVENT = _get_original('threading', 'Event')
_ORIGINAL__ACTIVE = _get_original('threading', '_active')
_ORIGINAL_SLEEP = _get_original('time', 'sleep')

# Interpreter version flags.
PY3 = sys.version_info[0] == 3
PY26 = sys.version_info[:2] == (2, 6)
61
62
# Bind ``pthread_setname_np(ident, name)``. When ctypes, libpthread or the
# symbol itself is unavailable the function is a no-op that returns None.
try:
    import ctypes
    import ctypes.util

    libpthread_path = ctypes.util.find_library("pthread")
    if not libpthread_path:
        raise ImportError
    libpthread = ctypes.CDLL(libpthread_path)
    if not hasattr(libpthread, "pthread_setname_np"):
        raise ImportError
except ImportError:
    def pthread_setname_np(ident, name):
        return None
else:
    _pthread_setname_np = libpthread.pthread_setname_np
    _pthread_setname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
    _pthread_setname_np.restype = ctypes.c_int

    def pthread_setname_np(ident, name):
        # Only the first 15 characters of the name are passed on, UTF-8 encoded.
        return _pthread_setname_np(ident, name[:15].encode('utf8'))
78
79
# Socket option (level, name) that get_peercred() queries for the peer's credentials.
if sys.platform != 'darwin' and not sys.platform.startswith("freebsd"):
    _PEERCRED_LEVEL = socket.SOL_SOCKET
    # TODO: Is this missing on some platforms?
    _PEERCRED_OPTION = getattr(socket, 'SO_PEERCRED', 17)
else:
    _PEERCRED_LEVEL = getattr(socket, 'SOL_LOCAL', 0)
    _PEERCRED_OPTION = getattr(socket, 'LOCAL_PEERCRED', 1)

# Every ``signal.SIG*`` attribute whose name has no underscore.
_ALL_SIGNALS = tuple(
    getattr(signal, signame)
    for signame in dir(signal)
    if signame.startswith('SIG') and '_' not in signame
)

# These (_LOG and _MANHOLE) will hold instances after install
_MANHOLE = None
_LOCK = _ORIGINAL_ALLOCATE_LOCK()
93
94
95
def force_original_socket(sock):
    """
    Rebuild *sock* as an instance of ``_ORIGINAL_SOCKET`` and close the object that was passed in.

    When the original socket class has a ``_sock`` attribute, the inner socket object is taken out of the
    wrapper and re-wrapped. Otherwise the file descriptor is duplicated and a new socket is created around
    the duplicate.
    """
    with closing(sock):
        if not hasattr(_ORIGINAL_SOCKET, '_sock'):
            duplicate_fd = os.dup(sock.fileno())
            return _ORIGINAL_SOCKET(sock.family, sock.type, sock.proto, duplicate_fd)

        # Detach the inner socket first so that closing the wrapper does not touch it.
        inner = sock._sock
        sock._sock = None
        return _ORIGINAL_SOCKET(_sock=inner)
102
103
104
def get_peercred(sock):
    """Return the ``(pid, uid, gid)`` triple of the peer on the given *connected* socket."""
    layout = '3i'
    raw = sock.getsockopt(_PEERCRED_LEVEL, _PEERCRED_OPTION, struct.calcsize(layout))
    pid, uid, gid = struct.unpack(layout, raw)
    return pid, uid, gid
108
109
110
class AlreadyInstalled(Exception):
    """Raised by ``install()`` in strict mode when a manhole already exists."""
112
113
114
class NotInstalled(Exception):
    """Raised by the logger when it is used before a destination was configured."""
116
117
118
class ConfigurationConflict(Exception):
    """Raised when ``activate_on`` and ``oneshot_on`` are configured with the same signal."""
120
121
122
class SuspiciousClient(Exception):
    """Raised when the connecting peer's UID is neither root (0) nor the current effective UID."""
124
125
126
class ManholeThread(_ORIGINAL_THREAD):
    """
    Thread that runs the "Manhole" accept loop. It is a `daemon` thread, so it exits when the main thread
    exits.

    For every accepted connection a separate ``ManholeConnectionThread`` is started (non-daemon unless
    ``daemon_connection`` is set) so that the process won't exit while there's a connection to the manhole.

    Args:
        get_socket (callable): Returns the listening socket to accept connections on.
        sigmask (list of signal numbers): Signals to block in this thread.
        start_timeout (float): Seconds to wait for the thread to start. Emits a message if the thread is not running
            when calling ``start()``.
        connection_handler (callable): Called with the client socket by the connection thread.
        bind_delay (float): Seconds to delay socket binding. Default: `no delay`.
        daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``.
    """

    def __init__(self,
                 get_socket, sigmask, start_timeout, connection_handler,
                 bind_delay=None, daemon_connection=False):
        super(ManholeThread, self).__init__()
        self.daemon = True
        self.name = "Manhole"

        self.get_socket = get_socket
        self.connection_handler = connection_handler
        self.daemon_connection = daemon_connection
        self.sigmask = sigmask
        self.bind_delay = bind_delay

        # Set by run() as its very first action; start() waits on it for up to ``start_timeout`` seconds.
        # see: http://emptysqua.re/blog/dawn-of-the-thread/
        self.serious = _ORIGINAL_EVENT()
        self.start_timeout = start_timeout

        self.should_run = False

    def stop(self):
        """Ask the accept loop to finish; the flag is only checked between connections."""
        self.should_run = False

    def clone(self, **kwargs):
        """
        Build a new, unstarted thread with the same options. This is usually used on dead threads.

        Extra keyword arguments are forwarded to the constructor.
        """
        return ManholeThread(
            self.get_socket, self.sigmask, self.start_timeout,
            connection_handler=self.connection_handler,
            daemon_connection=self.daemon_connection,
            **kwargs
        )

    def start(self):
        """Start the thread and log a warning if it did not report in within ``start_timeout`` seconds."""
        self.should_run = True
        super(ManholeThread, self).start()
        reported_in = self.serious.wait(self.start_timeout)
        if not reported_in and not PY26:
            _LOG("WARNING: Waited %s seconds but Manhole thread didn't start yet :(" % self.start_timeout)

    def run(self):
        """
        Runs the manhole loop. Only accepts one connection at a time because:

        * This thread is a daemon thread (exits when main thread exits).
        * The connection needs exclusive access to stdin, stderr and stdout so it can redirect inputs and outputs.
        """
        self.serious.set()
        if signalfd and self.sigmask:
            signalfd.sigprocmask(signalfd.SIG_BLOCK, self.sigmask)
        pthread_setname_np(self.ident, self.name)

        if self.bind_delay:
            _LOG("Delaying UDS binding %s seconds ..." % self.bind_delay)
            _ORIGINAL_SLEEP(self.bind_delay)

        sock = self.get_socket()
        while self.should_run:
            _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid())
            try:
                client = ManholeConnectionThread(sock.accept()[0], self.connection_handler, self.daemon_connection)
                client.start()
                # Serve one client at a time: wait for it before accepting again.
                client.join()
            except (InterruptedError, socket.error) as exc:
                # An interrupted call just means "try again"; anything else is re-raised.
                if exc.errno == errno.EINTR:
                    continue
                raise
            finally:
                client = None
208
209
210
class ManholeConnectionThread(_ORIGINAL_THREAD):
    """
    Thread that serves a single manhole client. Unless ``daemon`` is true this is a normal (non-daemon)
    thread - it won't exit if the main thread exits.
    """

    def __init__(self, client, connection_handler, daemon=False):
        super(ManholeConnectionThread, self).__init__()
        self.name = "ManholeConnectionThread"
        self.daemon = daemon
        self.connection_handler = connection_handler
        self.client = force_original_socket(client)

    def run(self):
        """Check the peer's credentials, then hand the socket to the connection handler."""
        _LOG('Started ManholeConnectionThread thread. Checking credentials ...')
        pthread_setname_np(self.ident, "Manhole ----")
        peer_pid, _uid, _gid = self.check_credentials(self.client)
        pthread_setname_np(self.ident, "Manhole %s" % peer_pid)
        self.connection_handler(self.client)

    @staticmethod
    def check_credentials(client):
        """
        Verify who is on the other end of *client*.

        Returns:
            tuple: ``(pid, uid, gid)`` of the peer.

        Raises:
            SuspiciousClient: The peer's UID is neither 0 nor the current effective UID.
        """
        pid, uid, gid = get_peercred(client)

        euid = os.geteuid()
        client_name = "PID:%s UID:%s GID:%s" % (pid, uid, gid)
        if uid != 0 and uid != euid:
            raise SuspiciousClient("Can't accept client with %s. It doesn't match the current EUID:%s or ROOT." % (
                client_name, euid
            ))

        _LOG("Accepted connection %s from %s" % (client, client_name))
        return pid, uid, gid
246
247
248
def handle_connection_exec(client):
    """
    Alternate connection handler. No output redirection.

    Reads the connection line by line and executes every line with ``exec`` until end of file.
    The file object and the socket are both closed when the loop ends, normally or through an exception
    raised by the executed code.

    Args:
        client: Connected socket to read statements from.
    """
    client.settimeout(None)
    # Give the descriptor a single owner when the socket supports it. Otherwise the socket object and the
    # file object would both close the same descriptor.
    fd = client.detach() if hasattr(client, 'detach') else client.fileno()
    fh = os.fdopen(fd)
    try:
        # NOTE: this runs arbitrary code received over the socket. That is the purpose of this handler;
        # it must only ever be given connections that were already authorised.
        for payload in iter(fh.readline, ''):
            exec(payload)
    finally:
        try:
            fh.close()
        finally:
            client.close()
258
259
260
def handle_connection_repl(client):
    """
    Default connection handler: points the standard streams at *client*, runs the REPL, then restores them.

    ``sys.stdin``/``sys.__stdin__`` and ``sys.stdout``/``sys.__stdout__`` are always replaced;
    ``sys.stderr``/``sys.__stderr__`` only when the manhole is configured with ``redirect_stderr``.
    Exceptions are logged, not propagated.
    """
    client.settimeout(None)

    # # disable this till we have evidence that it's needed
    # client.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 0)
    # # Note: setting SO_RCVBUF on UDS has no effect, see: http://man7.org/linux/man-pages/man7/unix.7.html

    saved_streams = []
    old_interval = getinterval()
    patches = [('r', ('stdin', '__stdin__')), ('w', ('stdout', '__stdout__'))]
    if _MANHOLE.redirect_stderr:
        patches.append(('w', ('stderr', '__stderr__')))
    buffering = 1 if PY3 else 0
    try:
        try:
            client_fd = client.fileno()
            for mode, attr_names in patches:
                for attr in attr_names:
                    # Remember the current stream, then install one backed by the client socket.
                    saved_streams.append((attr, getattr(sys, attr)))
                    setattr(sys, attr, _ORIGINAL_FDOPEN(client_fd, mode, buffering))
            try:
                handle_repl(_MANHOLE.locals)
            except Exception as exc:
                _LOG("Failed with %r." % exc)
            _LOG("DONE.")
        finally:
            try:
                # Change the switch/check interval to something ridiculous. We don't want to have other thread try
                # to write to the redirected sys.__std*/sys.std* - it would fail horribly.
                setinterval(2147483647)

                client.close()  # close before it's too late. it may already be dead
                redirected = []  # keep the old file objects alive for a bit
                for attr, original_stream in saved_streams:
                    redirected.append(getattr(sys, attr))
                    setattr(sys, attr, original_stream)
                del saved_streams
                for stream in redirected:
                    try:
                        stream.close()
                    except IOError:
                        pass
                    del stream
                del redirected
            finally:
                setinterval(old_interval)
                _LOG("Cleaned up.")
    except Exception:
        _LOG("ManholeConnectionThread thread failed:")
        _LOG(traceback.format_exc())
312
313
314
class ManholeConsole(code.InteractiveConsole):
    """
    Interactive console whose output goes to ``sys.stderr`` when the manhole redirects stderr, and to
    ``sys.stdout`` otherwise. The stream is picked once, when the console is created.
    """

    def __init__(self, *args, **kw):
        code.InteractiveConsole.__init__(self, *args, **kw)
        self.file = sys.stderr if _MANHOLE.redirect_stderr else sys.stdout

    def write(self, data):
        """Write *data* to the stream chosen at construction time."""
        self.file.write(data)
324
325
326
def handle_repl(locals):
    """
    Dumps stacktraces and runs an interactive prompt (REPL).

    The prompt's namespace contains ``dump_stacktraces``, ``sys``, ``os``, ``socket`` and ``traceback``,
    plus everything from *locals* when it is non-empty (entries in *locals* win on name clashes).
    """
    dump_stacktraces()
    namespace = dict(
        dump_stacktraces=dump_stacktraces,
        sys=sys,
        os=os,
        socket=socket,
        traceback=traceback,
    )
    namespace.update(locals or {})
    ManholeConsole(namespace).interact()
341
342
343
class Logger(object):
    """
    Internal object used for logging.

    Initially this is not configured. Until a destination is set through ``configure()`` (done by
    ``manhole.install()``) calling this object raises ``NotInstalled``.
    """
    time = _get_original('time', 'time')
    enabled = True
    destination = None

    def configure(self, enabled, destination):
        """Set whether messages are emitted and where they go (a file descriptor number or a file-like object)."""
        self.enabled = enabled
        self.destination = destination

    def release(self):
        """Go back to the unconfigured state: enabled, with no destination."""
        self.enabled = True
        self.destination = None

    def __call__(self, message):
        """
        Fail-ignorant logging function.

        Does nothing when disabled. Raises ``NotInstalled`` when enabled without a destination. Any error
        while formatting or writing the message is swallowed on purpose.
        """
        if not self.enabled:
            return
        if self.destination is None:
            raise NotInstalled("Manhole is not installed!")

        try:
            full_message = "Manhole[%.4f]: %s\n" % (self.time(), message)

            if not isinstance(self.destination, int):
                self.destination.write(full_message)
            else:
                # Integer destinations are raw file descriptors.
                os.write(self.destination, full_message.encode('ascii', 'ignore'))
        except:  # pylint: disable=W0702
            pass
378
379
380
# Module-wide logger. While enabled and without a destination, calling it raises NotInstalled.
_LOG = Logger()
381
382
383
class Manhole(object):
    """
    Holds the manhole configuration and the pieces built from it: the listener thread, the unix domain
    socket, the signal handlers and the ``os.fork``/``os.forkpty`` patches.
    """
    # Manhole core configuration
    # These are initialized when manhole is installed.
    daemon_connection = False
    locals = None
    original_os_fork = None
    original_os_forkpty = None
    redirect_stderr = True
    reinstall_delay = 0.5
    should_restart = None
    sigmask = _ALL_SIGNALS
    socket_path = None
    start_timeout = 0.5
    connection_handler = None
    previous_signal_handlers = None
    _thread = None

    def configure(self,
                  patch_fork=True, activate_on=None, sigmask=_ALL_SIGNALS, oneshot_on=None, thread=True,
                  start_timeout=0.5, socket_path=None, reinstall_delay=0.5, locals=None, daemon_connection=False,
                  redirect_stderr=True, connection_handler=handle_connection_repl):
        """
        Store the options and activate the manhole accordingly.

        * Without ``oneshot_on`` and ``activate_on`` (and with ``thread`` true) the listener thread is started
          right away.
        * ``oneshot_on`` / ``activate_on`` (signal number, or name without the ``SIG`` prefix) install signal
          handlers instead; the replaced handlers are remembered so ``release()`` can restore them.
        * ``os.fork``/``os.forkpty`` are only patched when ``patch_fork`` is true and none of ``activate_on``,
          ``oneshot_on`` and ``socket_path`` is given.

        Raises:
            ConfigurationConflict: ``activate_on`` and ``oneshot_on`` are the same signal.
        """
        self.socket_path = socket_path
        self.reinstall_delay = reinstall_delay
        self.redirect_stderr = redirect_stderr
        self.locals = locals
        self.sigmask = sigmask
        self.daemon_connection = daemon_connection
        self.start_timeout = start_timeout
        self.previous_signal_handlers = {}
        self.connection_handler = connection_handler

        if oneshot_on is None and activate_on is None and thread:
            self.thread.start()
            self.should_restart = True

        if oneshot_on is not None:
            oneshot_on = getattr(signal, 'SIG' + oneshot_on) if isinstance(oneshot_on, string) else oneshot_on
            self.previous_signal_handlers.setdefault(oneshot_on, signal.signal(oneshot_on, self.handle_oneshot))

        if activate_on is not None:
            activate_on = getattr(signal, 'SIG' + activate_on) if isinstance(activate_on, string) else activate_on
            if activate_on == oneshot_on:
                raise ConfigurationConflict('You cannot do activation of the Manhole thread on the same signal '
                                            'that you want to do oneshot activation !')
            self.previous_signal_handlers.setdefault(activate_on, signal.signal(activate_on, self.activate_on_signal))

        atexit.register(self.remove_manhole_uds)
        if patch_fork:
            if activate_on is None and oneshot_on is None and socket_path is None:
                self.patch_os_fork_functions()
            else:
                if activate_on:
                    _LOG("Not patching os.fork and os.forkpty. Activation is done by signal %s" % activate_on)
                elif oneshot_on:
                    _LOG("Not patching os.fork and os.forkpty. Oneshot activation is done by signal %s" % oneshot_on)
                elif socket_path:
                    _LOG("Not patching os.fork and os.forkpty. Using user socket path %s" % socket_path)

    def release(self):
        """
        Undo ``configure()``: flag the listener thread to stop and forget it, remove the socket file, restore
        the fork functions and put back the signal handlers that were replaced.
        """
        if self._thread:
            self._thread.stop()
            self._thread = None
        self.remove_manhole_uds()
        self.restore_os_fork_functions()
        for sig, handler in self.previous_signal_handlers.items():
            signal.signal(sig, handler)
        self.previous_signal_handlers.clear()

    @property
    def thread(self):
        """The listener thread; created (not started) from the current configuration on first access."""
        if self._thread is None:
            self._thread = ManholeThread(
                self.get_socket, self.sigmask, self.start_timeout, self.connection_handler,
                daemon_connection=self.daemon_connection
            )
        return self._thread

    @thread.setter
    def thread(self, value):
        self._thread = value

    def get_socket(self):
        """Create the listening unix domain socket, removing any stale file at the same path first."""
        sock = _ORIGINAL_SOCKET(socket.AF_UNIX, socket.SOCK_STREAM)
        name = self.remove_manhole_uds()
        sock.bind(name)
        sock.listen(5)
        _LOG("Manhole UDS path: " + name)
        return sock

    def reinstall(self):
        """
        Reinstalls the manhole. Checks if the thread is running. If not, it replaces it with a clone (with
        ``reinstall_delay`` as bind delay) and starts the clone when the manhole was running a thread.
        """
        with _LOCK:
            # NOTE(review): if ``threading._active`` is keyed by thread ident, the membership test on the
            # thread object itself is never true and the thread is always cloned - confirm.
            if not (self.thread.is_alive() and self.thread in _ORIGINAL__ACTIVE):
                self.thread = self.thread.clone(bind_delay=self.reinstall_delay)
                if self.should_restart:
                    self.thread.start()

    def handle_oneshot(self, _signum=None, _frame=None):
        """
        Signal handler for ``oneshot_on``: binds the socket, serves exactly one credential-checked connection
        in the calling thread and removes the socket file afterwards.
        """
        try:
            sock = self.get_socket()
            _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid())
            client = force_original_socket(sock.accept()[0])
            ManholeConnectionThread.check_credentials(client)
            self.connection_handler(client)
        except:  # pylint: disable=W0702
            # we don't want to let any exception out, it might make the application misbehave
            _LOG("Manhole oneshot connection failed:")
            _LOG(traceback.format_exc())
        finally:
            self.remove_manhole_uds()

    def remove_manhole_uds(self):
        """Delete the socket file if it exists and return its path."""
        name = self.uds_name
        if os.path.exists(name):
            os.unlink(name)
        return name

    @property
    def uds_name(self):
        """Path of the unix domain socket: ``socket_path`` when set, else ``/tmp/manhole-<pid>``."""
        if self.socket_path is None:
            return "/tmp/manhole-%s" % os.getpid()
        return self.socket_path

    def patched_fork(self):
        """Fork a child process."""
        pid = self.original_os_fork()
        if not pid:
            # Only the child (pid == 0) reinstalls.
            _LOG('Fork detected. Reinstalling Manhole.')
            self.reinstall()
        return pid

    def patched_forkpty(self):
        """Fork a new process with a new pseudo-terminal as controlling tty."""
        pid, master_fd = self.original_os_forkpty()
        if not pid:
            # Only the child (pid == 0) reinstalls.
            _LOG('Fork detected. Reinstalling Manhole.')
            self.reinstall()
        return pid, master_fd

    def patch_os_fork_functions(self):
        """Replace ``os.fork`` and ``os.forkpty`` with the patched versions, remembering the originals."""
        self.original_os_fork, os.fork = os.fork, self.patched_fork
        self.original_os_forkpty, os.forkpty = os.forkpty, self.patched_forkpty
        # Report both patched functions (the message used to show ``os.fork`` twice).
        _LOG("Patched %s and %s." % (self.original_os_fork, self.original_os_forkpty))

    def restore_os_fork_functions(self):
        """Put back whichever of ``os.fork`` / ``os.forkpty`` was patched."""
        if self.original_os_fork:
            os.fork = self.original_os_fork
        if self.original_os_forkpty:
            os.forkpty = self.original_os_forkpty

    def activate_on_signal(self, _signum, _frame):
        """Signal handler for ``activate_on``: starts the listener thread."""
        self.thread.start()
537
538
539
def install(verbose=True,
            verbose_destination=sys.__stderr__.fileno() if hasattr(sys.__stderr__, 'fileno') else sys.__stderr__,
            strict=True,
            **kwargs):
    """
    Installs the manhole.

    Args:
        verbose (bool): Set it to ``False`` to squelch the logging.
        verbose_destination (file descriptor or handle): Destination for verbose messages. Default is the file
            descriptor of ``sys.__stderr__`` (or ``sys.__stderr__`` itself when it has no ``fileno``).
        strict (bool): When true (the default) a second ``install()`` raises ``AlreadyInstalled``. When false the
            existing manhole is released and configured again.
        patch_fork (bool): Set it to ``False`` if you don't want your ``os.fork`` and ``os.forkpty`` monkeypatched.
        activate_on (int or signal name): Set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you
            want the Manhole thread to start when this signal is sent. This is desirable in case you don't want the
            thread active all the time.
        oneshot_on (int or signal name): Set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you
            want the Manhole to listen for connection in the signal handler. This is desirable in case you don't want
            threads at all.
        thread (bool): Start the always-on ManholeThread. Default: ``True``. Has no effect when ``oneshot_on`` or
            ``activate_on`` are used.
        sigmask (list of ints or signal names): Will set the signal mask to the given list (using
            ``signalfd.sigprocmask``). No action is done if ``signalfd`` is not importable.
            **NOTE**: This is done so that the Manhole thread doesn't *steal* any signals; Normally that is fine cause
            Python will force all the signal handling to be run in the main thread but signalfd doesn't.
        socket_path (str): Use a specific path for the unix domain socket (instead of ``/tmp/manhole-<pid>``). This
            disables ``patch_fork`` as children cannot reuse the same path.
        reinstall_delay (float): Delay the unix domain socket creation *reinstall_delay* seconds. This
            alleviates cleanup failures when using fork+exec patterns.
        locals (dict): Names to add to manhole interactive shell locals.
        daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``.
        redirect_stderr (bool): Redirect output from stderr to manhole console. Default: ``True``.
        connection_handler (function): Function that implements the connection handler (warning: this is for advanced
            users). Default: ``manhole.handle_connection_repl``.

    Returns:
        Manhole: The configured manhole instance.

    Raises:
        AlreadyInstalled: A manhole already exists and ``strict`` is true.
    """
    # pylint: disable=W0603
    global _MANHOLE

    with _LOCK:
        if _MANHOLE is None:
            _MANHOLE = Manhole()
        elif strict:
            raise AlreadyInstalled("Manhole already installed!")
        else:
            # Non-strict reinstall: reset the logger and tear down the previous setup first.
            _LOG.release()
            _MANHOLE.release()

    _LOG.configure(verbose, verbose_destination)
    _MANHOLE.configure(**kwargs)  # Threads might be started here
    return _MANHOLE
589
590
591
def dump_stacktraces():
    """
    Prints the process id, thread id and stack of every thread.

    Output goes to ``sys.stderr`` when the manhole redirects stderr, and to ``sys.stdout`` otherwise.
    """
    report = []
    for thread_id, frame in sys._current_frames().items():  # pylint: disable=W0212
        header = "\n######### ProcessID=%s, ThreadID=%s #########" % (
            os.getpid(), thread_id
        )
        report.append(header)
        for filename, lineno, name, source in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if source:
                report.append("  %s" % (source.strip()))
    report.append("#############################################\n\n")

    target = sys.stderr if _MANHOLE.redirect_stderr else sys.stdout
    print('\n'.join(report), file=target)
607