Passed
Push — master ( 5b16cd...320625 )
by Vinicius
09:36 queued 07:15
created

kytos.core.atcp_server   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 196
Duplicated Lines 0 %

Test Coverage

Coverage 88.89%

Importance

Changes 0
Metric Value
eloc 91
dl 0
loc 196
rs 10
c 0
b 0
f 0
ccs 72
cts 81
cp 0.8889
wmc 18

7 Methods

Rating   Name   Duplication   Size   Complexity  
A KytosServer.serve_forever() 0 14 2
A KytosServerProtocol.connection_made() 0 32 3
A KytosServerProtocol.__init__() 0 16 3
A KytosServer.__init__() 0 29 1
A KytosServer.shutdown() 0 3 1
A KytosServerProtocol.data_received() 0 23 2
A KytosServerProtocol.connection_lost() 0 20 2

1 Function

Rating   Name   Duplication   Size   Complexity  
A exception_handler() 0 11 4
1
"""AsyncIO TCP Server for Kytos."""
2
3 1
import asyncio
4 1
import errno
5 1
import logging
6
7 1
from kytos.core.connection import Connection
8 1
from kytos.core.events import KytosEvent
9
10 1
LOG = logging.getLogger(__name__)
11
12
13 1
def exception_handler(loop, context):
14
    """Exception handler to avoid tracebacks because of network timeouts."""
15 1
    exc = context.get('exception')
16 1
    transport = context.get('transport')
17
18 1
    if isinstance(exc, TimeoutError):
19 1
        LOG.info('Socket timeout: %r', transport)
20 1
    elif isinstance(exc, OSError) and exc.errno == errno.EBADF:
21 1
        LOG.info('Socket closed: %r', transport)
22
    else:
23 1
        loop.default_exception_handler(context)
24
25
26 1
class KytosServer:
27
    """Abstraction of a TCP Server to listen to packages from the network.
28
29
    The KytosServer will listen on the specified port
30
    for any new TCP request from the network and then instantiate the
31
    specified RequestHandler to handle the new request.
32
    It creates a new thread for each Handler.
33
    """
34
35 1
    def __init__(self,  # pylint: disable=too-many-arguments
36
                 server_address, server_protocol, controller,
37
                 protocol_name, loop=None):
38
        """Create the object without starting the server.
39
40
        Args:
41
            server_address (tuple): Address where the server is listening.
42
                example: ('127.0.0.1', 80)
43
            server_protocol (asyncio.Protocol):
44
                Class that will be instantiated to handle each request.
45
            controller (:class:`~kytos.core.controller.Controller`):
46
                An instance of Kytos Controller class.
47
            protocol_name (str): Southbound protocol name that will be used
48
        """
49 1
        self.server_address = server_address
50 1
        self.server_protocol = server_protocol
51 1
        self.controller = controller
52 1
        self.protocol_name = protocol_name
53
54
        # This will be an `asyncio.Server` instance after `serve_forever` is
55
        # called
56 1
        self._server = None
57
58
        # Here we compose the received `server_protocol` class with a `server`
59
        # object pointing to this instance
60 1
        self.server_protocol.server = self
61
62 1
        self.loop = loop or asyncio.get_event_loop()
63 1
        self.loop.set_exception_handler(exception_handler)
64
65 1
    def serve_forever(self):
66
        """Handle requests until an explicit shutdown() is called."""
67 1
        addr, port = self.server_address[0], self.server_address[1]
68
69 1
        self._server = self.loop.create_server(self.server_protocol,
70
                                               addr, port)
71
72 1
        try:
73 1
            task = self.loop.create_task(self._server)
74 1
            LOG.info("Kytos listening at %s:%s", addr, port)
75
        except Exception:
76
            LOG.error('Failed to start Kytos TCP Server at %s:%s', addr, port)
77
            task.close()
78
            raise
79
80 1
    def shutdown(self):
81
        """Call .close() on underlying TCP server, closing client sockets."""
82
        self._server.close()
83
        # self.loop.run_until_complete(self._server.wait_closed())
