1
|
|
|
"""AsyncIO TCP Server for Kytos.""" |
2
|
|
|
|
3
|
1 |
|
import asyncio |
4
|
1 |
|
import errno |
5
|
1 |
|
import logging |
6
|
|
|
|
7
|
1 |
|
from kytos.core.connection import Connection |
8
|
1 |
|
from kytos.core.events import KytosEvent |
9
|
|
|
|
10
|
1 |
|
LOG = logging.getLogger(__name__) |
11
|
|
|
|
12
|
|
|
|
13
|
1 |
|
def exception_handler(loop, context): |
14
|
|
|
"""Exception handler to avoid tracebacks because of network timeouts.""" |
15
|
1 |
|
exc = context.get('exception') |
16
|
1 |
|
transport = context.get('transport') |
17
|
|
|
|
18
|
1 |
|
if isinstance(exc, TimeoutError): |
|
|
|
|
19
|
1 |
|
LOG.info('Socket timeout: %r', transport) |
20
|
1 |
|
elif isinstance(exc, OSError) and exc.errno == errno.EBADF: |
21
|
1 |
|
LOG.info('Socket closed: %r', transport) |
22
|
|
|
else: |
23
|
1 |
|
loop.default_exception_handler(context) |
24
|
|
|
|
25
|
|
|
|
26
|
1 |
|
class KytosServer: |
27
|
|
|
"""Abstraction of a TCP Server to listen to packages from the network. |
28
|
|
|
|
29
|
|
|
The KytosServer will listen on the specified port |
30
|
|
|
for any new TCP request from the network and then instantiate the |
31
|
|
|
specified RequestHandler to handle the new request. |
32
|
|
|
It creates a new thread for each Handler. |
33
|
|
|
""" |
34
|
|
|
|
35
|
1 |
|
def __init__(self, # pylint: disable=too-many-arguments |
36
|
|
|
server_address, server_protocol, controller, |
37
|
|
|
protocol_name, loop=None): |
38
|
|
|
"""Create the object without starting the server. |
39
|
|
|
|
40
|
|
|
Args: |
41
|
|
|
server_address (tuple): Address where the server is listening. |
42
|
|
|
example: ('127.0.0.1', 80) |
43
|
|
|
server_protocol (asyncio.Protocol): |
44
|
|
|
Class that will be instantiated to handle each request. |
45
|
|
|
controller (:class:`~kytos.core.controller.Controller`): |
46
|
|
|
An instance of Kytos Controller class. |
47
|
|
|
protocol_name (str): Southbound protocol name that will be used |
48
|
|
|
""" |
49
|
1 |
|
self.server_address = server_address |
50
|
1 |
|
self.server_protocol = server_protocol |
51
|
1 |
|
self.controller = controller |
52
|
1 |
|
self.protocol_name = protocol_name |
53
|
|
|
|
54
|
|
|
# This will be an `asyncio.Server` instance after `serve_forever` is |
55
|
|
|
# called |
56
|
1 |
|
self._server = None |
57
|
|
|
|
58
|
|
|
# Here we compose the received `server_protocol` class with a `server` |
59
|
|
|
# object pointing to this instance |
60
|
1 |
|
self.server_protocol.server = self |
61
|
|
|
|
62
|
1 |
|
self.loop = loop or asyncio.get_event_loop() |
63
|
1 |
|
self.loop.set_exception_handler(exception_handler) |
64
|
|
|
|
65
|
1 |
|
def serve_forever(self): |
66
|
|
|
"""Handle requests until an explicit shutdown() is called.""" |
67
|
1 |
|
addr, port = self.server_address[0], self.server_address[1] |
68
|
|
|
|
69
|
1 |
|
self._server = self.loop.create_server(self.server_protocol, |
70
|
|
|
addr, port) |
71
|
|
|
|
72
|
1 |
|
try: |
73
|
1 |
|
task = self.loop.create_task(self._server) |
74
|
1 |
|
LOG.info("Kytos listening at %s:%s", addr, port) |
75
|
|
|
except Exception: |
76
|
|
|
LOG.error('Failed to start Kytos TCP Server at %s:%s', addr, port) |
77
|
|
|
task.close() |
78
|
|
|
raise |
79
|
|
|
|
80
|
1 |
|
def shutdown(self): |
81
|
|
|
"""Call .close() on underlying TCP server, closing client sockets.""" |
82
|
|
|
self._server.close() |
83
|
|
|
# self.loop.run_until_complete(self._server.wait_closed()) |
84
|
|
|
|
85
|
|
|
|
86
|
1 |
|
class KytosServerProtocol(asyncio.Protocol): |
87
|
|
|
"""Kytos' main request handler. |
88
|
|
|
|
89
|
|
|
It is instantiated once per connection between each switch and the |
90
|
|
|
controller. |
91
|
|
|
The setup method will dispatch a KytosEvent (``kytos/core.connection.new``) |
92
|
|
|
on the controller, that will be processed by a Core App. |
93
|
|
|
The finish method will close the connection and dispatch a KytosEvent |
94
|
|
|
(``kytos/core.connection.closed``) on the controller. |
95
|
|
|
""" |
96
|
|
|
|
97
|
1 |
|
known_ports = { |
98
|
|
|
6633: 'openflow', |
99
|
|
|
6653: 'openflow' |
100
|
|
|
} |
101
|
|
|
|
102
|
1 |
|
def __init__(self): |
103
|
|
|
"""Initialize protocol and check if server attribute was set.""" |
104
|
1 |
|
self._loop = asyncio.get_event_loop() |
105
|
|
|
|
106
|
1 |
|
self.connection = None |
107
|
1 |
|
self.transport = None |
108
|
1 |
|
self._rest = b'' |
109
|
|
|
|
110
|
|
|
# server attribute is set outside this class, in KytosServer.init() |
111
|
|
|
# Here we initialize it to None to avoid pylint warnings |
112
|
1 |
|
if not getattr(self, 'server'): |
113
|
|
|
self.server = None |
114
|
|
|
|
115
|
|
|
# Then we check if it was really set |
116
|
1 |
|
if not self.server: |
117
|
|
|
raise ValueError("server instance must be assigned before init") |
118
|
|
|
|
119
|
1 |
|
def connection_made(self, transport): |
120
|
|
|
"""Handle new client connection, passing it to the controller. |
121
|
|
|
|
122
|
|
|
Build a new Kytos `Connection` and send a ``kytos/core.connection.new`` |
123
|
|
|
KytosEvent through the app buffer. |
124
|
|
|
""" |
125
|
1 |
|
self.transport = transport |
126
|
|
|
|
127
|
1 |
|
addr, port = transport.get_extra_info('peername') |
128
|
1 |
|
_, server_port = transport.get_extra_info('sockname') |
129
|
1 |
|
socket = transport.get_extra_info('socket') |
130
|
|
|
|
131
|
1 |
|
LOG.info("New connection from %s:%s", addr, port) |
132
|
|
|
|
133
|
1 |
|
self.connection = Connection(addr, port, socket) |
134
|
|
|
|
135
|
|
|
# This allows someone to inherit from KytosServer and start a server |
136
|
|
|
# on another port to handle a different protocol. |
137
|
1 |
|
if self.server.protocol_name: |
138
|
1 |
|
self.known_ports[server_port] = self.server.protocol_name |
139
|
|
|
|
140
|
1 |
|
if server_port in self.known_ports: |
141
|
1 |
|
protocol_name = self.known_ports[server_port] |
142
|
|
|
else: |
143
|
|
|
protocol_name = f'{server_port:04d}' |
144
|
1 |
|
self.connection.protocol.name = protocol_name |
145
|
|
|
|
146
|
1 |
|
event_name = f'kytos/core.{protocol_name}.connection.new' |
147
|
1 |
|
event = KytosEvent(name=event_name, |
148
|
|
|
content={'source': self.connection}) |
149
|
|
|
|
150
|
1 |
|
self._loop.create_task(self.server.controller.buffers.raw.aput(event)) |
151
|
|
|
|
152
|
1 |
|
def data_received(self, data): |
153
|
|
|
"""Handle each request and place its data in the raw event buffer. |
154
|
|
|
|
155
|
|
|
Sends the received binary data in a ``kytos/core.{protocol}.raw.in`` |
156
|
|
|
event on the raw buffer. |
157
|
|
|
""" |
158
|
|
|
# max_size = 2**16 |
159
|
|
|
# new_data = self.request.recv(max_size) |
160
|
|
|
|
161
|
1 |
|
data = self._rest + data |
162
|
|
|
|
163
|
1 |
|
LOG.debug("New data from %s:%s (%s bytes)", |
164
|
|
|
self.connection.address, self.connection.port, len(data)) |
165
|
|
|
|
166
|
|
|
# LOG.debug("New data from %s:%s (%s bytes): %s", self.addr, self.port, |
167
|
|
|
# len(data), binascii.hexlify(data)) |
168
|
|
|
|
169
|
1 |
|
content = {'source': self.connection, 'new_data': data} |
170
|
1 |
|
event_name = f'kytos/core.{self.connection.protocol.name}.raw.in' |
171
|
1 |
|
event = KytosEvent(name=event_name, content=content) |
172
|
|
|
|
173
|
1 |
|
self._loop.create_task(self.server.controller.buffers.raw.aput(event)) |
174
|
|
|
|
175
|
1 |
|
def connection_lost(self, exc): |
176
|
|
|
"""Close the connection socket and generate connection lost event. |
177
|
|
|
|
178
|
|
|
Emits a ``kytos/core.{protocol}.connection.lost`` event through the |
179
|
|
|
App buffer. |
180
|
|
|
""" |
181
|
1 |
|
reason = exc or "Request closed by client" |
182
|
1 |
|
LOG.info("Connection lost with client %s:%s. Reason: %s", |
183
|
|
|
self.connection.address, self.connection.port, reason) |
184
|
|
|
|
185
|
1 |
|
self.connection.close() |
186
|
|
|
|
187
|
1 |
|
content = {'source': self.connection} |
188
|
1 |
|
if exc: |
189
|
1 |
|
content['exception'] = exc |
190
|
1 |
|
event_name = \ |
191
|
|
|
f'kytos/core.{self.connection.protocol.name}.connection.lost' |
192
|
1 |
|
event = KytosEvent(name=event_name, content=content) |
193
|
|
|
|
194
|
|
|
self._loop.create_task(self.server.controller.buffers.app.aput(event)) |
195
|
|
|
|