1 | """AsyncIO TCP Server for Kytos.""" |
||
2 | |||
3 | 1 | import asyncio |
|
4 | 1 | import errno |
|
5 | 1 | import logging |
|
6 | |||
7 | 1 | from kytos.core.connection import Connection |
|
8 | 1 | from kytos.core.events import KytosEvent |
|
9 | |||
10 | 1 | LOG = logging.getLogger(__name__) |
|
11 | |||
12 | |||
def exception_handler(loop, context):
    """Log expected socket errors quietly instead of dumping a traceback.

    A ``TimeoutError``, or an ``OSError`` whose errno is ``EBADF``, is
    reported with a single info message. Any other context (including one
    without an exception) is forwarded to the loop's default handler.

    Args:
        loop: Event loop that reported the error; its
            ``default_exception_handler`` is used as the fallback.
        context (dict): Error context. Only the ``'exception'`` and
            ``'transport'`` keys are read, and both may be missing.
    """
    error = context.get('exception')
    transport = context.get('transport')

    if isinstance(error, TimeoutError):
        LOG.info('Socket timeout: %r', transport)
        return

    is_bad_fd = isinstance(error, OSError) and error.errno == errno.EBADF
    if is_bad_fd:
        LOG.info('Socket closed: %r', transport)
        return

    # Anything unexpected keeps the loop's standard reporting.
    loop.default_exception_handler(context)
24 | |||
25 | |||
class KytosServer:
    """Abstraction of a TCP Server to listen to packages from the network.

    The KytosServer listens on the specified address for new TCP
    connections. The ``server_protocol`` class is handed to
    ``loop.create_server`` as the protocol factory, so connections are
    handled on the asyncio event loop rather than in dedicated threads.
    """

    def __init__(self,  # pylint: disable=too-many-arguments
                 server_address, server_protocol, controller,
                 protocol_name, loop=None):
        """Create the object without starting the server.

        Besides storing its arguments, this sets ``server`` on the
        ``server_protocol`` class and replaces the loop's exception handler
        with ``exception_handler``.

        Args:
            server_address (tuple): Address where the server is listening.
                example: ('127.0.0.1', 80)
            server_protocol (asyncio.Protocol):
                Class that will be instantiated to handle each request.
            controller (:class:`~kytos.core.controller.Controller`):
                An instance of Kytos Controller class.
            protocol_name (str): Southbound protocol name that will be used
            loop: Event loop used to run the server. Defaults to
                ``asyncio.get_event_loop()``.
        """
        self.server_address = server_address
        self.server_protocol = server_protocol
        self.controller = controller
        self.protocol_name = protocol_name

        # `asyncio.Server` instance once the start-up task has finished
        # successfully; None until then.
        self._server = None
        # Task that runs `loop.create_server`; None until `serve_forever`.
        self._server_task = None

        # Here we compose the received `server_protocol` class with a `server`
        # object pointing to this instance. This is a class attribute, so it
        # is shared by every instance of that protocol class.
        self.server_protocol.server = self

        self.loop = loop or asyncio.get_event_loop()
        self.loop.set_exception_handler(exception_handler)

    def serve_forever(self):
        """Schedule the TCP server start-up on the event loop.

        This does not block: it only creates the task that binds the
        listening socket. The outcome (success or failure) is reported by
        ``_on_server_started`` once the loop has run that task.

        Raises:
            Exception: Whatever ``loop.create_task`` raised, after logging.
        """
        addr, port = self.server_address[0], self.server_address[1]

        start_server = self.loop.create_server(self.server_protocol,
                                               addr, port)
        try:
            self._server_task = self.loop.create_task(start_server)
        except Exception:
            LOG.error('Failed to start Kytos TCP Server at %s:%s', addr, port)
            # The coroutine was never scheduled; close it so it is not
            # reported as "never awaited".
            start_server.close()
            raise

        self._server_task.add_done_callback(self._on_server_started)

    def _on_server_started(self, task):
        """Keep the started `asyncio.Server` or log why start-up failed."""
        if task.cancelled():
            return

        addr, port = self.server_address[0], self.server_address[1]
        error = task.exception()
        if error is not None:
            # Retrieving the exception here also prevents asyncio's
            # "Task exception was never retrieved" report.
            LOG.error('Failed to start Kytos TCP Server at %s:%s: %s',
                      addr, port, error)
            return

        self._server = task.result()
        LOG.info("Kytos listening at %s:%s", addr, port)

    def shutdown(self):
        """Stop the underlying TCP server.

        Cancels a start-up that is still pending and calls ``close()`` on
        the server if it was started. Calling it before ``serve_forever``
        is a no-op. Per the asyncio documentation, ``Server.close()`` stops
        listening but leaves already accepted client connections open.
        """
        task = self._server_task
        if task is not None and not task.done():
            task.cancel()

        if self._server is not None:
            self._server.close()
