Completed
Push — master ( 350973...b0c670 )
by Ionel Cristian
49s
created

pthread_setname_np()   A

Complexity

Conditions 1

Size

Total Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
from __future__ import print_function
2
3
import atexit
4
import code
5
import errno
6
import os
7
import signal
8
import socket
9
import struct
10
import sys
11
import traceback
12
from contextlib import closing
13
14
__version__ = '1.3.0'
15
16
# Optional dependency: when importable it is used to block signals in the
# manhole thread; otherwise the name is bound to None.
try:
    import signalfd
except ImportError:
    signalfd = None

# Text type suitable for isinstance() checks on both Python 2 and Python 3.
try:
    string = basestring
except NameError:  # python 3
    string = str

# Make sure the name InterruptedError exists at module level on every
# interpreter, falling back to OSError where the builtin is missing.
try:
    InterruptedError = InterruptedError
except NameError:  # python <= 3.2
    InterruptedError = OSError

# Use the "switch interval" API when the interpreter has it, else the older
# "check interval" API.
if not hasattr(sys, 'setswitchinterval'):
    setinterval, getinterval = sys.setcheckinterval, sys.getcheckinterval
else:
    setinterval, getinterval = sys.setswitchinterval, sys.getswitchinterval
34
35
# Resolve a helper that returns attributes of modules as they were before any
# monkeypatching: eventlet's ``original`` first, then gevent's
# ``get_original``, and finally a plain import when neither is installed.
try:
    from eventlet.patcher import original as _original

    def _get_original(mod, name):
        """Return attribute *name* of eventlet's original copy of module *mod*."""
        pristine_module = _original(mod)
        return getattr(pristine_module, name)
except ImportError:
    try:
        from gevent.monkey import get_original as _get_original
    except ImportError:
        def _get_original(mod, name):
            """Return attribute *name* of the module imported as *mod*."""
            imported = __import__(mod)
            return getattr(imported, name)
46
47
# Socket and file primitives, fetched through _get_original.
_ORIGINAL_SOCKET = _get_original('socket', 'socket')
_ORIGINAL_FROMFD = _get_original('socket', 'fromfd')
_ORIGINAL_FDOPEN = _get_original('os', 'fdopen')

# The low-level lock factory lives in ``thread`` on Python 2 and in
# ``_thread`` on Python 3.
try:
    _ORIGINAL_ALLOCATE_LOCK = _get_original('thread', 'allocate_lock')
except ImportError:  # python 3
    _ORIGINAL_ALLOCATE_LOCK = _get_original('_thread', 'allocate_lock')

# Threading and timing primitives, fetched the same way.
_ORIGINAL_THREAD = _get_original('threading', 'Thread')
_ORIGINAL_EVENT = _get_original('threading', 'Event')
_ORIGINAL__ACTIVE = _get_original('threading', '_active')
_ORIGINAL_SLEEP = _get_original('time', 'sleep')

# Interpreter version flags used for version-specific behaviour below.
PY3 = sys.version_info[0] == 3
PY26 = sys.version_info[:2] == (2, 6)
61
62
# Bind the C-level pthread_setname_np through ctypes when libpthread exposes
# it; otherwise fall back to a no-op with the same signature.
try:
    import ctypes
    import ctypes.util

    libpthread_path = ctypes.util.find_library("pthread")
    if not libpthread_path:
        raise ImportError
    # CDLL raises OSError when the library is located but cannot be loaded;
    # that case is handled by the fallback below as well.
    libpthread = ctypes.CDLL(libpthread_path)
    if not hasattr(libpthread, "pthread_setname_np"):
        raise ImportError
    _pthread_setname_np = libpthread.pthread_setname_np
    _pthread_setname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
    _pthread_setname_np.restype = ctypes.c_int

    def pthread_setname_np(ident, name):
        """
        Set the OS-level name of the thread identified by *ident*.

        The name is limited to 15 *bytes* of UTF-8 (the limit applies to the
        encoded form, not to the number of characters). A multi-byte character
        that would be cut in half by the limit is dropped entirely. The C
        function's return code is ignored, so this never raises on failure.

        Args:
            ident: Thread identifier, as found in ``Thread.ident``.
            name (str): Desired thread name.
        """
        encoded = name.encode('utf8')[:15]
        # Drop a trailing partial multi-byte sequence left over by the cut.
        encoded = encoded.decode('utf8', 'ignore').encode('utf8')
        _pthread_setname_np(ident, encoded)
