Completed
Push — master ( 81e6f9...f36c2e )
by Ionel Cristian
37s
created

handle_connection_exec()   B

Complexity

Conditions 5

Size

Total Lines 12

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 5
c 2
b 0
f 0
dl 0
loc 12
rs 8.5454
1
from __future__ import print_function
2
3
import atexit
4
import code
5
import errno
6
import os
7
import signal
8
import socket
9
import struct
10
import sys
11
import traceback
12
from contextlib import closing
13
14
__version__ = '1.4.0'
15
16
# ``signalfd`` is optional; when it is missing the name is bound to ``None``.
try:
    import signalfd
except ImportError:
    signalfd = None

# ``basestring`` is not defined on Python 3, where ``str`` is used instead.
try:
    string = basestring
except NameError:  # python 3
    string = str

# Interpreters without ``InterruptedError`` fall back to ``OSError``.
try:
    InterruptedError = InterruptedError
except NameError:  # python <= 3.2
    InterruptedError = OSError

# Use the "switch interval" API when the interpreter has it, otherwise the
# older "check interval" one.
if not hasattr(sys, 'setswitchinterval'):
    setinterval = sys.setcheckinterval
    getinterval = sys.getcheckinterval
else:
    setinterval = sys.setswitchinterval
    getinterval = sys.getswitchinterval
34
35
# Pick a way to look up attributes of un-monkeypatched modules: eventlet's
# ``original`` if available, then gevent's ``get_original``, and finally a
# plain ``__import__`` based lookup.
try:
    from eventlet.patcher import original as _original
except ImportError:
    try:
        from gevent.monkey import get_original as _get_original
    except ImportError:
        def _get_original(mod, name):
            """Return attribute *name* of the module named *mod*."""
            return getattr(__import__(mod), name)
else:
    def _get_original(mod, name):
        """Return attribute *name* of eventlet's original copy of module *mod*."""
        return getattr(_original(mod), name)
46
47
# References obtained through ``_get_original`` so that the manhole keeps using
# them even if these modules get monkeypatched.

# socket
_ORIGINAL_SOCKET = _get_original('socket', 'socket')
_ORIGINAL_FROMFD = _get_original('socket', 'fromfd')

# os
_ORIGINAL_FDOPEN = _get_original('os', 'fdopen')
_ORIGINAL_DUP = _get_original('os', 'dup')
_ORIGINAL_DUP2 = _get_original('os', 'dup2')

# low-level lock factory: the module is called ``thread`` on Python 2 and
# ``_thread`` on Python 3
try:
    _ORIGINAL_ALLOCATE_LOCK = _get_original('thread', 'allocate_lock')
except ImportError:  # python 3
    _ORIGINAL_ALLOCATE_LOCK = _get_original('_thread', 'allocate_lock')

# threading
_ORIGINAL_THREAD = _get_original('threading', 'Thread')
_ORIGINAL_EVENT = _get_original('threading', 'Event')
_ORIGINAL__ACTIVE = _get_original('threading', '_active')

# time
_ORIGINAL_SLEEP = _get_original('time', 'sleep')

# Interpreter version flags.
PY3 = sys.version_info[0] == 3
PY26 = (2, 6) == sys.version_info[:2]
63
64
# Bind ``pthread_setname_np`` from libpthread through ctypes when possible;
# otherwise fall back to a no-op so callers never need to care.
try:
    import ctypes
    import ctypes.util

    libpthread_path = ctypes.util.find_library("pthread")
    if not libpthread_path:
        raise ImportError
    # NOTE: CDLL raises OSError (not ImportError) when the library can't be loaded.
    libpthread = ctypes.CDLL(libpthread_path)
    if not hasattr(libpthread, "pthread_setname_np"):
        raise ImportError
    _pthread_setname_np = libpthread.pthread_setname_np
    _pthread_setname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
    _pthread_setname_np.restype = ctypes.c_int

    def pthread_setname_np(ident, name):
        """
        Set the OS-level name of the thread identified by *ident*.

        Args:
            ident: Thread identifier, as found in ``Thread.ident``.
            name (str): Desired name. It is UTF-8 encoded and cut to 15 *bytes* (presumably the platform limit
                minus the terminating NUL - see the ``pthread_setname_np`` man page).

        The return code of the C call is ignored: naming threads is best-effort.
        """
        # Truncate after encoding so that multi-byte characters can't push the result over the limit.
        _pthread_setname_np(ident, name.encode('utf8')[:15])
