Completed
Push — master ( 21c87f...9911f1 )
by Ionel Cristian
28s
created

handle_connection_repl()   F

Complexity

Conditions 11

Size

Total Lines 52

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 11
c 3
b 0
f 0
dl 0
loc 52
rs 3.8571

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like handle_connection_repl() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
from __future__ import print_function
2
3
import atexit
4
import code
5
import errno
6
import os
7
import signal
8
import socket
9
import struct
10
import sys
11
import traceback
12
from contextlib import closing
13
14
__version__ = '1.3.0'
15
16
try:
17
    import signalfd
18
except ImportError:
19
    signalfd = None
20
try:
21
    string = basestring
22
except NameError:  # python 3
23
    string = str
24
try:
25
    InterruptedError = InterruptedError
26
except NameError:  # python <= 3.2
27
    InterruptedError = OSError
28
if hasattr(sys, 'setswitchinterval'):
29
    setinterval = sys.setswitchinterval
30
    getinterval = sys.getswitchinterval
31
else:
32
    setinterval = sys.setcheckinterval
33
    getinterval = sys.getcheckinterval
34
35
try:
36
    from eventlet.patcher import original as _original
37
38
    def _get_original(mod, name):
39
        return getattr(_original(mod), name)
40
except ImportError:
41
    try:
42
        from gevent.monkey import get_original as _get_original
43
    except ImportError:
44
        def _get_original(mod, name):
45
            return getattr(__import__(mod), name)
46
47
_ORIGINAL_SOCKET = _get_original('socket', 'socket')
48
_ORIGINAL_FROMFD = _get_original('socket', 'fromfd')
49
_ORIGINAL_FDOPEN = _get_original('os', 'fdopen')
50
_ORIGINAL_DUP = _get_original('os', 'dup')
51
_ORIGINAL_DUP2 = _get_original('os', 'dup2')
52
try:
53
    _ORIGINAL_ALLOCATE_LOCK = _get_original('thread', 'allocate_lock')
54
except ImportError:  # python 3
55
    _ORIGINAL_ALLOCATE_LOCK = _get_original('_thread', 'allocate_lock')
56
_ORIGINAL_THREAD = _get_original('threading', 'Thread')
57
_ORIGINAL_EVENT = _get_original('threading', 'Event')
58
_ORIGINAL__ACTIVE = _get_original('threading', '_active')
59
_ORIGINAL_SLEEP = _get_original('time', 'sleep')
60
61
PY3 = sys.version_info[0] == 3
62
PY26 = sys.version_info[:2] == (2, 6)
63
64
try:
65
    import ctypes
66
    import ctypes.util
67
68
    libpthread_path = ctypes.util.find_library("pthread")
69
    if not libpthread_path:
70
        raise ImportError
71
    libpthread = ctypes.CDLL(libpthread_path)
72
    if not hasattr(libpthread, "pthread_setname_np"):
73
        raise ImportError
74
    _pthread_setname_np = libpthread.pthread_setname_np
75
    _pthread_setname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
76
    _pthread_setname_np.restype = ctypes.c_int
77
78
    def pthread_setname_np(ident, name):
79
        _pthread_setname_np(ident, name[:15].encode('utf8'))
80
except ImportError:
81
    def pthread_setname_np(ident, name):
82
        pass
83
84
if sys.platform == 'darwin' or sys.platform.startswith("freebsd"):
85
    _PEERCRED_LEVEL = getattr(socket, 'SOL_LOCAL', 0)
86
    _PEERCRED_OPTION = getattr(socket, 'LOCAL_PEERCRED', 1)
87
else:
88
    _PEERCRED_LEVEL = socket.SOL_SOCKET
89
    # TODO: Is this missing on some platforms?
90
    _PEERCRED_OPTION = getattr(socket, 'SO_PEERCRED', 17)
91
92
_ALL_SIGNALS = tuple(getattr(signal, sig) for sig in dir(signal)
93
                     if sig.startswith('SIG') and '_' not in sig)