except (ImportError, OSError):
    def pthread_setname_np(ident, name):
        """No-op fallback used when pthread_setname_np is not available."""
        pass
81
82
# Socket option used by get_peercred to read the peer's credentials.
if sys.platform != 'darwin' and not sys.platform.startswith("freebsd"):
    _PEERCRED_LEVEL = socket.SOL_SOCKET
    # TODO: Is this missing on some platforms?
    _PEERCRED_OPTION = getattr(socket, 'SO_PEERCRED', 17)
else:
    _PEERCRED_LEVEL = getattr(socket, 'SOL_LOCAL', 0)
    _PEERCRED_OPTION = getattr(socket, 'LOCAL_PEERCRED', 1)

# Every attribute of the signal module whose name starts with "SIG" and has no
# underscore in it.
_ALL_SIGNALS = tuple(
    getattr(signal, attr)
    for attr in dir(signal)
    if attr.startswith('SIG') and '_' not in attr
)

# _MANHOLE holds the Manhole instance once install() has run.
_MANHOLE = None
# Lock allocated through the original (unpatched) allocate_lock.
_LOCK = _ORIGINAL_ALLOCATE_LOCK()
96
97
98
def force_original_socket(sock):
    """
    Return an ``_ORIGINAL_SOCKET`` instance for the connection held by *sock*.

    The passed-in *sock* object is always closed before returning. When the
    original socket class has a ``_sock`` attribute, the inner socket is taken
    out of *sock* and rewrapped; otherwise the file descriptor is duplicated
    and a new socket object is built on the duplicate.
    """
    with closing(sock):
        if not hasattr(_ORIGINAL_SOCKET, '_sock'):
            return _ORIGINAL_SOCKET(sock.family, sock.type, sock.proto, os.dup(sock.fileno()))
        # Detach the inner socket first so closing the wrapper leaves it open.
        inner = sock._sock
        sock._sock = None
        return _ORIGINAL_SOCKET(_sock=inner)
105
106
107
def get_peercred(sock):
    """Return the ``(pid, uid, gid)`` of the peer on the given *connected* socket."""
    layout = '3i'  # three native ints
    raw = sock.getsockopt(_PEERCRED_LEVEL, _PEERCRED_OPTION, struct.calcsize(layout))
    return struct.unpack(layout, raw)
111
112
113
class AlreadyInstalled(Exception):
    """Raised by ``install()`` in strict mode when a manhole already exists."""
115
116
117
class NotInstalled(Exception):
    """Raised by the logger when it is used before a destination is configured."""
119
120
121
class ConfigurationConflict(Exception):
    """Raised when the same signal is requested for both activation modes."""
123
124
125
class SuspiciousClient(Exception):
    """Raised when a connecting client's UID is neither root nor the current EUID."""