except (ImportError, OSError):
    def pthread_setname_np(ident, name):
        """No-op fallback used when ``pthread_setname_np`` is not available."""
        pass
83
84
# Socket option (and level) used by ``get_peercred`` to query the credentials
# of the peer of a unix domain socket.
if sys.platform != 'darwin' and not sys.platform.startswith("freebsd"):
    _PEERCRED_LEVEL = socket.SOL_SOCKET
    # TODO: Is this missing on some platforms?
    _PEERCRED_OPTION = getattr(socket, 'SO_PEERCRED', 17)
else:
    _PEERCRED_LEVEL = getattr(socket, 'SOL_LOCAL', 0)
    _PEERCRED_OPTION = getattr(socket, 'LOCAL_PEERCRED', 1)

# Every ``SIG*`` constant of the ``signal`` module whose name has no
# underscore (this leaves out names such as ``SIG_DFL`` and ``SIG_IGN``).
_ALL_SIGNALS = tuple(
    getattr(signal, signame)
    for signame in dir(signal)
    if signame[:3] == 'SIG' and signame.find('_') < 0
)
94
95
# These (_LOG and _MANHOLE) will hold instances after install
96
_MANHOLE = None
97
_LOCK = _ORIGINAL_ALLOCATE_LOCK()
98
99
100
def force_original_socket(sock):
    """
    Rewrap *sock* into an instance of the original (unpatched) socket class.

    The given *sock* object is always closed before returning; the returned socket takes over the underlying
    connection (via ``detach()`` when available, via the private ``_sock`` attribute otherwise).
    """
    try:
        if not hasattr(sock, 'detach'):
            # No ``detach()``: the original socket class is expected to accept a ``_sock`` argument.
            assert hasattr(_ORIGINAL_SOCKET, '_sock')
            return _ORIGINAL_SOCKET(_sock=sock._sock)
        return _ORIGINAL_SOCKET(sock.family, sock.type, sock.proto, sock.detach())
    finally:
        sock.close()
107
108
109
def get_peercred(sock):
    """Gets the (pid, uid, gid) for the client on the given *connected* socket."""
    layout = '3i'  # three native ints: pid, uid, gid
    raw = sock.getsockopt(_PEERCRED_LEVEL, _PEERCRED_OPTION, struct.calcsize(layout))
    return struct.unpack(layout, raw)
113
114
115
class AlreadyInstalled(Exception):
    """Raised by ``install()`` in strict mode when a manhole is already installed."""


class NotInstalled(Exception):
    """Raised when the logger is used before the manhole has been installed."""


class ConfigurationConflict(Exception):
    """Raised when ``activate_on`` and ``oneshot_on`` are set to the same signal."""


class SuspiciousClient(Exception):
    """Raised when the connecting client's UID is neither root nor the current effective UID."""