94
95
# These (_LOG and _MANHOLE) will hold instances after install
96
_MANHOLE = None
97
_LOCK = _ORIGINAL_ALLOCATE_LOCK()
98
99
100
def force_original_socket(sock):
101
    if hasattr(sock, 'detach'):
102
        fd = sock.detach()
103
    else:
104
        fd = sock.fileno()
105
106
    return _ORIGINAL_FROMFD(fd, sock.family, sock.type, sock.proto)
107
108
109
def get_peercred(sock):
110
    """Gets the (pid, uid, gid) for the client on the given *connected* socket."""
111
    buf = sock.getsockopt(_PEERCRED_LEVEL, _PEERCRED_OPTION, struct.calcsize('3i'))
112
    return struct.unpack('3i', buf)
113
114
115
class AlreadyInstalled(Exception):
116
    pass
117
118
119
class NotInstalled(Exception):
120
    pass
121
122
123
class ConfigurationConflict(Exception):
124
    pass
125
126
127
class SuspiciousClient(Exception):
128
    pass
129
130
131
class ManholeThread(_ORIGINAL_THREAD):
132
    """
133
    Thread that runs the infamous "Manhole". This thread is a `daemon` thread - it will exit if the main thread
134
    exits.
135
136
    On connect, a different, non-daemon thread will be started - so that the process won't exit while there's a
137
    connection to the manole.
138
139
    Args:
140
        sigmask (list of singal numbers): Signals to block in this thread.
141
        start_timeout (float): Seconds to wait for the thread to start. Emits a message if the thread is not running
142
            when calling ``start()``.
143
        bind_delay (float): Seconds to delay socket binding. Default: `no delay`.
144
        daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``.
145
    """
146
147
    def __init__(self,
148
                 get_socket, sigmask, start_timeout, connection_handler,
149
                 bind_delay=None, daemon_connection=False):
150
        super(ManholeThread, self).__init__()
151
        self.daemon = True
152
        self.daemon_connection = daemon_connection
153
        self.name = "Manhole"
154
        self.sigmask = sigmask
155
        self.serious = _ORIGINAL_EVENT()
156
        # time to wait for the manhole to get serious (to have a complete start)
157
        # see: http://emptysqua.re/blog/dawn-of-the-thread/
158
        self.start_timeout = start_timeout
159
        self.bind_delay = bind_delay
160
        self.connection_handler = connection_handler
161
        self.get_socket = get_socket
162
        self.should_run = False
163
164
    def stop(self):
165
        self.should_run = False
166
167
    def clone(self, **kwargs):
168
        """
169
        Make a fresh thread with the same options. This is usually used on dead threads.
170
        """
171
        return ManholeThread(
172
            self.get_socket, self.sigmask, self.start_timeout,
173
            connection_handler=self.connection_handler,
174
            daemon_connection=self.daemon_connection,
175
            **kwargs
176
        )
177
178
    def start(self):
179
        self.should_run = True
180
        super(ManholeThread, self).start()
181
        if not self.serious.wait(self.start_timeout) and not PY26:
182
            _LOG("WARNING: Waited %s seconds but Manhole thread didn't start yet :(" % self.start_timeout)
183
184
    def run(self):
185
        """
186
        Runs the manhole loop. Only accepts one connection at a time because:
187
188
        * This thread is a daemon thread (exits when main thread exists).
189
        * The connection need exclusive access to stdin, stderr and stdout so it can redirect inputs and outputs.
190
        """
191
        self.serious.set()
192
        if signalfd and self.sigmask:
193
            signalfd.sigprocmask(signalfd.SIG_BLOCK, self.sigmask)
194
        pthread_setname_np(self.ident, self.name)
195
196
        if self.bind_delay:
197
            _LOG("Delaying UDS binding %s seconds ..." % self.bind_delay)
198
            _ORIGINAL_SLEEP(self.bind_delay)
199
200
        sock = self.get_socket()
201
        while self.should_run:
202
            _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid())
203
            try:
204
                client = ManholeConnectionThread(sock.accept()[0], self.connection_handler, self.daemon_connection)
205
                client.start()