127
128
129
class ManholeThread(_ORIGINAL_THREAD):
    """
    Thread that runs the infamous "Manhole". This thread is a `daemon` thread - it will exit if the main thread
    exits.

    On connect, a different thread will be started to serve the connection (non-daemon unless
    ``daemon_connection`` is set) - so that the process won't exit while there's a connection to the manhole.

    Args:
        get_socket (callable): Called without arguments in ``run()``; must return the listening socket.
        sigmask (list of signal numbers): Signals to block in this thread.
        start_timeout (float): Seconds to wait for the thread to start. Emits a message if the thread is not running
            when calling ``start()``.
        connection_handler (callable): Passed on to every ``ManholeConnectionThread``.
        bind_delay (float): Seconds to delay socket binding. Default: `no delay`.
        daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``.
    """

    def __init__(self,
                 get_socket, sigmask, start_timeout, connection_handler,
                 bind_delay=None, daemon_connection=False):
        super(ManholeThread, self).__init__()
        self.daemon = True
        self.daemon_connection = daemon_connection
        self.name = "Manhole"
        self.sigmask = sigmask
        self.serious = _ORIGINAL_EVENT()
        # time to wait for the manhole to get serious (to have a complete start)
        # see: http://emptysqua.re/blog/dawn-of-the-thread/
        self.start_timeout = start_timeout
        self.bind_delay = bind_delay
        self.connection_handler = connection_handler
        self.get_socket = get_socket
        self.should_run = False

    def stop(self):
        """Ask the accept loop to exit; the flag is checked before each new accept."""
        self.should_run = False

    def clone(self, **kwargs):
        """
        Make a fresh thread with the same options. This is usually used on dead threads.

        Extra keyword arguments are forwarded to the constructor.
        """
        return ManholeThread(
            self.get_socket, self.sigmask, self.start_timeout,
            connection_handler=self.connection_handler,
            daemon_connection=self.daemon_connection,
            **kwargs
        )

    def start(self):
        """Start the thread and wait up to ``start_timeout`` seconds for ``run()`` to begin."""
        self.should_run = True
        super(ManholeThread, self).start()
        # The result of wait() is not trusted on Python 2.6, so no warning is emitted there.
        if not self.serious.wait(self.start_timeout) and not PY26:
            _LOG("WARNING: Waited %s seconds but Manhole thread didn't start yet :(" % self.start_timeout)

    def run(self):
        """
        Runs the manhole loop. Only accepts one connection at a time because:

        * This thread is a daemon thread (exits when main thread exits).
        * The connection need exclusive access to stdin, stderr and stdout so it can redirect inputs and outputs.

        The listening socket is closed when the loop ends, whether normally or through an error.
        """
        self.serious.set()
        if signalfd and self.sigmask:
            signalfd.sigprocmask(signalfd.SIG_BLOCK, self.sigmask)
        pthread_setname_np(self.ident, self.name)

        if self.bind_delay:
            _LOG("Delaying UDS binding %s seconds ..." % self.bind_delay)
            _ORIGINAL_SLEEP(self.bind_delay)

        sock = self.get_socket()
        # Release the listening socket deterministically instead of leaving it to garbage collection.
        with closing(sock):
            while self.should_run:
                _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid())
                try:
                    client = ManholeConnectionThread(sock.accept()[0], self.connection_handler, self.daemon_connection)
                    client.start()
                    client.join()
                except (InterruptedError, socket.error) as e:
                    # Only an interrupted system call is retried; anything else ends the thread.
                    if e.errno != errno.EINTR:
                        raise
                    continue
                finally:
                    # Drop the reference to the finished connection thread.
                    client = None
211
212
213
class ManholeConnectionThread(_ORIGINAL_THREAD):
    """
    Manhole thread that handles the connection. By default this thread is a normal thread (non-daemon) - it won't
    exit if the main thread exits.

    Args:
        client: Accepted client socket; it is rewrapped with ``force_original_socket``.
        connection_handler (callable): Called with the client socket once the credentials check passes.
        daemon (bool): Value assigned to the thread's ``daemon`` flag. Default: ``False``.
    """

    def __init__(self, client, connection_handler, daemon=False):
        super(ManholeConnectionThread, self).__init__()
        self.daemon = daemon
        self.client = force_original_socket(client)
        self.connection_handler = connection_handler
        self.name = "ManholeConnectionThread"

    def run(self):
        """Check the peer's credentials, then hand the socket to the connection handler."""
        _LOG('Started ManholeConnectionThread thread. Checking credentials ...')
        pthread_setname_np(self.ident, "Manhole ----")
        try:
            pid, _, _ = self.check_credentials(self.client)
        except Exception:
            # The handler never runs for a rejected client, so close the socket here rather than
            # leaving it open until the thread object is garbage collected.
            self.client.close()
            raise
        pthread_setname_np(self.ident, "Manhole %s" % pid)
        self.connection_handler(self.client)

    @staticmethod
    def check_credentials(client):
        """
        Checks credentials for given socket.

        Returns:
            tuple: ``(pid, uid, gid)`` of the peer.

        Raises:
            SuspiciousClient: If the peer's UID is neither 0 nor the current effective UID.
        """
        pid, uid, gid = get_peercred(client)

        euid = os.geteuid()
        client_name = "PID:%s UID:%s GID:%s" % (pid, uid, gid)
        if uid not in (0, euid):
            raise SuspiciousClient("Can't accept client with %s. It doesn't match the current EUID:%s or ROOT." % (
                client_name, euid
            ))

        _LOG("Accepted connection %s from %s" % (client, client_name))
        return pid, uid, gid
