Completed
Push — master ( 75ac14...81e6f9 )
by Ionel Cristian
01:23 queued 01:07
created

check_credentials()   A

Complexity

Conditions 2

Size

Total Lines 15

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 2
c 1
b 0
f 0
dl 0
loc 15
rs 9.4285
1
from __future__ import print_function
2
3
import atexit
4
import code
5
import errno
6
import os
7
import signal
8
import socket
9
import struct
10
import sys
11
import traceback
12
from contextlib import closing
13
14
__version__ = '1.4.0'
15
16
try:
17
    import signalfd
18
except ImportError:
19
    signalfd = None
20
try:
21
    string = basestring
22
except NameError:  # python 3
23
    string = str
24
try:
25
    InterruptedError = InterruptedError
26
except NameError:  # python <= 3.2
27
    InterruptedError = OSError
28
if hasattr(sys, 'setswitchinterval'):
29
    setinterval = sys.setswitchinterval
30
    getinterval = sys.getswitchinterval
31
else:
32
    setinterval = sys.setcheckinterval
33
    getinterval = sys.getcheckinterval
34
35
try:
36
    from eventlet.patcher import original as _original
37
38
    def _get_original(mod, name):
39
        return getattr(_original(mod), name)
40
except ImportError:
41
    try:
42
        from gevent.monkey import get_original as _get_original
43
    except ImportError:
44
        def _get_original(mod, name):
45
            return getattr(__import__(mod), name)
46
47
_ORIGINAL_SOCKET = _get_original('socket', 'socket')
48
_ORIGINAL_FROMFD = _get_original('socket', 'fromfd')
49
_ORIGINAL_FDOPEN = _get_original('os', 'fdopen')
50
_ORIGINAL_DUP = _get_original('os', 'dup')
51
_ORIGINAL_DUP2 = _get_original('os', 'dup2')
52
try:
53
    _ORIGINAL_ALLOCATE_LOCK = _get_original('thread', 'allocate_lock')
54
except ImportError:  # python 3
55
    _ORIGINAL_ALLOCATE_LOCK = _get_original('_thread', 'allocate_lock')
56
_ORIGINAL_THREAD = _get_original('threading', 'Thread')
57
_ORIGINAL_EVENT = _get_original('threading', 'Event')
58
_ORIGINAL__ACTIVE = _get_original('threading', '_active')
59
_ORIGINAL_SLEEP = _get_original('time', 'sleep')
60
61
PY3 = sys.version_info[0] == 3
62
PY26 = sys.version_info[:2] == (2, 6)
63
64
try:
65
    import ctypes
66
    import ctypes.util
67
68
    libpthread_path = ctypes.util.find_library("pthread")
69
    if not libpthread_path:
70
        raise ImportError
71
    libpthread = ctypes.CDLL(libpthread_path)
72
    if not hasattr(libpthread, "pthread_setname_np"):
73
        raise ImportError
74
    _pthread_setname_np = libpthread.pthread_setname_np
75
    _pthread_setname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
76
    _pthread_setname_np.restype = ctypes.c_int
77
78
    def pthread_setname_np(ident, name):
79
        _pthread_setname_np(ident, name[:15].encode('utf8'))
80
except ImportError:
81
    def pthread_setname_np(ident, name):
82
        pass
83
84
if sys.platform == 'darwin' or sys.platform.startswith("freebsd"):
85
    _PEERCRED_LEVEL = getattr(socket, 'SOL_LOCAL', 0)
86
    _PEERCRED_OPTION = getattr(socket, 'LOCAL_PEERCRED', 1)
87
else:
88
    _PEERCRED_LEVEL = socket.SOL_SOCKET
89
    # TODO: Is this missing on some platforms?
90
    _PEERCRED_OPTION = getattr(socket, 'SO_PEERCRED', 17)
91
92
_ALL_SIGNALS = tuple(getattr(signal, sig) for sig in dir(signal)
93
                     if sig.startswith('SIG') and '_' not in sig)