206
                client.join()
207
            except (InterruptedError, socket.error) as e:
208
                if e.errno != errno.EINTR:
209
                    raise
210
                continue
211
            finally:
212
                client = None
213
214
215
class ManholeConnectionThread(_ORIGINAL_THREAD):
216
    """
217
    Manhole thread that handles the connection. This thread is a normal thread (non-daemon) - it won't exit if the
218
    main thread exits.
219
    """
220
221
    def __init__(self, client, connection_handler, daemon=False):
222
        super(ManholeConnectionThread, self).__init__()
223
        self.daemon = daemon
224
        self.client = force_original_socket(client)
225
        self.connection_handler = connection_handler
226
        self.name = "ManholeConnectionThread"
227
228
    def run(self):
229
        _LOG('Started ManholeConnectionThread thread. Checking credentials ...')
230
        pthread_setname_np(self.ident, "Manhole ----")
231
        pid, _, _ = self.check_credentials(self.client)
232
        pthread_setname_np(self.ident, "Manhole %s" % pid)
233
        try:
234
            self.connection_handler(self.client)
235
        except BaseException as exc:
236
            _LOG("ManholeConnectionThread failure: %r" % exc)
237
238
    @staticmethod
239
    def check_credentials(client):
240
        """
241
        Checks credentials for given socket.
242
        """
243
        pid, uid, gid = get_peercred(client)
244
245
        euid = os.geteuid()
246
        client_name = "PID:%s UID:%s GID:%s" % (pid, uid, gid)
247
        if uid not in (0, euid):
248
            raise SuspiciousClient("Can't accept client with %s. It doesn't match the current EUID:%s or ROOT." % (
249
                client_name, euid
250
            ))
251
252
        _LOG("Accepted connection %s from %s" % (client, client_name))
253
        return pid, uid, gid
254
255
256
def handle_connection_exec(client):
257
    """
258
    Alternate connection handler. No output redirection.
259
    """
260
    client.settimeout(None)
261
    fh = os.fdopen(client.fileno())
262
    payload = fh.readline()
263
    while payload:
264
        exec(payload)
265
        payload = fh.readline()
266
267
268
def handle_connection_repl(client):
269
    """
270
    Handles connection.
271
    """
272
    client.settimeout(None)
273
    # # disable this till we have evidence that it's needed
274
    # client.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 0)
275
    # # Note: setting SO_RCVBUF on UDS has no effect, see: http://man7.org/linux/man-pages/man7/unix.7.html
276
277
    backup = []
278
    old_interval = getinterval()
279
    patches = [('r', ('stdin', '__stdin__')), ('w', ('stdout', '__stdout__'))]
280
    if _MANHOLE.redirect_stderr:
281
        patches.append(('w', ('stderr', '__stderr__')))
282
    try:
283
        client_fd = client.fileno()
284
        for mode, names in patches:
285
            for name in names:
286
                backup.append((name, getattr(sys, name)))
287
                setattr(sys, name, _ORIGINAL_FDOPEN(client_fd, mode, 1 if PY3 else 0))
288
        try:
289
            handle_repl(_MANHOLE.locals)
290
        except Exception as exc:
291
            _LOG("REPL failed with %r." % exc)
292
        _LOG("DONE.")
293
    finally:
294
        try:
295
            # Change the switch/check interval to something ridiculous. We don't want to have other thread try
296
            # to write to the redirected sys.__std*/sys.std* - it would fail horribly.
297
            setinterval(2147483647)
298
            try:
299
                client.close()  # close before it's too late. it may already be dead
300
            except IOError:
301
                pass
302
            junk = []  # keep the old file objects alive for a bit
303
            for name, fh in backup:
304
                junk.append(getattr(sys, name))
305
                setattr(sys, name, fh)
306
            del backup
307
            for fh in junk:
308
                try:
309
                    if hasattr(fh, 'detach'):
310
                        fh.detach()
311
                    else:
312
                        fh.close()
313
                except IOError:
314
                    pass
315
                del fh
316
            del junk
317
        finally:
