Test Failed
Push — master ( a5ee34...588b1f )
by Justin M.
34s queued 10s
created

AsyncioEventLoop._close()   A

Complexity

Conditions 2

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 3.6875

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
ccs 1
cts 4
cp 0.25
crap 3.6875
1
"""Event loop implementation that uses the `asyncio` standard module.

The `asyncio` module was added to the Python standard library in 3.4, and it
provides a pure Python implementation of an event loop library. It is used
as a fallback in case pyuv is not available (on Python implementations other
than CPython).

Earlier Python versions are supported through the `trollius` package, which
is a backport of `asyncio` that works on Python 2.6+.
"""
11 6
from __future__ import absolute_import
12
13 6
import logging
14 6
import os
15 6
import sys
16 6
from collections import deque
17
18 6
try:
19
    # For python 3.4+, use the standard library module
20 6
    import asyncio
21 2
except (ImportError, SyntaxError):
22
    # Fallback to trollius
23 2
    import trollius as asyncio
24
25 6
from .base import BaseEventLoop
26
27 6
# Module-level logger plus short aliases for its most used methods.
logger = logging.getLogger(__name__)
debug = logger.debug
info = logger.info
warn = logger.warning

if os.name == 'nt':
    from asyncio.windows_utils import PipeHandle
    import msvcrt

    # On windows use ProactorEventLoop which support pipes and is backed by the
    # more powerful IOCP facility
    # NOTE: we override in the stdio case, because it doesn't work.
    loop_cls = asyncio.ProactorEventLoop
else:
    # Everywhere else the selector based loop is used.
    loop_cls = asyncio.SelectorEventLoop
39
40
41 6
class AsyncioEventLoop(BaseEventLoop, asyncio.Protocol,
42
                       asyncio.SubprocessProtocol):
43
44
    """`BaseEventLoop` subclass that uses `asyncio` as a backend."""
45
46 6
    def connection_made(self, transport):
        """Record the transport handed over by `asyncio` once connected.

        The transport is stored both as the raw transport and as the
        transport used for writing. When it is an
        `asyncio.SubprocessTransport`, the writing transport is replaced by
        the pipe transport for file descriptor 0.
        """
        self._transport = self._raw_transport = transport
        if not isinstance(transport, asyncio.SubprocessTransport):
            return
        # For a child process, writes must go through the pipe for fd 0.
        self._transport = transport.get_pipe_transport(0)
52
53 6
    def connection_lost(self, exc):
        """Used to signal `asyncio.Protocol` of a lost connection.

        Forwards a description of the disconnect to `self._on_error`:

        - ``'EOF'`` when `exc` is ``None``;
        - the first argument of `exc` when it has any;
        - the exception class name when `exc` carries no arguments.
        """
        if exc is None:
            reason = 'EOF'
        elif exc.args:
            reason = exc.args[0]
        else:
            # Exceptions such as `ConnectionResetError()` may have empty
            # `args`; indexing them would raise IndexError and hide the
            # actual disconnect from the error handler.
            reason = type(exc).__name__
        self._on_error(reason)
56
57 6
    def data_received(self, data):
        """Handle incoming data delivered through `asyncio.Protocol`.

        The data goes straight to `self._on_data` when that callback is set;
        otherwise it is appended to `self._queued_data`.
        """
        handler = self._on_data
        if not handler:
            # No consumer yet: keep the chunk for later.
            self._queued_data.append(data)
        else:
            handler(data)
63
64 6
    def pipe_connection_lost(self, fd, exc):
        """Used to signal `asyncio.SubprocessProtocol` of a lost connection.

        `fd` is accepted to satisfy the protocol signature and is not used.
        A description of the disconnect is forwarded to `self._on_error`:

        - ``'EOF'`` when `exc` is ``None``;
        - the first argument of `exc` when it has any;
        - the exception class name when `exc` carries no arguments.
        """
        if exc is None:
            reason = 'EOF'
        elif exc.args:
            reason = exc.args[0]
        else:
            # Exceptions created without arguments have empty `args`;
            # indexing them would raise IndexError inside this callback.
            reason = type(exc).__name__
        self._on_error(reason)