94
95
# These (_LOG and _MANHOLE) will hold instances after install
96
_MANHOLE = None
97
_LOCK = _ORIGINAL_ALLOCATE_LOCK()
98
99
100
def force_original_socket(sock):
101
    with closing(sock):
102
        if hasattr(sock, 'detach'):
103
            return _ORIGINAL_SOCKET(sock.family, sock.type, sock.proto, sock.detach())
104
        else:
105
            assert hasattr(_ORIGINAL_SOCKET, '_sock')
106
            return _ORIGINAL_SOCKET(_sock=sock._sock)
107
108
109
def get_peercred(sock):
110
    """Gets the (pid, uid, gid) for the client on the given *connected* socket."""
111
    buf = sock.getsockopt(_PEERCRED_LEVEL, _PEERCRED_OPTION, struct.calcsize('3i'))
112
    return struct.unpack('3i', buf)
113
114
115
class AlreadyInstalled(Exception):
116
    pass
117
118
119
class NotInstalled(Exception):
120
    pass
121
122
123
class ConfigurationConflict(Exception):
124
    pass
125
126
127
class SuspiciousClient(Exception):
128
    pass
129
130
131
class ManholeThread(_ORIGINAL_THREAD):
132
    """
133
    Thread that runs the infamous "Manhole". This thread is a `daemon` thread - it will exit if the main thread
134
    exits.
135
136
    On connect, a different, non-daemon thread will be started - so that the process won't exit while there's a
137
    connection to the manole.
138
139
    Args:
140
        sigmask (list of singal numbers): Signals to block in this thread.
141
        start_timeout (float): Seconds to wait for the thread to start. Emits a message if the thread is not running
142
            when calling ``start()``.
143
        bind_delay (float): Seconds to delay socket binding. Default: `no delay`.
144
        daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``.
145
    """
146
147
    def __init__(self,
148
                 get_socket, sigmask, start_timeout, connection_handler,
149
                 bind_delay=None, daemon_connection=False):
150
        super(ManholeThread, self).__init__()
151
        self.daemon = True
152
        self.daemon_connection = daemon_connection
153
        self.name = "Manhole"
154
        self.sigmask = sigmask
155
        self.serious = _ORIGINAL_EVENT()
156
        # time to wait for the manhole to get serious (to have a complete start)
157
        # see: http://emptysqua.re/blog/dawn-of-the-thread/
158
        self.start_timeout = start_timeout
159
        self.bind_delay = bind_delay
160
        self.connection_handler = connection_handler
161
        self.get_socket = get_socket
162
        self.should_run = False
163
164
    def stop(self):
165
        self.should_run = False
166
167
    def clone(self, **kwargs):
168
        """
169
        Make a fresh thread with the same options. This is usually used on dead threads.
170
        """
171
        return ManholeThread(
172
            self.get_socket, self.sigmask, self.start_timeout,
173
            connection_handler=self.connection_handler,
174
            daemon_connection=self.daemon_connection,
175
            **kwargs
176
        )
177
178
    def start(self):
179
        self.should_run = True
180
        super(ManholeThread, self).start()
181
        if not self.serious.wait(self.start_timeout) and not PY26:
182
            _LOG("WARNING: Waited %s seconds but Manhole thread didn't start yet :(" % self.start_timeout)
183
184
    def run(self):
185
        """
186
        Runs the manhole loop. Only accepts one connection at a time because:
187
188
        * This thread is a daemon thread (exits when main thread exists).
189
        * The connection need exclusive access to stdin, stderr and stdout so it can redirect inputs and outputs.
190
        """
191
        self.serious.set()
192
        if signalfd and self.sigmask:
193
            signalfd.sigprocmask(signalfd.SIG_BLOCK, self.sigmask)
194
        pthread_setname_np(self.ident, self.name)
195
196
        if self.bind_delay:
197
            _LOG("Delaying UDS binding %s seconds ..." % self.bind_delay)
198
            _ORIGINAL_SLEEP(self.bind_delay)
199
200
        sock = self.get_socket()
201
        while self.should_run:
202
            _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid())
