Passed
Push — master ( adb41f...8de8c8 )
by Humberto
02:30 queued 11s
created

kytos/core/atcp_server.py (1 issue)

1
"""AsyncIO TCP Server for Kytos."""
2
3 1
import asyncio
4 1
import errno
5 1
import logging
6
7 1
from kytos.core.connection import Connection
8 1
from kytos.core.events import KytosEvent
9
10 1
LOG = logging.getLogger(__name__)
11
12
13 1
def exception_handler(loop, context):
14
    """Exception handler to avoid tracebacks because of network timeouts."""
15 1
    exc = context.get('exception')
16 1
    transport = context.get('transport')
17
18 1
    if isinstance(exc, TimeoutError):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable TimeoutError does not seem to be defined.
Loading history...
19 1
        LOG.info('Socket timeout: %r', transport)
20 1
    elif isinstance(exc, OSError) and exc.errno == errno.EBADF:
21 1
        LOG.info('Socket closed: %r', transport)
22
    else:
23 1
        loop.default_exception_handler(context)
24
25
26 1
class KytosServer:
27
    """Abstraction of a TCP Server to listen to packages from the network.
28
29
    The KytosServer will listen on the specified port
30
    for any new TCP request from the network and then instantiate the
31
    specified RequestHandler to handle the new request.
32
    It creates a new thread for each Handler.
33
    """
34
35 1
    def __init__(self,  # pylint: disable=too-many-arguments
36
                 server_address, server_protocol, controller,
37
                 protocol_name, loop=None):
38
        """Create the object without starting the server.
39
40
        Args:
41
            server_address (tuple): Address where the server is listening.
42
                example: ('127.0.0.1', 80)
43
            server_protocol (asyncio.Protocol):
44
                Class that will be instantiated to handle each request.
45
            controller (:class:`~kytos.core.controller.Controller`):
46
                An instance of Kytos Controller class.
47
            protocol_name (str): Southbound protocol name that will be used
48
        """
49 1
        self.server_address = server_address
50 1
        self.server_protocol = server_protocol
51 1
        self.controller = controller
52 1
        self.protocol_name = protocol_name
53
54
        # This will be an `asyncio.Server` instance after `serve_forever` is
55
        # called
56 1
        self._server = None
57
58
        # Here we compose the received `server_protocol` class with a `server`
59
        # object pointing to this instance
60 1
        self.server_protocol.server = self
61
62 1
        self.loop = loop or asyncio.get_event_loop()
63 1
        self.loop.set_exception_handler(exception_handler)
64
65 1
    def serve_forever(self):
66
        """Handle requests until an explicit shutdown() is called."""
67 1
        addr, port = self.server_address[0], self.server_address[1]
68
69 1
        self._server = self.loop.create_server(self.server_protocol,
70
                                               addr, port)
71
72 1
        try:
73 1
            task = self.loop.create_task(self._server)
74 1
            LOG.info("Kytos listening at %s:%s", addr, port)
75
        except Exception:
76
            LOG.error('Failed to start Kytos TCP Server at %s:%s', addr, port)
77
            task.close()
78
            raise
79
80 1
    def shutdown(self):
81
        """Call .close() on underlying TCP server, closing client sockets."""
82
        self._server.close()
83
        # self.loop.run_until_complete(self._server.wait_closed())
84
85
86 1
class KytosServerProtocol(asyncio.Protocol):
87
    """Kytos' main request handler.
88
89
    It is instantiated once per connection between each switch and the
90
    controller.
91
    The setup method will dispatch a KytosEvent (``kytos/core.connection.new``)
92
    on the controller, that will be processed by a Core App.
93
    The finish method will close the connection and dispatch a KytosEvent
94
    (``kytos/core.connection.closed``) on the controller.
95
    """
96
97 1
    known_ports = {
98
        6633: 'openflow',
99
        6653: 'openflow'
100
    }
101
102 1
    def __init__(self):
103
        """Initialize protocol and check if server attribute was set."""
104 1
        self._loop = asyncio.get_event_loop()
105
106 1
        self.connection = None
107 1
        self.transport = None
108 1
        self._rest = b''
109
110
        # server attribute is set outside this class, in KytosServer.init()
111
        # Here we initialize it to None to avoid pylint warnings
112 1
        if not getattr(self, 'server'):
113
            self.server = None
114
115
        # Then we check if it was really set
116 1
        if not self.server:
117
            raise ValueError("server instance must be assigned before init")
118
119 1
    def connection_made(self, transport):
120
        """Handle new client connection, passing it to the controller.
121
122
        Build a new Kytos `Connection` and send a ``kytos/core.connection.new``
123
        KytosEvent through the app buffer.
124
        """
125 1
        self.transport = transport
126
127 1
        addr, port = transport.get_extra_info('peername')
128 1
        _, server_port = transport.get_extra_info('sockname')
129 1
        socket = transport.get_extra_info('socket')
130
131 1
        LOG.info("New connection from %s:%s", addr, port)
132
133 1
        self.connection = Connection(addr, port, socket)
134
135
        # This allows someone to inherit from KytosServer and start a server
136
        # on another port to handle a different protocol.
137 1
        if self.server.protocol_name:
138 1
            self.known_ports[server_port] = self.server.protocol_name
139
140 1
        if server_port in self.known_ports:
141 1
            protocol_name = self.known_ports[server_port]
142
        else:
143
            protocol_name = f'{server_port:04d}'
144 1
        self.connection.protocol.name = protocol_name
145
146 1
        event_name = f'kytos/core.{protocol_name}.connection.new'
147 1
        event = KytosEvent(name=event_name,
148
                           content={'source': self.connection})
149
150 1
        self._loop.create_task(self.server.controller.buffers.raw.aput(event))
151
152 1
    def data_received(self, data):
153
        """Handle each request and place its data in the raw event buffer.
154
155
        Sends the received binary data in a ``kytos/core.{protocol}.raw.in``
156
        event on the raw buffer.
157
        """
158
        # max_size = 2**16
159
        # new_data = self.request.recv(max_size)
160
161 1
        data = self._rest + data
162
163 1
        LOG.debug("New data from %s:%s (%s bytes)",
164
                  self.connection.address, self.connection.port, len(data))
165
166
        # LOG.debug("New data from %s:%s (%s bytes): %s", self.addr, self.port,
167
        #           len(data), binascii.hexlify(data))
168
169 1
        content = {'source': self.connection, 'new_data': data}
170 1
        event_name = f'kytos/core.{self.connection.protocol.name}.raw.in'
171 1
        event = KytosEvent(name=event_name, content=content)
172
173 1
        self._loop.create_task(self.server.controller.buffers.raw.aput(event))
174
175 1
    def connection_lost(self, exc):
176
        """Close the connection socket and generate connection lost event.
177
178
        Emits a ``kytos/core.{protocol}.connection.lost`` event through the
179
        App buffer.
180
        """
181 1
        reason = exc or "Request closed by client"
182 1
        LOG.info("Connection lost with client %s:%s. Reason: %s",
183
                 self.connection.address, self.connection.port, reason)
184
185 1
        self.connection.close()
186
187 1
        content = {'source': self.connection}
188 1
        if exc:
189 1
            content['exception'] = exc
190 1
        event_name = \
191
            f'kytos/core.{self.connection.protocol.name}.connection.lost'
192 1
        event = KytosEvent(name=event_name, content=content)
193
194
        self._loop.create_task(self.server.controller.buffers.app.aput(event))
195