129
130
131
class ManholeThread(_ORIGINAL_THREAD):
    """
    Thread that runs the infamous "Manhole". This is a `daemon` thread - it will exit when the main thread exits.

    For every accepted client a separate ``ManholeConnectionThread`` is started (non-daemon by default), so that
    the process won't exit while there's a connection to the manhole.

    Args:
        get_socket (callable): Called without arguments from ``run()``; returns the listening socket.
        sigmask (list of signal numbers): Signals to block in this thread.
        start_timeout (float): Seconds to wait for the thread to start. Emits a message if the thread is not running
            when calling ``start()``.
        connection_handler (callable): Passed on to every ``ManholeConnectionThread``.
        bind_delay (float): Seconds to delay socket binding. Default: `no delay`.
        daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``.
    """

    def __init__(self,
                 get_socket, sigmask, start_timeout, connection_handler,
                 bind_delay=None, daemon_connection=False):
        super(ManholeThread, self).__init__()
        # thread identity
        self.name = "Manhole"
        self.daemon = True
        # collaborators
        self.get_socket = get_socket
        self.connection_handler = connection_handler
        # options
        self.sigmask = sigmask
        self.bind_delay = bind_delay
        self.daemon_connection = daemon_connection
        # time to wait for the manhole to get serious (to have a complete start)
        # see: http://emptysqua.re/blog/dawn-of-the-thread/
        self.start_timeout = start_timeout
        self.serious = _ORIGINAL_EVENT()
        # loop flag, flipped by start() / stop()
        self.should_run = False

    def stop(self):
        """Ask the accept loop to finish (the flag is checked before each ``accept()``)."""
        self.should_run = False

    def clone(self, **kwargs):
        """
        Make a fresh thread with the same options. This is usually used on dead threads.

        Note that ``bind_delay`` is not carried over; pass it in *kwargs* if needed.
        """
        return ManholeThread(
            get_socket=self.get_socket,
            sigmask=self.sigmask,
            start_timeout=self.start_timeout,
            connection_handler=self.connection_handler,
            daemon_connection=self.daemon_connection,
            **kwargs
        )

    def start(self):
        """Start the thread and wait up to ``start_timeout`` seconds for ``run()`` to begin."""
        self.should_run = True
        super(ManholeThread, self).start()
        came_up = self.serious.wait(self.start_timeout)
        # The result of ``wait()`` is not used for the warning on Python 2.6.
        if not (came_up or PY26):
            _LOG("WARNING: Waited %s seconds but Manhole thread didn't start yet :(" % self.start_timeout)

    def run(self):
        """
        Runs the manhole loop. Only accepts one connection at a time because:

        * This thread is a daemon thread (exits when main thread exits).
        * The connection needs exclusive access to stdin, stderr and stdout so it can redirect inputs and outputs.
        """
        self.serious.set()
        if signalfd and self.sigmask:
            signalfd.sigprocmask(signalfd.SIG_BLOCK, self.sigmask)
        pthread_setname_np(self.ident, self.name)

        if self.bind_delay:
            _LOG("Delaying UDS binding %s seconds ..." % self.bind_delay)
            _ORIGINAL_SLEEP(self.bind_delay)

        listener = self.get_socket()
        while self.should_run:
            _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid())
            try:
                connection, _ = listener.accept()
                worker = ManholeConnectionThread(connection, self.connection_handler, self.daemon_connection)
                worker.start()
                # Serve one client at a time: wait for it to finish before accepting again.
                worker.join()
            except (InterruptedError, socket.error) as exc:
                if exc.errno == errno.EINTR:
                    # Interrupted system call - just go back to accepting.
                    continue
                raise
            finally:
                # Don't keep the connection thread referenced while waiting for the next client.
                worker = None
213
214
215
class ManholeConnectionThread(_ORIGINAL_THREAD):
    """
    Thread that serves a single manhole client. By default this is a normal (non-daemon) thread - it won't exit
    when the main thread exits.

    Args:
        client: The accepted client socket; it is rewrapped with ``force_original_socket``.
        connection_handler (callable): Called with the client socket once the credentials were checked.
        daemon (bool): Whether the thread is daemonic. Default: ``False``.
    """

    def __init__(self, client, connection_handler, daemon=False):
        super(ManholeConnectionThread, self).__init__()
        self.name = "ManholeConnectionThread"
        self.daemon = daemon
        self.connection_handler = connection_handler
        self.client = force_original_socket(client)

    def run(self):
        """Check the peer's credentials, then hand the socket to the connection handler."""
        _LOG('Started ManholeConnectionThread thread. Checking credentials ...')
        pthread_setname_np(self.ident, "Manhole -------")
        peer_pid = check_credentials(self.client)[0]
        pthread_setname_np(self.ident, "Manhole < PID:%s" % peer_pid)
        try:
            self.connection_handler(self.client)
        except BaseException as failure:
            # Whatever the handler raises is only logged.
            _LOG("ManholeConnectionThread failure: %r" % failure)
237
238
239
def check_credentials(client):
    """
    Checks credentials for given socket.

    Returns:
        The ``(pid, uid, gid)`` of the peer.

    Raises:
        SuspiciousClient: If the peer's UID is neither ``0`` nor the current effective UID.
    """
    peer_pid, peer_uid, peer_gid = get_peercred(client)
    own_euid = os.geteuid()
    client_name = "PID:%s UID:%s GID:%s" % (peer_pid, peer_uid, peer_gid)

    if peer_uid != 0 and peer_uid != own_euid:
        raise SuspiciousClient("Can't accept client with %s. It doesn't match the current EUID:%s or ROOT." % (
            client_name, own_euid
        ))

    _LOG("Accepted connection %s from %s" % (client, client_name))
    return peer_pid, peer_uid, peer_gid