249
250
251
def handle_connection_exec(client):
    """
    Alternate connection handler. No output redirection.

    Reads the connection line by line and executes every line as Python code until the peer closes the
    connection. Both the client socket and the file object used for reading are closed before returning.

    Args:
        client: Connected client socket.
    """
    client.settimeout(None)
    with closing(client):
        # The file object gets its own duplicate of the descriptor so that closing it and closing the
        # socket are independent operations.
        fh = os.fdopen(os.dup(client.fileno()))
        with closing(fh):
            # readline() returns an empty string once the peer has closed the connection.
            for payload in iter(fh.readline, ''):
                # SECURITY: this executes code received over the socket. The callers in this module
                # run check_credentials() on the peer before invoking the connection handler.
                exec(payload)
261
262
263
def handle_connection_repl(client):
    """
    Handles connection.

    Points ``sys.stdin``/``sys.stdout`` (and ``sys.stderr`` when stderr redirection is enabled), together with
    their ``sys.__std*__`` counterparts, at the client socket, runs the REPL, then restores the previous
    stream objects. Failures are logged instead of being raised.

    Args:
        client: Connected client socket. It is closed before this function returns.
    """
    client.settimeout(None)

    # # disable this till we have evidence that it's needed
    # client.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 0)
    # # Note: setting SO_RCVBUF on UDS has no effect, see: http://man7.org/linux/man-pages/man7/unix.7.html

    backup = []
    old_interval = getinterval()
    patches = [('r', ('stdin', '__stdin__')), ('w', ('stdout', '__stdout__'))]
    if _MANHOLE.redirect_stderr:
        patches.append(('w', ('stderr', '__stderr__')))
    try:
        try:
            client_fd = client.fileno()
            for mode, names in patches:
                for name in names:
                    # Remember the current stream before replacing it.
                    backup.append((name, getattr(sys, name)))
                    setattr(sys, name, _ORIGINAL_FDOPEN(client_fd, mode, 1 if PY3 else 0))
            try:
                handle_repl(_MANHOLE.locals)
            except Exception as exc:
                _LOG("Failed with %r." % exc)
            _LOG("DONE.")
        finally:
            try:
                # Change the switch/check interval to something ridiculous. We don't want to have other thread try
                # to write to the redirected sys.__std*/sys.std* - it would fail horribly.
                setinterval(2147483647)

                try:
                    client.close()  # close before it's too late. it may already be dead
                except IOError:
                    # A failing close must not prevent the original streams from being restored below.
                    pass
                junk = []  # keep the old file objects alive for a bit
                for name, fh in backup:
                    junk.append(getattr(sys, name))
                    setattr(sys, name, fh)
                del backup
                for fh in junk:
                    try:
                        fh.close()
                    except IOError:
                        pass
                    del fh
                del junk
            finally:
                setinterval(old_interval)
                _LOG("Cleaned up.")
    except Exception:
        _LOG("ManholeConnectionThread thread failed:")
        _LOG(traceback.format_exc())
315
316
317
class ManholeConsole(code.InteractiveConsole):
    """Interactive console whose ``write()`` goes to ``sys.stderr`` or ``sys.stdout``."""

    def __init__(self, *args, **kw):
        code.InteractiveConsole.__init__(self, *args, **kw)
        # The stream is picked once, at construction time, from the manhole's stderr setting.
        self.file = sys.stderr if _MANHOLE.redirect_stderr else sys.stdout

    def write(self, data):
        """Write *data* to the stream selected in ``__init__``."""
        self.file.write(data)