203
            try:
204
                client = ManholeConnectionThread(sock.accept()[0], self.connection_handler, self.daemon_connection)
205
                client.start()
206
                client.join()
207
            except (InterruptedError, socket.error) as e:
208
                if e.errno != errno.EINTR:
209
                    raise
210
                continue
211
            finally:
212
                client = None
213
214
215
class ManholeConnectionThread(_ORIGINAL_THREAD):
216
    """
217
    Manhole thread that handles the connection. This thread is a normal thread (non-daemon) - it won't exit if the
218
    main thread exits.
219
    """
220
221
    def __init__(self, client, connection_handler, daemon=False):
222
        super(ManholeConnectionThread, self).__init__()
223
        self.daemon = daemon
224
        self.client = force_original_socket(client)
225
        self.connection_handler = connection_handler
226
        self.name = "ManholeConnectionThread"
227
228
    def run(self):
229
        _LOG('Started ManholeConnectionThread thread. Checking credentials ...')
230
        pthread_setname_np(self.ident, "Manhole -------")
231
        pid, _, _ = check_credentials(self.client)
232
        pthread_setname_np(self.ident, "Manhole < PID:%s" % pid)
233
        try:
234
            self.connection_handler(self.client)
235
        except BaseException as exc:
236
            _LOG("ManholeConnectionThread failure: %r" % exc)
237
238
239
def check_credentials(client):
240
    """
241
    Checks credentials for given socket.
242
    """
243
    pid, uid, gid = get_peercred(client)
244
245
    euid = os.geteuid()
246
    client_name = "PID:%s UID:%s GID:%s" % (pid, uid, gid)
247
    if uid not in (0, euid):
248
        raise SuspiciousClient("Can't accept client with %s. It doesn't match the current EUID:%s or ROOT." % (
249
            client_name, euid
250
        ))
251
252
    _LOG("Accepted connection %s from %s" % (client, client_name))
253
    return pid, uid, gid
254
255
256
def handle_connection_exec(client):
257
    """
258
    Alternate connection handler. No output redirection.
259
    """
260
    client.settimeout(None)
261
    fh = os.fdopen(client.fileno())
262
    payload = fh.readline()
263
    while payload:
264
        exec(payload)
265
        payload = fh.readline()
266
267
268
def handle_connection_repl(client):
269
    """
270
    Handles connection.
271
    """
272
    client.settimeout(None)
273
    # # disable this till we have evidence that it's needed
274
    # client.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 0)
275
    # # Note: setting SO_RCVBUF on UDS has no effect, see: http://man7.org/linux/man-pages/man7/unix.7.html
276
277
    backup = []
278
    old_interval = getinterval()
279
    patches = [('r', ('stdin', '__stdin__')), ('w', ('stdout', '__stdout__'))]
280
    if _MANHOLE.redirect_stderr:
281
        patches.append(('w', ('stderr', '__stderr__')))
282
    try:
283
        client_fd = client.fileno()
284
        for mode, names in patches:
285
            for name in names:
286
                backup.append((name, getattr(sys, name)))
287
                setattr(sys, name, _ORIGINAL_FDOPEN(client_fd, mode, 1 if PY3 else 0))
288
        try:
289
            handle_repl(_MANHOLE.locals)
290
        except Exception as exc:
291
            _LOG("REPL failed with %r." % exc)
292
        _LOG("DONE.")
293
    finally:
294
        try:
295
            # Change the switch/check interval to something ridiculous. We don't want to have other thread try
296
            # to write to the redirected sys.__std*/sys.std* - it would fail horribly.
297
            setinterval(2147483647)
298
            try:
299
                client.close()  # close before it's too late. it may already be dead
300
            except IOError:
301
                pass
302
            junk = []  # keep the old file objects alive for a bit
303
            for name, fh in backup:
304
                junk.append(getattr(sys, name))
305
                setattr(sys, name, fh)
306
            del backup
307
            for fh in junk:
308
                try:
309
                    if hasattr(fh, 'detach'):
310
                        fh.detach()