254
255
256
def handle_connection_exec(client):
    """
    Alternate connection handler. No output redirection.

    Reads the connection line by line and runs every line with ``exec`` until the peer closes the connection.
    """
    client.settimeout(None)
    if hasattr(client, 'detach'):
        fd = client.detach()
    else:
        fd = client.fileno()
    fh = os.fdopen(fd)
    with closing(client):
        with closing(fh):
            # NOTE(security): this runs whatever the peer sends. The callers in this module check the peer's
            # credentials before invoking the handler.
            for payload in iter(fh.readline, ''):
                exec(payload)
268
269
270
def handle_connection_repl(client):
    """
    Handles connection: points ``sys.std*`` / ``sys.__std*__`` at the client socket, runs the REPL and then puts
    the previous streams back.
    """
    client.settimeout(None)
    # Setting SO_SNDBUF to 0 on the client is left disabled till there is evidence that it's needed.
    # Note: setting SO_RCVBUF on UDS has no effect, see: http://man7.org/linux/man-pages/man7/unix.7.html

    saved_streams = []
    previous_interval = getinterval()
    redirections = [('r', ('stdin', '__stdin__')), ('w', ('stdout', '__stdout__'))]
    if _MANHOLE.redirect_stderr:
        redirections.append(('w', ('stderr', '__stderr__')))
    try:
        socket_fd = client.fileno()
        for mode, attributes in redirections:
            for attribute in attributes:
                # Remember the current stream, then swap in a file object over the client's descriptor.
                saved_streams.append((attribute, getattr(sys, attribute)))
                setattr(sys, attribute, _ORIGINAL_FDOPEN(socket_fd, mode, 1 if PY3 else 0))
        try:
            handle_repl(_MANHOLE.locals)
        except Exception as exc:
            _LOG("REPL failed with %r." % exc)
        _LOG("DONE.")
    finally:
        try:
            # Change the switch/check interval to something ridiculous. We don't want to have other thread try
            # to write to the redirected sys.__std*/sys.std* - it would fail horribly.
            setinterval(2147483647)
            try:
                client.close()  # close before it's too late. it may already be dead
            except IOError:
                pass
            leftovers = []  # keep the old file objects alive for a bit
            for attribute, stream in saved_streams:
                leftovers.append(getattr(sys, attribute))
                setattr(sys, attribute, stream)
            del saved_streams
            for stream in leftovers:
                try:
                    if hasattr(stream, 'detach'):
                        stream.detach()
                    else:
                        stream.close()
                except IOError:
                    pass
                del stream
            del leftovers
        finally:
            setinterval(previous_interval)
            _LOG("Cleaned up.")
322
323
# String aliases accepted for the ``connection_handler`` option.
_CONNECTION_HANDLER_ALIASES = {
    'exec': handle_connection_exec,
    'repl': handle_connection_repl,
}
327
328
329
class ManholeConsole(code.InteractiveConsole):
    """
    Interactive console whose ``write()`` goes to ``sys.stderr`` when stderr redirection is enabled and to
    ``sys.stdout`` otherwise. The stream is picked once, when the console is created.
    """

    def __init__(self, *args, **kw):
        code.InteractiveConsole.__init__(self, *args, **kw)
        self.file = sys.stderr if _MANHOLE.redirect_stderr else sys.stdout

    def write(self, data):
        """Write *data* to the stream selected at construction time."""
        self.file.write(data)
339
340
341
def handle_repl(locals):
    """
    Dumps stacktraces and runs an interactive prompt (REPL).

    Args:
        locals (dict): Extra names for the console namespace. They override the default ones
            (``dump_stacktraces``, ``sys``, ``os``, ``socket`` and ``traceback``).
    """
    dump_stacktraces()
    namespace = dict(
        dump_stacktraces=dump_stacktraces,
        sys=sys,
        os=os,
        socket=socket,
        traceback=traceback,
    )
    if locals:
        namespace.update(locals)
    console = ManholeConsole(namespace)
    console.interact()