318
            setinterval(old_interval)
319
            _LOG("Cleaned up.")
320
321
322
323
class ManholeConsole(code.InteractiveConsole):
324
    def __init__(self, *args, **kw):
325
        code.InteractiveConsole.__init__(self, *args, **kw)
326
        if _MANHOLE.redirect_stderr:
327
            self.file = sys.stderr
328
        else:
329
            self.file = sys.stdout
330
331
    def write(self, data):
332
        self.file.write(data)
333
334
335
def handle_repl(locals):
336
    """
337
    Dumps stacktraces and runs an interactive prompt (REPL).
338
    """
339
    dump_stacktraces()
340
    namespace = {
341
        'dump_stacktraces': dump_stacktraces,
342
        'sys': sys,
343
        'os': os,
344
        'socket': socket,
345
        'traceback': traceback,
346
    }
347
    if locals:
348
        namespace.update(locals)
349
    ManholeConsole(namespace).interact()
350
351
352
class Logger(object):
353
    """
354
    Internal object used for logging.
355
356
    Initially this is not configured. Until you call ``manhole.install()`` this logger object won't work (will raise
357
    ``NotInstalled``).
358
    """
359
    time = _get_original('time', 'time')
360
    enabled = True
361
    destination = None
362
363
    def configure(self, enabled, destination):
364
        self.enabled = enabled
365
        self.destination = destination
366
367
    def release(self):
368
        self.enabled = True
369
        self.destination = None
370
371
    def __call__(self, message):
372
        """
373
        Fail-ignorant logging function.
374
        """
375
        if self.enabled:
376
            if self.destination is None:
377
                raise NotInstalled("Manhole is not installed!")
378
            try:
379
                full_message = "Manhole[%.4f]: %s\n" % (self.time(), message)
380
381
                if isinstance(self.destination, int):
382
                    os.write(self.destination, full_message.encode('ascii', 'ignore'))
383
                else:
384
                    self.destination.write(full_message)
385
            except:  # pylint: disable=W0702
386
                pass
387
388
389
_LOG = Logger()
390
391
392
class Manhole(object):
393
    # Manhole core configuration
394
    # These are initialized when manhole is installed.
395
    daemon_connection = False
396
    locals = None
397
    original_os_fork = None
398
    original_os_forkpty = None
399
    redirect_stderr = True
400
    reinstall_delay = 0.5
401
    should_restart = None
402
    sigmask = _ALL_SIGNALS
403
    socket_path = None
404
    start_timeout = 0.5
405
    connection_handler = None
406
    previous_signal_handlers = None
407
    _thread = None
408
409
    def configure(self,
410
                  patch_fork=True, activate_on=None, sigmask=_ALL_SIGNALS, oneshot_on=None, thread=True,
411
                  start_timeout=0.5, socket_path=None, reinstall_delay=0.5, locals=None, daemon_connection=False,
412
                  redirect_stderr=True, connection_handler=handle_connection_repl):
413
        self.socket_path = socket_path
414
        self.reinstall_delay = reinstall_delay
415
        self.redirect_stderr = redirect_stderr
416
        self.locals = locals
417
        self.sigmask = sigmask
418
        self.daemon_connection = daemon_connection
419
        self.start_timeout = start_timeout
420
        self.previous_signal_handlers = {}
421
        self.connection_handler = connection_handler
422
423
        if oneshot_on is None and activate_on is None and thread:
424
            self.thread.start()
425
            self.should_restart = True
426
427
        if oneshot_on is not None:
428
            oneshot_on = getattr(signal, 'SIG' + oneshot_on) if isinstance(oneshot_on, string) else oneshot_on
429
            self.previous_signal_handlers.setdefault(oneshot_on, signal.signal(oneshot_on, self.handle_oneshot))
430
431
        if activate_on is not None:
432
            activate_on = getattr(signal, 'SIG' + activate_on) if isinstance(activate_on, string) else activate_on
433
            if activate_on == oneshot_on:
434
                raise ConfigurationConflict('You cannot do activation of the Manhole thread on the same signal '
435
                                            'that you want to do oneshot activation !')