327
328
329
def handle_repl(locals):
    """
    Dumps stacktraces and runs an interactive prompt (REPL).

    Args:
        locals (dict): Extra names merged into the console namespace; skipped when falsy.
    """
    dump_stacktraces()
    namespace = dict(
        dump_stacktraces=dump_stacktraces,
        sys=sys,
        os=os,
        socket=socket,
        traceback=traceback,
    )
    if locals:
        # Caller-supplied names override the defaults above.
        namespace.update(locals)
    console = ManholeConsole(namespace)
    console.interact()
344
345
346
class Logger(object):
    """
    Internal object used for logging.

    Initially this is not configured. Until ``configure()`` gives it a destination, calling this object while it
    is enabled raises ``NotInstalled``.
    """
    time = _get_original('time', 'time')
    enabled = True
    destination = None

    def configure(self, enabled, destination):
        """Store the enabled flag and the destination (an integer file descriptor or an object with ``write``)."""
        self.enabled = enabled
        self.destination = destination

    def release(self):
        """Go back to the unconfigured state: enabled, without a destination."""
        self.enabled = True
        self.destination = None

    def __call__(self, message):
        """
        Fail-ignorant logging function.

        Raises:
            NotInstalled: If logging is enabled but no destination is set.
        """
        if not self.enabled:
            return
        if self.destination is None:
            raise NotInstalled("Manhole is not installed!")
        try:
            full_message = "Manhole[%.4f]: %s\n" % (self.time(), message)

            if not isinstance(self.destination, int):
                self.destination.write(full_message)
            else:
                # Integer destinations are written to directly as file descriptors.
                os.write(self.destination, full_message.encode('ascii', 'ignore'))
        except:  # pylint: disable=W0702
            # Any failure while formatting or writing is deliberately ignored.
            pass