356
357
358
class Logger(object):
    """
    Internal object used for logging.

    Initially this is not configured. Until you call ``manhole.install()`` this logger object won't work (will raise
    ``NotInstalled``).
    """
    # Original (unpatched) ``time.time``, used to timestamp the messages.
    time = _get_original('time', 'time')
    # Whether messages are emitted at all.
    enabled = True
    # Either a file descriptor (int) or an object with a ``write`` method; ``None`` means "not configured".
    destination = None

    def configure(self, enabled, destination):
        """Set whether logging is enabled and where the messages go."""
        self.enabled = enabled
        self.destination = destination

    def release(self):
        """Put the logger back in its initial, unconfigured state."""
        self.enabled = True
        self.destination = None

    def __call__(self, message):
        """
        Fail-ignorant logging function.

        Raises:
            NotInstalled: If logging is enabled but no destination was configured.
        """
        if not self.enabled:
            return
        if self.destination is None:
            raise NotInstalled("Manhole is not installed!")
        try:
            full_message = "Manhole[%s:%.4f]: %s\n" % (os.getpid(), self.time(), message)

            if not isinstance(self.destination, int):
                self.destination.write(full_message)
            else:
                # Integer destinations are raw file descriptors.
                os.write(self.destination, full_message.encode('ascii', 'ignore'))
        except:  # pylint: disable=W0702
            # Logging must never break the application.
            pass