311
                    else:
312
                        fh.close()
313
                except IOError:
314
                    pass
315
                del fh
316
            del junk
317
        finally:
318
            setinterval(old_interval)
319
            _LOG("Cleaned up.")
320
321
322
class ManholeConsole(code.InteractiveConsole):
323
    def __init__(self, *args, **kw):
324
        code.InteractiveConsole.__init__(self, *args, **kw)
325
        if _MANHOLE.redirect_stderr:
326
            self.file = sys.stderr
327
        else:
328
            self.file = sys.stdout
329
330
    def write(self, data):
331
        self.file.write(data)
332
333
334
def handle_repl(locals):
335
    """
336
    Dumps stacktraces and runs an interactive prompt (REPL).
337
    """
338
    dump_stacktraces()
339
    namespace = {
340
        'dump_stacktraces': dump_stacktraces,
341
        'sys': sys,
342
        'os': os,
343
        'socket': socket,
344
        'traceback': traceback,
345
    }
346
    if locals:
347
        namespace.update(locals)
348
    ManholeConsole(namespace).interact()
349
350
351
class Logger(object):
352
    """
353
    Internal object used for logging.
354
355
    Initially this is not configured. Until you call ``manhole.install()`` this logger object won't work (will raise
356
    ``NotInstalled``).
357
    """
358
    time = _get_original('time', 'time')
359
    enabled = True
360
    destination = None
361
362
    def configure(self, enabled, destination):
363
        self.enabled = enabled
364
        self.destination = destination
365
366
    def release(self):
367
        self.enabled = True
368
        self.destination = None
369
370
    def __call__(self, message):
371
        """
372
        Fail-ignorant logging function.
373
        """
374
        if self.enabled:
375
            if self.destination is None:
376
                raise NotInstalled("Manhole is not installed!")
377
            try:
378
                full_message = "Manhole[%s:%.4f]: %s\n" % (os.getpid(), self.time(), message)
379
380
                if isinstance(self.destination, int):
381
                    os.write(self.destination, full_message.encode('ascii', 'ignore'))
382
                else:
383
                    self.destination.write(full_message)
384
            except:  # pylint: disable=W0702
385
                pass
386
387
388
_LOG = Logger()
389
390
391
class Manhole(object):
392
    # Manhole core configuration
393
    # These are initialized when manhole is installed.
394
    daemon_connection = False
395
    locals = None
396
    original_os_fork = None
397
    original_os_forkpty = None
398
    redirect_stderr = True
399
    reinstall_delay = 0.5
400
    should_restart = None
401
    sigmask = _ALL_SIGNALS
402
    socket_path = None
403
    start_timeout = 0.5
404
    connection_handler = None
405
    previous_signal_handlers = None
406
    _thread = None
407
408
    def configure(self,
409
                  patch_fork=True, activate_on=None, sigmask=_ALL_SIGNALS, oneshot_on=None, thread=True,
410
                  start_timeout=0.5, socket_path=None, reinstall_delay=0.5, locals=None, daemon_connection=False,
411
                  redirect_stderr=True, connection_handler=handle_connection_repl):
412
        self.socket_path = socket_path
413
        self.reinstall_delay = reinstall_delay
414
        self.redirect_stderr = redirect_stderr
415
        self.locals = locals
416
        self.sigmask = sigmask
417
        self.daemon_connection = daemon_connection
418
        self.start_timeout = start_timeout
419
        self.previous_signal_handlers = {}
420
        self.connection_handler = connection_handler
421
422
        if oneshot_on is None and activate_on is None and thread:
423
            self.thread.start()
424
            self.should_restart = True
425
426
        if oneshot_on is not None:
427
            oneshot_on = getattr(signal, 'SIG' + oneshot_on) if isinstance(oneshot_on, string) else oneshot_on
428
            self.previous_signal_handlers.setdefault(oneshot_on, signal.signal(oneshot_on, self.handle_oneshot))
429
430
        if activate_on is not None:
431
            activate_on = getattr(signal, 'SIG' + activate_on) if isinstance(activate_on, string) else activate_on