_LOG = Logger()
384
385
386
class Manhole(object):
    """
    Holds the manhole configuration and manages the listening thread, the signal handlers, the unix domain
    socket path and the ``os.fork``/``os.forkpty`` patches.
    """
    # Manhole core configuration
    # These are initialized when manhole is installed.
    daemon_connection = False
    locals = None
    original_os_fork = None
    original_os_forkpty = None
    redirect_stderr = True
    reinstall_delay = 0.5
    should_restart = None
    sigmask = _ALL_SIGNALS
    socket_path = None
    start_timeout = 0.5
    connection_handler = None
    previous_signal_handlers = None
    _thread = None

    def configure(self,
                  patch_fork=True, activate_on=None, sigmask=_ALL_SIGNALS, oneshot_on=None, thread=True,
                  start_timeout=0.5, socket_path=None, reinstall_delay=0.5, locals=None, daemon_connection=False,
                  redirect_stderr=True, connection_handler=handle_connection_repl):
        """
        Store the options, then start the thread and/or install the signal handlers and fork patches.

        Raises:
            ConfigurationConflict: If ``activate_on`` and ``oneshot_on`` resolve to the same signal.
        """
        self.socket_path = socket_path
        self.reinstall_delay = reinstall_delay
        self.redirect_stderr = redirect_stderr
        self.locals = locals
        self.sigmask = sigmask
        self.daemon_connection = daemon_connection
        self.start_timeout = start_timeout
        self.previous_signal_handlers = {}
        self.connection_handler = connection_handler

        # The always-on thread is only started when no signal-based activation was requested.
        if oneshot_on is None and activate_on is None and thread:
            self.thread.start()
            self.should_restart = True

        if oneshot_on is not None:
            # Signal names such as "USR1" are resolved to signal.SIGUSR1.
            oneshot_on = getattr(signal, 'SIG' + oneshot_on) if isinstance(oneshot_on, string) else oneshot_on
            self.previous_signal_handlers.setdefault(oneshot_on, signal.signal(oneshot_on, self.handle_oneshot))

        if activate_on is not None:
            activate_on = getattr(signal, 'SIG' + activate_on) if isinstance(activate_on, string) else activate_on
            if activate_on == oneshot_on:
                raise ConfigurationConflict('You cannot do activation of the Manhole thread on the same signal '
                                            'that you want to do oneshot activation !')
            self.previous_signal_handlers.setdefault(activate_on, signal.signal(activate_on, self.activate_on_signal))

        atexit.register(self.remove_manhole_uds)
        if patch_fork:
            if activate_on is None and oneshot_on is None and socket_path is None:
                self.patch_os_fork_functions()
            else:
                if activate_on:
                    _LOG("Not patching os.fork and os.forkpty. Activation is done by signal %s" % activate_on)
                elif oneshot_on:
                    _LOG("Not patching os.fork and os.forkpty. Oneshot activation is done by signal %s" % oneshot_on)
                elif socket_path:
                    _LOG("Not patching os.fork and os.forkpty. Using user socket path %s" % socket_path)

    def release(self):
        """Stop the thread, remove the socket file, and restore the fork functions and signal handlers."""
        if self._thread:
            self._thread.stop()
            self._thread = None
        self.remove_manhole_uds()
        self.restore_os_fork_functions()
        for sig, handler in self.previous_signal_handlers.items():
            signal.signal(sig, handler)
        self.previous_signal_handlers.clear()

    @property
    def thread(self):
        """The ``ManholeThread``; created from the current configuration on first access."""
        if self._thread is None:
            self._thread = ManholeThread(
                self.get_socket, self.sigmask, self.start_timeout, self.connection_handler,
                daemon_connection=self.daemon_connection
            )
        return self._thread

    @thread.setter
    def thread(self, value):
        self._thread = value

    def get_socket(self):
        """Remove any stale socket file, then return a unix stream socket bound to and listening on the path."""
        sock = _ORIGINAL_SOCKET(socket.AF_UNIX, socket.SOCK_STREAM)
        name = self.remove_manhole_uds()
        sock.bind(name)
        sock.listen(5)
        _LOG("Manhole UDS path: " + name)
        return sock

    def reinstall(self):
        """
        Reinstalls the manhole. Checks if the thread is running. If not, it starts it again.
        """
        with _LOCK:
            # NOTE(review): threading._active is presumably keyed by thread ident, in which case this
            # membership test never matches a Thread object and the thread is always cloned - confirm.
            if not (self.thread.is_alive() and self.thread in _ORIGINAL__ACTIVE):
                self.thread = self.thread.clone(bind_delay=self.reinstall_delay)
                if self.should_restart:
                    self.thread.start()

    def handle_oneshot(self, _signum=None, _frame=None):
        """Signal handler: accept a single connection and serve it in the calling thread."""
        try:
            sock = self.get_socket()
            _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid())
            client = force_original_socket(sock.accept()[0])
            ManholeConnectionThread.check_credentials(client)
            self.connection_handler(client)
        except:  # pylint: disable=W0702
            # we don't want to let any exception out, it might make the application misbehave
            _LOG("Manhole oneshot connection failed:")
            _LOG(traceback.format_exc())
        finally:
            self.remove_manhole_uds()

    def remove_manhole_uds(self):
        """Delete the socket file if it exists and return its path."""
        name = self.uds_name
        if os.path.exists(name):
            os.unlink(name)
        return name

    @property
    def uds_name(self):
        """The configured ``socket_path``, or ``/tmp/manhole-<pid>`` when none is set."""
        if self.socket_path is None:
            return "/tmp/manhole-%s" % os.getpid()
        return self.socket_path

    def patched_fork(self):
        """Fork a child process."""
        pid = self.original_os_fork()
        if not pid:
            # pid is 0 in the child process.
            _LOG('Fork detected. Reinstalling Manhole.')
            self.reinstall()
        return pid

    def patched_forkpty(self):
        """Fork a new process with a new pseudo-terminal as controlling tty."""
        pid, master_fd = self.original_os_forkpty()
        if not pid:
            _LOG('Fork detected. Reinstalling Manhole.')
            self.reinstall()
        return pid, master_fd

    def patch_os_fork_functions(self):
        """Replace ``os.fork`` and ``os.forkpty`` with the patched versions, keeping the originals."""
        self.original_os_fork, os.fork = os.fork, self.patched_fork
        self.original_os_forkpty, os.forkpty = os.forkpty, self.patched_forkpty
        # Report both patched functions (fork and forkpty).
        _LOG("Patched %s and %s." % (self.original_os_fork, self.original_os_forkpty))

    def restore_os_fork_functions(self):
        """Put the saved ``os.fork`` and ``os.forkpty`` back, if they were patched."""
        if self.original_os_fork:
            os.fork = self.original_os_fork
        if self.original_os_forkpty:
            os.forkpty = self.original_os_forkpty

    def activate_on_signal(self, _signum, _frame):
        """Signal handler: start the manhole thread."""
        self.thread.start()