436
            self.previous_signal_handlers.setdefault(activate_on, signal.signal(activate_on, self.activate_on_signal))
437
438
        atexit.register(self.remove_manhole_uds)
439
        if patch_fork:
440
            if activate_on is None and oneshot_on is None and socket_path is None:
441
                self.patch_os_fork_functions()
442
            else:
443
                if activate_on:
444
                    _LOG("Not patching os.fork and os.forkpty. Activation is done by signal %s" % activate_on)
445
                elif oneshot_on:
446
                    _LOG("Not patching os.fork and os.forkpty. Oneshot activation is done by signal %s" % oneshot_on)
447
                elif socket_path:
448
                    _LOG("Not patching os.fork and os.forkpty. Using user socket path %s" % socket_path)
449
450
    def release(self):
451
        if self._thread:
452
            self._thread.stop()
453
            self._thread = None
454
        self.remove_manhole_uds()
455
        self.restore_os_fork_functions()
456
        for sig, handler in self.previous_signal_handlers.items():
457
            signal.signal(sig, handler)
458
        self.previous_signal_handlers.clear()
459
460
    @property
461
    def thread(self):
462
        if self._thread is None:
463
            self._thread = ManholeThread(
464
                self.get_socket, self.sigmask, self.start_timeout, self.connection_handler,
465
                daemon_connection=self.daemon_connection
466
            )
467
        return self._thread
468
469
    @thread.setter
470
    def thread(self, value):
471
        self._thread = value
472
473
    def get_socket(self):
474
        sock = _ORIGINAL_SOCKET(socket.AF_UNIX, socket.SOCK_STREAM)
475
        name = self.remove_manhole_uds()
476
        sock.bind(name)
477
        sock.listen(5)
478
        _LOG("Manhole UDS path: " + name)
479
        return sock
480
481
    def reinstall(self):
482
        """
483
        Reinstalls the manhole. Checks if the thread is running. If not, it starts it again.
484
        """
485
        with _LOCK:
486
            if not (self.thread.is_alive() and self.thread in _ORIGINAL__ACTIVE):
487
                self.thread = self.thread.clone(bind_delay=self.reinstall_delay)
488
                if self.should_restart:
489
                    self.thread.start()
490
491
    def handle_oneshot(self, _signum=None, _frame=None):
492
        try:
493
            try:
494
                sock = self.get_socket()
495
                _LOG("Waiting for new connection (in pid:%s) ..." % os.getpid())
496
                client = force_original_socket(sock.accept()[0])
497
                ManholeConnectionThread.check_credentials(client)
498
                self.connection_handler(client)
499
            finally:
500
                self.remove_manhole_uds()
501
        except BaseException as exc:  # pylint: disable=W0702
502
            # we don't want to let any exception out, it might make the application misbehave
503
            _LOG("Oneshot failure: %r" % exc)
504
505
    def remove_manhole_uds(self):
506
        name = self.uds_name
507
        if os.path.exists(name):
508
            os.unlink(name)
509
        return name
510
511
    @property
512
    def uds_name(self):
513
        if self.socket_path is None:
514
            return "/tmp/manhole-%s" % os.getpid()
515
        return self.socket_path
516
517
    def patched_fork(self):
518
        """Fork a child process."""
519
        pid = self.original_os_fork()
520
        if not pid:
521
            _LOG('Fork detected. Reinstalling Manhole.')
522
            self.reinstall()
523
        return pid
524
525
    def patched_forkpty(self):
526
        """Fork a new process with a new pseudo-terminal as controlling tty."""
527
        pid, master_fd = self.original_os_forkpty()
528
        if not pid:
529
            _LOG('Fork detected. Reinstalling Manhole.')
530
            self.reinstall()
531
        return pid, master_fd
532
533
    def patch_os_fork_functions(self):
534
        self.original_os_fork, os.fork = os.fork, self.patched_fork
535
        self.original_os_forkpty, os.forkpty = os.forkpty, self.patched_forkpty