432
            if activate_on == oneshot_on:
433
                raise ConfigurationConflict('You cannot do activation of the Manhole thread on the same signal '
434
                                            'that you want to do oneshot activation !')
435
            self.previous_signal_handlers.setdefault(activate_on, signal.signal(activate_on, self.activate_on_signal))
436
437
        atexit.register(self.remove_manhole_uds)
438
        if patch_fork:
439
            if activate_on is None and oneshot_on is None and socket_path is None:
440
                self.patch_os_fork_functions()
441
            else:
442
                if activate_on:
443
                    _LOG("Not patching os.fork and os.forkpty. Activation is done by signal %s" % activate_on)
444
                elif oneshot_on:
445
                    _LOG("Not patching os.fork and os.forkpty. Oneshot activation is done by signal %s" % oneshot_on)
446
                elif socket_path:
447
                    _LOG("Not patching os.fork and os.forkpty. Using user socket path %s" % socket_path)
448
449
    def release(self):
450
        if self._thread:
451
            self._thread.stop()
452
            self._thread = None
453
        self.remove_manhole_uds()
454
        self.restore_os_fork_functions()
455
        for sig, handler in self.previous_signal_handlers.items():
456
            signal.signal(sig, handler)
457
        self.previous_signal_handlers.clear()
458
459
    @property
460
    def thread(self):
461
        if self._thread is None:
462
            self._thread = ManholeThread(
463
                self.get_socket, self.sigmask, self.start_timeout, self.connection_handler,
464
                daemon_connection=self.daemon_connection
465
            )
466
        return self._thread
467
468
    @thread.setter
469
    def thread(self, value):
470
        self._thread = value
471
472
    def get_socket(self):
473
        sock = _ORIGINAL_SOCKET(socket.AF_UNIX, socket.SOCK_STREAM)
474
        name = self.remove_manhole_uds()
475
        sock.bind(name)
476
        sock.listen(5)
477
        _LOG("Manhole UDS path: " + name)
478
        return sock
479
480
    def reinstall(self):
481
        """
482
        Reinstalls the manhole. Checks if the thread is running. If not, it starts it again.
483
        """
484
        with _LOCK:
485
            if not (self.thread.is_alive() and self.thread in _ORIGINAL__ACTIVE):
486
                self.thread = self.thread.clone(bind_delay=self.reinstall_delay)
487
                if self.should_restart:
488
                    self.thread.start()
489
490
    def handle_oneshot(self, _signum=None, _frame=None):
491
        try:
492
            try:
493
                sock = self.get_socket()
494
                _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid())
495
                client = force_original_socket(sock.accept()[0])
496
                check_credentials(client)
497
                self.connection_handler(client)
498
            finally:
499
                self.remove_manhole_uds()
500
        except BaseException as exc:  # pylint: disable=W0702
501
            # we don't want to let any exception out, it might make the application misbehave
502
            _LOG("Oneshot failure: %r" % exc)
503
504
    def remove_manhole_uds(self):
505
        name = self.uds_name
506
        if os.path.exists(name):
507
            os.unlink(name)
508
        return name
509
510
    @property
511
    def uds_name(self):
512
        if self.socket_path is None:
513
            return "/tmp/manhole-%s" % os.getpid()
514
        return self.socket_path
515
516
    def patched_fork(self):
517
        """Fork a child process."""
518
        pid = self.original_os_fork()
519
        if not pid:
520
            _LOG('Fork detected. Reinstalling Manhole.')
521
            self.reinstall()
522
        return pid
523
524
    def patched_forkpty(self):
525
        """Fork a new process with a new pseudo-terminal as controlling tty."""
526
        pid, master_fd = self.original_os_forkpty()
527
        if not pid:
528
            _LOG('Fork detected. Reinstalling Manhole.')
529
            self.reinstall()
530
        return pid, master_fd
531
532
    def patch_os_fork_functions(self):
533
        self.original_os_fork, os.fork = os.fork, self.patched_fork