67
68 6
    def pipe_data_received(self, fd, data):
        """Handle data coming from a pipe via `asyncio.SubprocessProtocol`.

        Data arriving on file descriptor 2 is passed to `self._on_stderr`.
        Data from any other descriptor goes to `self._on_data` when that
        callback is set, and is appended to `self._queued_data` otherwise.
        """
        if fd == 2:  # stderr fd number
            self._on_stderr(data)
            return

        handler = self._on_data
        if handler:
            handler(data)
            return

        self._queued_data.append(data)
76
77 6
    def process_exited(self):
        """Report child exit, as signalled by `asyncio.SubprocessProtocol`.

        The exit is reported to `self._on_error` with the message ``'EOF'``.
        """
        message = 'EOF'
        self._on_error(message)
80
81 6
    def _init(self):
        """Set up the state used by this backend.

        Creates the event loop from the module-level `loop_cls`, an empty
        queue for data that arrives before a consumer is set, the protocol
        factory handed to `asyncio` (it always returns this instance), and
        clears the raw transport reference.
        """
        self._loop = loop_cls()
        self._raw_transport = None
        self._queued_data = deque()
        # asyncio asks for a protocol factory; this object is the protocol.
        self._fact = lambda: self
86
87 6
    def _connect_tcp(self, address, port):
88
        coroutine = self._loop.create_connection(self._fact, address, port)
89
        self._loop.run_until_complete(coroutine)
90
91 6
    def _connect_socket(self, path):
92
        if os.name == 'nt':
93 2
            coroutine = self._loop.create_pipe_connection(self._fact, path)
94
        else:
95
            coroutine = self._loop.create_unix_connection(self._fact, path)
96
        self._loop.run_until_complete(coroutine)
97
98 6
    def _connect_stdio(self):
        """Attach this protocol to the process' own stdin and stdout.

        stdin is connected as a read pipe first. The stdout descriptor is
        then duplicated, the original stdout descriptor is redirected to
        stderr, and the duplicate is connected as a write pipe.
        """
        # `PipeHandle` and `msvcrt` are imported at module level only when
        # os.name == 'nt'; they are referenced here under the same condition,
        # so static-analysis warnings about them being undefined do not apply.
        on_windows = os.name == 'nt'

        if on_windows:
            reader = PipeHandle(msvcrt.get_osfhandle(sys.stdin.fileno()))
        else:
            reader = sys.stdin
        self._loop.run_until_complete(
            self._loop.connect_read_pipe(self._fact, reader))
        debug("native stdin connection successful")

        # Make sure subprocesses don't clobber stdout,
        # send the output to stderr instead.
        saved_stdout_fd = os.dup(sys.stdout.fileno())
        os.dup2(sys.stderr.fileno(), sys.stdout.fileno())

        if on_windows:
            writer = PipeHandle(msvcrt.get_osfhandle(saved_stdout_fd))
        else:
            writer = os.fdopen(saved_stdout_fd, 'wb')
        self._loop.run_until_complete(
            self._loop.connect_write_pipe(self._fact, writer))
        debug("native stdout connection successful")
119
120 6
    def _connect_child(self, argv):
121 6
        if os.name != 'nt':
122 6
            self._child_watcher = asyncio.get_child_watcher()
123 6
            self._child_watcher.attach_loop(self._loop)
124 6
        coroutine = self._loop.subprocess_exec(self._fact, *argv)
125 6
        self._loop.run_until_complete(coroutine)
126
127 6
    def _start_reading(self):
128 6
        pass
129
130 6
    def _send(self, data):
131 6
        self._transport.write(data)
132
133 6
    def _run(self):
134 6
        while self._queued_data:
135
            self._on_data(self._queued_data.popleft())
136 6
        self._loop.run_forever()
137
138 6
    def _stop(self):
139 6
        self._loop.stop()
140
141 6
    def _close(self):
142
        if self._raw_transport is not None:
143
            self._raw_transport.close()
144
        self._loop.close()
145
146 6
    def _threadsafe_call(self, fn):
147 6
        self._loop.call_soon_threadsafe(fn)
148
149 6
    def _setup_signals(self, signals):
150 6
        if os.name == 'nt':
151
            # add_signal_handler is not supported in win32
152
            self._signals = []
153
            return
154
155 6
        self._signals = list(signals)
156 6
        for signum in self._signals:
157 6
            self._loop.add_signal_handler(signum, self._on_signal, signum)
158
159 6
    def _teardown_signals(self):
160 6
        for signum in self._signals:
161
            self._loop.remove_signal_handler(signum)
162