84 | |||
85 | |||
class KytosServerProtocol(asyncio.Protocol):
    """Kytos' main request handler.

    It is instantiated once per connection between each switch and the
    controller.
    ``connection_made`` puts a ``kytos/core.{protocol}.connection.new``
    KytosEvent in the controller's raw buffer, ``data_received`` puts a
    ``kytos/core.{protocol}.raw.in`` event in the same buffer, and
    ``connection_lost`` closes the connection and puts a
    ``kytos/core.{protocol}.connection.lost`` event in the app buffer.
    """

    # Maps a listening port to a protocol name. This dict lives on the
    # class, so entries added in `connection_made` are shared by every
    # instance.
    known_ports = {
        6633: 'openflow',
        6653: 'openflow'
    }

    # Assigned on the class from outside (KytosServer.__init__ does
    # `server_protocol.server = self`); None means "not assigned yet".
    server = None

    def __init__(self):
        """Initialize protocol and check if server attribute was set.

        Raises:
            ValueError: If no server was assigned before instantiation.
        """
        if not self.server:
            raise ValueError("server instance must be assigned before init")

        self._loop = asyncio.get_event_loop()

        self.connection = None
        self.transport = None
        self._rest = b''

    def _enqueue(self, buffer, event):
        """Schedule `event` to be put in `buffer` on the event loop."""
        self._loop.create_task(buffer.aput(event))

    def connection_made(self, transport):
        """Handle new client connection, passing it to the controller.

        Build a new Kytos `Connection` and send a
        ``kytos/core.{protocol}.connection.new`` KytosEvent through the raw
        buffer. The protocol name is the server's ``protocol_name`` when
        set, else the ``known_ports`` entry for the listening port, else
        the port number itself.
        """
        self.transport = transport

        # Index/slice instead of unpacking into two names: IPv6 addresses
        # are 4-tuples (host, port, flowinfo, scope_id).
        addr, port = transport.get_extra_info('peername')[:2]
        server_port = transport.get_extra_info('sockname')[1]
        sock = transport.get_extra_info('socket')

        LOG.info("New connection from %s:%s", addr, port)

        self.connection = Connection(addr, port, sock)

        # This allows someone to inherit from KytosServer and start a server
        # on another port to handle a different protocol.
        if self.server.protocol_name:
            self.known_ports[server_port] = self.server.protocol_name

        protocol_name = self.known_ports.get(server_port,
                                             f'{server_port:04d}')
        self.connection.protocol.name = protocol_name

        event_name = f'kytos/core.{protocol_name}.connection.new'
        event = KytosEvent(name=event_name,
                           content={'source': self.connection})

        self._enqueue(self.server.controller.buffers.raw, event)

    def data_received(self, data):
        """Handle each request and place its data in the raw event buffer.

        Sends the received binary data in a ``kytos/core.{protocol}.raw.in``
        event on the raw buffer.
        """
        # `_rest` is never reassigned inside this class, so here it is
        # always the empty prefix set in `__init__`.
        data = self._rest + data

        LOG.debug("New data from %s:%s (%s bytes)",
                  self.connection.address, self.connection.port, len(data))

        content = {'source': self.connection, 'new_data': data}
        event_name = f'kytos/core.{self.connection.protocol.name}.raw.in'
        event = KytosEvent(name=event_name, content=content)

        self._enqueue(self.server.controller.buffers.raw, event)

    def connection_lost(self, exc):
        """Close the connection socket and generate connection lost event.

        Emits a ``kytos/core.{protocol}.connection.lost`` event through the
        App buffer. When `exc` is set, it is included in the event content
        under the ``'exception'`` key.
        """
        reason = exc or "Request closed by client"
        LOG.info("Connection lost with client %s:%s. Reason: %s",
                 self.connection.address, self.connection.port, reason)

        self.connection.close()

        content = {'source': self.connection}
        if exc:
            content['exception'] = exc
        event_name = \
            f'kytos/core.{self.connection.protocol.name}.connection.lost'
        event = KytosEvent(name=event_name, content=content)

        self._enqueue(self.server.controller.buffers.app, event)
195 |