534
        self.original_os_forkpty, os.forkpty = os.forkpty, self.patched_forkpty
535
        _LOG("Patched %s and %s." % (self.original_os_fork, self.original_os_fork))
536
537
    def restore_os_fork_functions(self):
538
        if self.original_os_fork:
539
            os.fork = self.original_os_fork
540
        if self.original_os_forkpty:
541
            os.forkpty = self.original_os_forkpty
542
543
    def activate_on_signal(self, _signum, _frame):
544
        self.thread.start()
545
546
547
def install(verbose=True,
548
            verbose_destination=sys.__stderr__.fileno() if hasattr(sys.__stderr__, 'fileno') else sys.__stderr__,
549
            strict=True,
550
            **kwargs):
551
    """
552
    Installs the manhole.
553
554
    Args:
555
        verbose (bool): Set it to ``False`` to squelch the logging.
556
        verbose_destination (file descriptor or handle): Destination for verbose messages. Default is unbuffered stderr
557
            (stderr ``2`` file descriptor).
558
        patch_fork (bool): Set it to ``False`` if you don't want your ``os.fork`` and ``os.forkpy`` monkeypatched
559
        activate_on (int or signal name): set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you
560
            want the Manhole thread to start when this signal is sent. This is desireable in case you don't want the
561
            thread active all the time.
562
        oneshot_on (int or signal name): Set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you
563
            want the Manhole to listen for connection in the signal handler. This is desireable in case you don't want
564
            threads at all.
565
        thread (bool): Start the always-on ManholeThread. Default: ``True``. Automatically switched to ``False`` if
566
            ``oneshort_on`` or ``activate_on`` are used.
567
        sigmask (list of ints or signal names): Will set the signal mask to the given list (using
568
            ``signalfd.sigprocmask``). No action is done if ``signalfd`` is not importable.
569
            **NOTE**: This is done so that the Manhole thread doesn't *steal* any signals; Normally that is fine cause
570
            Python will force all the signal handling to be run in the main thread but signalfd doesn't.
571
        socket_path (str): Use a specifc path for the unix domain socket (instead of ``/tmp/manhole-<pid>``). This
572
            disables ``patch_fork`` as children cannot resuse the same path.
573
        reinstall_delay (float): Delay the unix domain socket creation *reinstall_delay* seconds. This
574
            alleviates cleanup failures when using fork+exec patterns.
575
        locals (dict): Names to add to manhole interactive shell locals.
576
        daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``.
577
        redirect_stderr (bool): Redirect output from stderr to manhole console. Default: ``True``.
578
        connection_handler (function): Function that implements the connection handler (warning: this is for advanced
579
            users). Default: ``manhole.handle_connection``.
580
    """
581
    # pylint: disable=W0603
582
    global _MANHOLE
583
584
    with _LOCK:
585
        if _MANHOLE is None:
586
            _MANHOLE = Manhole()
587
        else:
588
            if strict:
589
                raise AlreadyInstalled("Manhole already installed!")
590
            else:
591
                _LOG.release()
592
                _MANHOLE.release()  # Threads might be started here
593
594
    _LOG.configure(verbose, verbose_destination)
595
    _MANHOLE.configure(**kwargs)  # Threads might be started here
596
    return _MANHOLE
597
598
599
def dump_stacktraces():
600
    """
601
    Dumps thread ids and tracebacks to stdout.
602
    """
603
    lines = []
604
    for thread_id, stack in sys._current_frames().items():  # pylint: disable=W0212
605
        lines.append("\n######### ProcessID=%s, ThreadID=%s #########" % (
606
            os.getpid(), thread_id
607
        ))
608
        for filename, lineno, name, line in traceback.extract_stack(stack):
609
            lines.append('File: "%s", line %d, in %s' % (filename, lineno, name))
610
            if line:
611
                lines.append("  %s" % (line.strip()))
612
    lines.append("#############################################\n\n")
613
614
    print('\n'.join(lines), file=sys.stderr if _MANHOLE.redirect_stderr else sys.stdout)
615