540
541
542
def install(verbose=True,
            verbose_destination=sys.__stderr__.fileno() if hasattr(sys.__stderr__, 'fileno') else sys.__stderr__,
            strict=True,
            **kwargs):
    """
    Installs the manhole.

    Args:
        verbose (bool): Set it to ``False`` to squelch the logging.
        verbose_destination (file descriptor or handle): Destination for verbose messages. Default is unbuffered stderr
            (stderr ``2`` file descriptor).
        strict (bool): When ``True``, raise ``AlreadyInstalled`` if a manhole already exists; when ``False``, release
            the existing manhole and configure it again.
        patch_fork (bool): Set it to ``False`` if you don't want your ``os.fork`` and ``os.forkpty`` monkeypatched.
        activate_on (int or signal name): Set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you
            want the Manhole thread to start when this signal is sent. This is desirable in case you don't want the
            thread active all the time.
        oneshot_on (int or signal name): Set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you
            want the Manhole to listen for connection in the signal handler. This is desirable in case you don't want
            threads at all.
        thread (bool): Start the always-on ManholeThread. Default: ``True``. The thread is not started if
            ``oneshot_on`` or ``activate_on`` are used.
        sigmask (list of ints or signal names): Will set the signal mask to the given list (using
            ``signalfd.sigprocmask``). No action is done if ``signalfd`` is not importable.
            **NOTE**: This is done so that the Manhole thread doesn't *steal* any signals; Normally that is fine cause
            Python will force all the signal handling to be run in the main thread but signalfd doesn't.
        socket_path (str): Use a specific path for the unix domain socket (instead of ``/tmp/manhole-<pid>``). This
            disables ``patch_fork`` as children cannot reuse the same path.
        reinstall_delay (float): Delay the unix domain socket creation *reinstall_delay* seconds. This
            alleviates cleanup failures when using fork+exec patterns.
        locals (dict): Names to add to manhole interactive shell locals.
        daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``.
        redirect_stderr (bool): Redirect output from stderr to manhole console. Default: ``True``.
        connection_handler (function): Function that implements the connection handler (warning: this is for advanced
            users). Default: ``manhole.handle_connection_repl``.

    Returns:
        Manhole: The configured manhole instance.

    Raises:
        AlreadyInstalled: If a manhole already exists and ``strict`` is true.
    """
    # pylint: disable=W0603
    global _MANHOLE

    with _LOCK:
        if _MANHOLE is None:
            _MANHOLE = Manhole()
        elif strict:
            raise AlreadyInstalled("Manhole already installed!")
        else:
            # Non-strict reinstall: tear down the previous setup before configuring again.
            _LOG.release()
            _MANHOLE.release()

    _LOG.configure(verbose, verbose_destination)
    _MANHOLE.configure(**kwargs)  # Threads might be started here
    return _MANHOLE
592
593
594
def dump_stacktraces():
    """
    Dumps thread ids and tracebacks to stderr, or to stdout when stderr redirection is disabled.
    """
    report = []
    for thread_id, stack in sys._current_frames().items():  # pylint: disable=W0212
        header = "\n######### ProcessID=%s, ThreadID=%s #########" % (os.getpid(), thread_id)
        report.append(header)
        for filename, lineno, name, line in traceback.extract_stack(stack):
            report.append('File: "%s", line %d, in %s' % (filename, lineno, name))
            # Source text may be unavailable for a frame; then only the location is shown.
            if line:
                report.append("  %s" % (line.strip()))
    report.append("#############################################\n\n")

    target = sys.stderr if _MANHOLE.redirect_stderr else sys.stdout
    print('\n'.join(report), file=target)
610