84
85
86 1
class KytosServerProtocol(asyncio.Protocol):
87
    """Kytos' main request handler.
88
89
    It is instantiated once per connection between each switch and the
90
    controller.
91
    The setup method will dispatch a KytosEvent (``kytos/core.connection.new``)
92
    on the controller, that will be processed by a Core App.
93
    The finish method will close the connection and dispatch a KytosEvent
94
    (``kytos/core.connection.closed``) on the controller.
95
    """
96
97 1
    known_ports = {
98
        6633: 'openflow',
99
        6653: 'openflow'
100
    }
101
102 1
    def __init__(self):
103
        """Initialize protocol and check if server attribute was set."""
104 1
        self._loop = asyncio.get_event_loop()
105
106 1
        self.connection = None
107 1
        self.transport = None
108 1
        self._rest = b''
109
110
        # server attribute is set outside this class, in KytosServer.init()
111
        # Here we initialize it to None to avoid pylint warnings
112 1
        if not getattr(self, 'server'):
113
            self.server = None
114
115
        # Then we check if it was really set
116 1
        if not self.server:
117
            raise ValueError("server instance must be assigned before init")
118
119 1
    def connection_made(self, transport):
120
        """Handle new client connection, passing it to the controller.
121
122
        Build a new Kytos `Connection` and send a ``kytos/core.connection.new``
123
        KytosEvent through the app buffer.
124
        """
125 1
        self.transport = transport
126
127 1
        addr, port = transport.get_extra_info('peername')
128 1
        _, server_port = transport.get_extra_info('sockname')
129 1
        socket = transport.get_extra_info('socket')
130
131 1
        LOG.info("New connection from %s:%s", addr, port)
132
133 1
        self.connection = Connection(addr, port, socket)
134
135
        # This allows someone to inherit from KytosServer and start a server
136
        # on another port to handle a different protocol.
137 1
        if self.server.protocol_name:
138 1
            self.known_ports[server_port] = self.server.protocol_name
139
140 1
        if server_port in self.known_ports:
141 1
            protocol_name = self.known_ports[server_port]
142
        else:
143
            protocol_name = f'{server_port:04d}'
144 1
        self.connection.protocol.name = protocol_name
145
146 1
        event_name = f'kytos/core.{protocol_name}.connection.new'
147 1
        event = KytosEvent(name=event_name,
148
                           content={'source': self.connection})
149
150 1
        self._loop.create_task(self.server.controller.buffers.conn.aput(event))
151
152 1
    def data_received(self, data):
153
        """Handle each request and place its data in the raw event buffer.
154
155
        Sends the received binary data in a ``kytos/core.{protocol}.raw.in``
156
        event on the raw buffer.
157
        """
158
        # max_size = 2**16
159
        # new_data = self.request.recv(max_size)
160
161 1
        data = self._rest + data
162
163 1
        if logging.DEBUG >= LOG.getEffectiveLevel():
164
            LOG.debug("New data from %s:%s (%s bytes)",
165
                      self.connection.address, self.connection.port, len(data))
166
167
        # LOG.debug("New data from %s:%s (%s bytes): %s", self.addr, self.port,
168
        #           len(data), binascii.hexlify(data))
169
170 1
        content = {'source': self.connection, 'new_data': data}
171 1
        event_name = f'kytos/core.{self.connection.protocol.name}.raw.in'
172 1
        event = KytosEvent(name=event_name, content=content)
173
174 1
        self._loop.create_task(self.server.controller.buffers.raw.aput(event))
175
176 1
    def connection_lost(self, exc):
177
        """Close the connection socket and generate connection lost event.
178
179
        Emits a ``kytos/core.{protocol}.connection.lost`` event through the
180
        App buffer.
181
        """
182 1
        reason = exc or "Request closed by client"
183 1
        LOG.info("Connection lost with client %s:%s. Reason: %s",
184
                 self.connection.address, self.connection.port, reason)
185
186 1
        self.connection.close()
187
188 1
        content = {'source': self.connection}
189 1
        if exc:
190 1
            content['exception'] = exc
191 1
        event_name = \
192
            f'kytos/core.{self.connection.protocol.name}.connection.lost'
193 1
        event = KytosEvent(name=event_name, content=content)
194
195
        self._loop.create_task(self.server.controller.buffers.conn.aput(event))
196