536
        _LOG("Patched %s and %s." % (self.original_os_fork, self.original_os_fork))
537
538
    def restore_os_fork_functions(self):
539
        if self.original_os_fork:
540
            os.fork = self.original_os_fork
541
        if self.original_os_forkpty:
542
            os.forkpty = self.original_os_forkpty
543
544
    def activate_on_signal(self, _signum, _frame):
545
        self.thread.start()
546
547
548
def install(verbose=True,
549
            verbose_destination=sys.__stderr__.fileno() if hasattr(sys.__stderr__, 'fileno') else sys.__stderr__,
550
            strict=True,
551
            **kwargs):
552
    """
553
    Installs the manhole.
554
555
    Args:
556
        verbose (bool): Set it to ``False`` to squelch the logging.
557
        verbose_destination (file descriptor or handle): Destination for verbose messages. Default is unbuffered stderr
558
            (stderr ``2`` file descriptor).
559
        patch_fork (bool): Set it to ``False`` if you don't want your ``os.fork`` and ``os.forkpy`` monkeypatched
560
        activate_on (int or signal name): set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you
561
            want the Manhole thread to start when this signal is sent. This is desireable in case you don't want the
562
            thread active all the time.
563
        oneshot_on (int or signal name): Set to ``"USR1"``, ``"USR2"`` or some other signal name, or a number if you
564
            want the Manhole to listen for connection in the signal handler. This is desireable in case you don't want
565
            threads at all.
566
        thread (bool): Start the always-on ManholeThread. Default: ``True``. Automatically switched to ``False`` if
567
            ``oneshort_on`` or ``activate_on`` are used.
568
        sigmask (list of ints or signal names): Will set the signal mask to the given list (using
569
            ``signalfd.sigprocmask``). No action is done if ``signalfd`` is not importable.
570
            **NOTE**: This is done so that the Manhole thread doesn't *steal* any signals; Normally that is fine cause
571
            Python will force all the signal handling to be run in the main thread but signalfd doesn't.
572
        socket_path (str): Use a specifc path for the unix domain socket (instead of ``/tmp/manhole-<pid>``). This
573
            disables ``patch_fork`` as children cannot resuse the same path.
574
        reinstall_delay (float): Delay the unix domain socket creation *reinstall_delay* seconds. This
575
            alleviates cleanup failures when using fork+exec patterns.
576
        locals (dict): Names to add to manhole interactive shell locals.
577
        daemon_connection (bool): The connection thread is daemonic (dies on app exit). Default: ``False``.
578
        redirect_stderr (bool): Redirect output from stderr to manhole console. Default: ``True``.
579
        connection_handler (function): Function that implements the connection handler (warning: this is for advanced
580
            users). Default: ``manhole.handle_connection``.
581
    """
582
    # pylint: disable=W0603
583
    global _MANHOLE
584
585
    with _LOCK:
586
        if _MANHOLE is None:
587
            _MANHOLE = Manhole()
588
        else:
589
            if strict:
590
                raise AlreadyInstalled("Manhole already installed!")
591
            else:
592
                _LOG.release()
593
                _MANHOLE.release()  # Threads might be started here
594
595
    _LOG.configure(verbose, verbose_destination)
596
    _MANHOLE.configure(**kwargs)  # Threads might be started here
597
    return _MANHOLE
598
599
600
def dump_stacktraces():
601
    """
602
    Dumps thread ids and tracebacks to stdout.
603
    """
604
    lines = []
605
    for thread_id, stack in sys._current_frames().items():  # pylint: disable=W0212
606
        lines.append("\n######### ProcessID=%s, ThreadID=%s #########" % (
607
            os.getpid(), thread_id
608
        ))
609
        for filename, lineno, name, line in traceback.extract_stack(stack):
610
            lines.append('File: "%s", line %d, in %s' % (filename, lineno, name))
611
            if line:
612
                lines.append("  %s" % (line.strip()))
613
    lines.append("#############################################\n\n")
614
615
    print('\n'.join(lines), file=sys.stderr if _MANHOLE.redirect_stderr else sys.stdout)
616