393
394
395
_LOG = Logger()
396
397
398
class Manhole(object):
    """
    Holds the manhole configuration and implements installation, fork handling and signal activation.
    """
    # Manhole core configuration
    # These are initialized when manhole is installed.
    daemon_connection = False
    locals = None
    original_os_fork = None
    original_os_forkpty = None
    redirect_stderr = True
    reinstall_delay = 0.5
    should_restart = None
    sigmask = _ALL_SIGNALS
    socket_path = None
    start_timeout = 0.5
    connection_handler = None
    previous_signal_handlers = None
    _thread = None

    def configure(self,
                  patch_fork=True, activate_on=None, sigmask=_ALL_SIGNALS, oneshot_on=None, thread=True,
                  start_timeout=0.5, socket_path=None, reinstall_delay=0.5, locals=None, daemon_connection=False,
                  redirect_stderr=True, connection_handler=handle_connection_repl):
        """
        Store the options and activate the manhole accordingly (start the thread, install signal handlers,
        register the socket cleanup at exit and optionally patch ``os.fork`` / ``os.forkpty``).

        Raises:
            ConfigurationConflict: If ``activate_on`` and ``oneshot_on`` refer to the same signal. This is checked
                before any thread is started or signal handler is installed.
        """
        self.socket_path = socket_path
        self.reinstall_delay = reinstall_delay
        self.redirect_stderr = redirect_stderr
        self.locals = locals
        self.sigmask = sigmask
        self.daemon_connection = daemon_connection
        self.start_timeout = start_timeout
        self.previous_signal_handlers = {}
        self.connection_handler = _CONNECTION_HANDLER_ALIASES.get(connection_handler, connection_handler)

        # Resolve signal names (eg: "USR1" -> signal.SIGUSR1) and validate the combination *before* doing anything
        # with side effects, so that a conflicting configuration doesn't leave the oneshot handler installed.
        if isinstance(oneshot_on, string):
            oneshot_on = getattr(signal, 'SIG' + oneshot_on)
        if isinstance(activate_on, string):
            activate_on = getattr(signal, 'SIG' + activate_on)
        if activate_on is not None and activate_on == oneshot_on:
            raise ConfigurationConflict('You cannot do activation of the Manhole thread on the same signal '
                                        'that you want to do oneshot activation !')

        if oneshot_on is None and activate_on is None and thread:
            self.thread.start()
            self.should_restart = True

        # ``signal.signal`` returns the previous handler; it is remembered so ``release()`` can restore it.
        if oneshot_on is not None:
            self.previous_signal_handlers.setdefault(oneshot_on, signal.signal(oneshot_on, self.handle_oneshot))

        if activate_on is not None:
            self.previous_signal_handlers.setdefault(activate_on, signal.signal(activate_on, self.activate_on_signal))

        atexit.register(self.remove_manhole_uds)
        if patch_fork:
            if activate_on is None and oneshot_on is None and socket_path is None:
                self.patch_os_fork_functions()
            else:
                if activate_on:
                    _LOG("Not patching os.fork and os.forkpty. Activation is done by signal %s" % activate_on)
                elif oneshot_on:
                    _LOG("Not patching os.fork and os.forkpty. Oneshot activation is done by signal %s" % oneshot_on)
                elif socket_path:
                    _LOG("Not patching os.fork and os.forkpty. Using user socket path %s" % socket_path)

    def release(self):
        """Undo ``configure()``: stop the thread, remove the socket file, restore fork functions and signals."""
        if self._thread:
            self._thread.stop()
            self._thread = None
        self.remove_manhole_uds()
        self.restore_os_fork_functions()
        for sig, handler in self.previous_signal_handlers.items():
            signal.signal(sig, handler)
        self.previous_signal_handlers.clear()

    @property
    def thread(self):
        """The ``ManholeThread``; created (but not started) on first access."""
        if self._thread is None:
            self._thread = ManholeThread(
                self.get_socket, self.sigmask, self.start_timeout, self.connection_handler,
                daemon_connection=self.daemon_connection
            )
        return self._thread

    @thread.setter
    def thread(self, value):
        self._thread = value

    def get_socket(self):
        """Create, bind and return the listening unix domain socket (a stale socket file is removed first)."""
        sock = _ORIGINAL_SOCKET(socket.AF_UNIX, socket.SOCK_STREAM)
        name = self.remove_manhole_uds()
        sock.bind(name)
        sock.listen(5)
        _LOG("Manhole UDS path: " + name)
        return sock

    def reinstall(self):
        """
        Reinstalls the manhole. Checks if the thread is running. If not, it starts it again.
        """
        with _LOCK:
            # NOTE(review): ``threading._active`` appears to be keyed by thread ident, so this membership test on
            # the thread object itself may always be false (ie: the thread is always cloned) - confirm.
            if not (self.thread.is_alive() and self.thread in _ORIGINAL__ACTIVE):
                self.thread = self.thread.clone(bind_delay=self.reinstall_delay)
                if self.should_restart:
                    self.thread.start()

    def handle_oneshot(self, _signum=None, _frame=None):
        """Signal handler: accept and serve a single connection, right in the signal handler."""
        try:
            try:
                sock = self.get_socket()
                _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid())
                client = force_original_socket(sock.accept()[0])
                check_credentials(client)
                self.connection_handler(client)
            finally:
                self.remove_manhole_uds()
        except BaseException as exc:  # pylint: disable=W0702
            # we don't want to let any exception out, it might make the application misbehave
            _LOG("Oneshot failure: %r" % exc)

    def remove_manhole_uds(self):
        """Remove the socket file if it exists. Returns the socket path."""
        name = self.uds_name
        if os.path.exists(name):
            os.unlink(name)
        return name

    @property
    def uds_name(self):
        """Path of the unix domain socket: ``socket_path`` if set, else ``/tmp/manhole-<pid>``."""
        if self.socket_path is None:
            return "/tmp/manhole-%s" % os.getpid()
        return self.socket_path

    def patched_fork(self):
        """Fork a child process."""
        pid = self.original_os_fork()
        if not pid:
            # A falsy pid means we're in the child.
            _LOG('Fork detected. Reinstalling Manhole.')
            self.reinstall()
        return pid

    def patched_forkpty(self):
        """Fork a new process with a new pseudo-terminal as controlling tty."""
        pid, master_fd = self.original_os_forkpty()
        if not pid:
            # A falsy pid means we're in the child.
            _LOG('Fork detected. Reinstalling Manhole.')
            self.reinstall()
        return pid, master_fd

    def patch_os_fork_functions(self):
        """Replace ``os.fork`` and ``os.forkpty`` with the patched versions, remembering the originals."""
        self.original_os_fork, os.fork = os.fork, self.patched_fork
        self.original_os_forkpty, os.forkpty = os.forkpty, self.patched_forkpty
        _LOG("Patched %s and %s." % (self.original_os_fork, self.original_os_forkpty))

    def restore_os_fork_functions(self):
        """Put back the remembered ``os.fork`` / ``os.forkpty`` (if they were ever patched)."""
        if self.original_os_fork:
            os.fork = self.original_os_fork
        if self.original_os_forkpty:
            os.forkpty = self.original_os_forkpty

    def activate_on_signal(self, _signum, _frame):
        """
        Signal handler: start the manhole thread.

        Receiving the signal again must not blow up in the main thread: ``Thread.start()`` raises ``RuntimeError``
        when called twice on the same object, so a running thread is left alone and a dead one is replaced by a
        fresh clone.
        """
        thread = self.thread
        if thread.is_alive():
            _LOG("Manhole thread is already running. Ignoring activation signal %s." % _signum)
            return
        if thread.ident is not None:
            # Started before (it has an ident) but not alive anymore: thread objects can't be restarted.
            thread = self.thread = thread.clone()
        thread.start()
552
553
554
def install(verbose=True,
            verbose_destination=sys.__stderr__.fileno() if hasattr(sys.__stderr__, 'fileno') else sys.__stderr__,
            strict=True,
            **kwargs):
    """
    Installs the manhole.

    Args:
        verbose (bool): Set it to ``False`` to squelch the logging.
        verbose_destination (file descriptor or handle): Destination for verbose messages. Default is unbuffered stderr
            (stderr ``2`` file descriptor).
        strict (bool): If ``True`` (the default) raise ``AlreadyInstalled`` when a manhole is already installed.
            If ``False`` the existing manhole is released and configured again.
        patch_fork (bool): Set it to ``False`` if you don't want your ``os.fork`` and ``os.forkpty`` monkeypatched.
        activate_on (int or signal name): Set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you
            want the Manhole thread to start when this signal is sent. This is desirable in case you don't want the
            thread active all the time.
        oneshot_on (int or signal name): Set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you
            want the Manhole to listen for connection in the signal handler. This is desirable in case you don't want
            threads at all.
        thread (bool): Start the always-on ManholeThread. Default: ``True``. Automatically switched to ``False`` if
            ``oneshot_on`` or ``activate_on`` are used.
        sigmask (list of ints or signal names): Will set the signal mask to the given list (using
            ``signalfd.sigprocmask``). No action is done if ``signalfd`` is not importable.
            **NOTE**: This is done so that the Manhole thread doesn't *steal* any signals; Normally that is fine cause
            Python will force all the signal handling to be run in the main thread but signalfd doesn't.
        socket_path (str): Use a specific path for the unix domain socket (instead of ``/tmp/manhole-<pid>``). This
            disables ``patch_fork`` as children cannot reuse the same path.
        reinstall_delay (float): Delay the unix domain socket creation *reinstall_delay* seconds. This
            alleviates cleanup failures when using fork+exec patterns.
        locals (dict): Names to add to manhole interactive shell locals.
        daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``.
        redirect_stderr (bool): Redirect output from stderr to manhole console. Default: ``True``.
        connection_handler (function): Connection handler to use. Use ``"exec"`` for simple implementation without
            output redirection or your own function. (warning: this is for advanced users). Default: ``"repl"``.

    Returns:
        The configured ``Manhole`` instance.

    Raises:
        AlreadyInstalled: If a manhole is already installed and ``strict`` is true.
    """
    # pylint: disable=W0603
    global _MANHOLE

    with _LOCK:
        if _MANHOLE is not None:
            if strict:
                raise AlreadyInstalled("Manhole already installed!")
            # Non-strict mode: tear down the previous installation and reuse the instance.
            _LOG.release()
            _MANHOLE.release()  # Threads might be started here
        else:
            _MANHOLE = Manhole()

    _LOG.configure(verbose, verbose_destination)
    _MANHOLE.configure(**kwargs)  # Threads might be started here
    return _MANHOLE
604
605
606
def dump_stacktraces():
    """
    Dumps thread ids and tracebacks of all threads to ``sys.stderr`` when stderr redirection is enabled and to
    ``sys.stdout`` otherwise.
    """
    report = []
    for thread_id, frame in sys._current_frames().items():  # pylint: disable=W0212
        header = "\n######### ProcessID=%s, ThreadID=%s #########" % (os.getpid(), thread_id)
        report.append(header)
        for filename, lineno, name, line in traceback.extract_stack(frame):
            report.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            if not line:
                # No source text available for this frame.
                continue
            report.append("  %s" % (line.strip()))
    report.append("#############################################\n\n")

    text = '\n'.join(report)
    print(text, file=sys.stderr if _MANHOLE.redirect_stderr else sys.stdout)
622