|
1
|
|
|
"""AsyncIO TCP Server for Kytos.""" |
|
2
|
|
|
|
|
3
|
1 |
|
import asyncio |
|
4
|
1 |
|
import errno |
|
5
|
1 |
|
import logging |
|
6
|
|
|
|
|
7
|
1 |
|
from kytos.core.connection import Connection |
|
8
|
1 |
|
from kytos.core.events import KytosEvent |
|
9
|
|
|
|
|
10
|
1 |
|
LOG = logging.getLogger(__name__) |
|
11
|
|
|
|
|
12
|
|
|
|
|
13
|
1 |
|
def exception_handler(loop, context): |
|
14
|
|
|
"""Exception handler to avoid tracebacks because of network timeouts.""" |
|
15
|
1 |
|
exc = context.get('exception') |
|
16
|
1 |
|
transport = context.get('transport') |
|
17
|
|
|
|
|
18
|
1 |
|
if isinstance(exc, TimeoutError): |
|
19
|
1 |
|
LOG.info('Socket timeout: %r', transport) |
|
20
|
1 |
|
elif isinstance(exc, OSError) and exc.errno == errno.EBADF: |
|
21
|
1 |
|
LOG.info('Socket closed: %r', transport) |
|
22
|
|
|
else: |
|
23
|
1 |
|
loop.default_exception_handler(context) |
|
24
|
|
|
|
|
25
|
|
|
|
|
26
|
1 |
|
class KytosServer: |
|
27
|
|
|
"""Abstraction of a TCP Server to listen to packages from the network. |
|
28
|
|
|
|
|
29
|
|
|
The KytosServer will listen on the specified port |
|
30
|
|
|
for any new TCP request from the network and then instantiate the |
|
31
|
|
|
specified RequestHandler to handle the new request. |
|
32
|
|
|
It creates a new thread for each Handler. |
|
33
|
|
|
""" |
|
34
|
|
|
|
|
35
|
1 |
|
def __init__(self, # pylint: disable=too-many-arguments |
|
36
|
|
|
server_address, server_protocol, controller, |
|
37
|
|
|
protocol_name, loop=None): |
|
38
|
|
|
"""Create the object without starting the server. |
|
39
|
|
|
|
|
40
|
|
|
Args: |
|
41
|
|
|
server_address (tuple): Address where the server is listening. |
|
42
|
|
|
example: ('127.0.0.1', 80) |
|
43
|
|
|
server_protocol (asyncio.Protocol): |
|
44
|
|
|
Class that will be instantiated to handle each request. |
|
45
|
|
|
controller (:class:`~kytos.core.controller.Controller`): |
|
46
|
|
|
An instance of Kytos Controller class. |
|
47
|
|
|
protocol_name (str): Southbound protocol name that will be used |
|
48
|
|
|
""" |
|
49
|
1 |
|
self.server_address = server_address |
|
50
|
1 |
|
self.server_protocol = server_protocol |
|
51
|
1 |
|
self.controller = controller |
|
52
|
1 |
|
self.protocol_name = protocol_name |
|
53
|
|
|
|
|
54
|
|
|
# This will be an `asyncio.Server` instance after `serve_forever` is |
|
55
|
|
|
# called |
|
56
|
1 |
|
self._server = None |
|
57
|
|
|
|
|
58
|
|
|
# Here we compose the received `server_protocol` class with a `server` |
|
59
|
|
|
# object pointing to this instance |
|
60
|
1 |
|
self.server_protocol.server = self |
|
61
|
|
|
|
|
62
|
1 |
|
self.loop = loop or asyncio.get_event_loop() |
|
63
|
1 |
|
self.loop.set_exception_handler(exception_handler) |
|
64
|
|
|
|
|
65
|
1 |
|
def serve_forever(self): |
|
66
|
|
|
"""Handle requests until an explicit shutdown() is called.""" |
|
67
|
1 |
|
addr, port = self.server_address[0], self.server_address[1] |
|
68
|
|
|
|
|
69
|
1 |
|
self._server = self.loop.create_server(self.server_protocol, |
|
70
|
|
|
addr, port) |
|
71
|
|
|
|
|
72
|
1 |
|
try: |
|
73
|
1 |
|
task = self.loop.create_task(self._server) |
|
74
|
1 |
|
LOG.info("Kytos listening at %s:%s", addr, port) |
|
75
|
|
|
except Exception: |
|
76
|
|
|
LOG.error('Failed to start Kytos TCP Server at %s:%s', addr, port) |
|
77
|
|
|
task.close() |
|
78
|
|
|
raise |
|
79
|
|
|
|
|
80
|
1 |
|
def shutdown(self): |
|
81
|
|
|
"""Call .close() on underlying TCP server, closing client sockets.""" |
|
82
|
|
|
self._server.close() |
|
83
|
|
|
# self.loop.run_until_complete(self._server.wait_closed()) |
|
84
|
|
|
|
|
85
|
|
|
|
|
86
|
1 |
|
class KytosServerProtocol(asyncio.Protocol): |
|
87
|
|
|
"""Kytos' main request handler. |
|
88
|
|
|
|
|
89
|
|
|
It is instantiated once per connection between each switch and the |
|
90
|
|
|
controller. |
|
91
|
|
|
The setup method will dispatch a KytosEvent (``kytos/core.connection.new``) |
|
92
|
|
|
on the controller, that will be processed by a Core App. |
|
93
|
|
|
The finish method will close the connection and dispatch a KytosEvent |
|
94
|
|
|
(``kytos/core.connection.closed``) on the controller. |
|
95
|
|
|
""" |
|
96
|
|
|
|
|
97
|
1 |
|
known_ports = { |
|
98
|
|
|
6633: 'openflow', |
|
99
|
|
|
6653: 'openflow' |
|
100
|
|
|
} |
|
101
|
|
|
|
|
102
|
1 |
|
def __init__(self): |
|
103
|
|
|
"""Initialize protocol and check if server attribute was set.""" |
|
104
|
1 |
|
self._loop = asyncio.get_event_loop() |
|
105
|
|
|
|
|
106
|
1 |
|
self.connection = None |
|
107
|
1 |
|
self.transport = None |
|
108
|
1 |
|
self._rest = b'' |
|
109
|
|
|
|
|
110
|
|
|
# server attribute is set outside this class, in KytosServer.init() |
|
111
|
|
|
# Here we initialize it to None to avoid pylint warnings |
|
112
|
1 |
|
if not getattr(self, 'server'): |
|
113
|
|
|
self.server = None |
|
114
|
|
|
|
|
115
|
|
|
# Then we check if it was really set |
|
116
|
1 |
|
if not self.server: |
|
117
|
|
|
raise ValueError("server instance must be assigned before init") |
|
118
|
|
|
|
|
119
|
1 |
|
def connection_made(self, transport): |
|
120
|
|
|
"""Handle new client connection, passing it to the controller. |
|
121
|
|
|
|
|
122
|
|
|
Build a new Kytos `Connection` and send a ``kytos/core.connection.new`` |
|
123
|
|
|
KytosEvent through the app buffer. |
|
124
|
|
|
""" |
|
125
|
1 |
|
self.transport = transport |
|
126
|
|
|
|
|
127
|
1 |
|
addr, port = transport.get_extra_info('peername') |
|
128
|
1 |
|
_, server_port = transport.get_extra_info('sockname') |
|
129
|
1 |
|
socket = transport.get_extra_info('socket') |
|
130
|
|
|
|
|
131
|
1 |
|
LOG.info("New connection from %s:%s", addr, port) |
|
132
|
|
|
|
|
133
|
1 |
|
self.connection = Connection(addr, port, socket) |
|
134
|
|
|
|
|
135
|
|
|
# This allows someone to inherit from KytosServer and start a server |
|
136
|
|
|
# on another port to handle a different protocol. |
|
137
|
1 |
|
if self.server.protocol_name: |
|
138
|
1 |
|
self.known_ports[server_port] = self.server.protocol_name |
|
139
|
|
|
|
|
140
|
1 |
|
if server_port in self.known_ports: |
|
141
|
1 |
|
protocol_name = self.known_ports[server_port] |
|
142
|
|
|
else: |
|
143
|
|
|
protocol_name = f'{server_port:04d}' |
|
144
|
1 |
|
self.connection.protocol.name = protocol_name |
|
145
|
|
|
|
|
146
|
1 |
|
event_name = f'kytos/core.{protocol_name}.connection.new' |
|
147
|
1 |
|
event = KytosEvent(name=event_name, |
|
148
|
|
|
content={'source': self.connection}) |
|
149
|
|
|
|
|
150
|
1 |
|
self._loop.create_task(self.server.controller.buffers.conn.aput(event)) |
|
151
|
|
|
|
|
152
|
1 |
|
def data_received(self, data): |
|
153
|
|
|
"""Handle each request and place its data in the raw event buffer. |
|
154
|
|
|
|
|
155
|
|
|
Sends the received binary data in a ``kytos/core.{protocol}.raw.in`` |
|
156
|
|
|
event on the raw buffer. |
|
157
|
|
|
""" |
|
158
|
|
|
# max_size = 2**16 |
|
159
|
|
|
# new_data = self.request.recv(max_size) |
|
160
|
|
|
|
|
161
|
1 |
|
data = self._rest + data |
|
162
|
|
|
|
|
163
|
1 |
|
if logging.DEBUG >= LOG.getEffectiveLevel(): |
|
164
|
|
|
LOG.debug("New data from %s:%s (%s bytes)", |
|
165
|
|
|
self.connection.address, self.connection.port, len(data)) |
|
166
|
|
|
|
|
167
|
|
|
# LOG.debug("New data from %s:%s (%s bytes): %s", self.addr, self.port, |
|
168
|
|
|
# len(data), binascii.hexlify(data)) |
|
169
|
|
|
|
|
170
|
1 |
|
content = {'source': self.connection, 'new_data': data} |
|
171
|
1 |
|
event_name = f'kytos/core.{self.connection.protocol.name}.raw.in' |
|
172
|
1 |
|
event = KytosEvent(name=event_name, content=content) |
|
173
|
|
|
|
|
174
|
1 |
|
self._loop.create_task(self.server.controller.buffers.raw.aput(event)) |
|
175
|
|
|
|
|
176
|
1 |
|
def connection_lost(self, exc): |
|
177
|
|
|
"""Close the connection socket and generate connection lost event. |
|
178
|
|
|
|
|
179
|
|
|
Emits a ``kytos/core.{protocol}.connection.lost`` event through the |
|
180
|
|
|
App buffer. |
|
181
|
|
|
""" |
|
182
|
1 |
|
reason = exc or "Request closed by client" |
|
183
|
1 |
|
LOG.info("Connection lost with client %s:%s. Reason: %s", |
|
184
|
|
|
self.connection.address, self.connection.port, reason) |
|
185
|
|
|
|
|
186
|
1 |
|
self.connection.close() |
|
187
|
|
|
|
|
188
|
1 |
|
content = {'source': self.connection} |
|
189
|
1 |
|
if exc: |
|
190
|
1 |
|
content['exception'] = exc |
|
191
|
1 |
|
event_name = \ |
|
192
|
|
|
f'kytos/core.{self.connection.protocol.name}.connection.lost' |
|
193
|
1 |
|
event = KytosEvent(name=event_name, content=content) |
|
194
|
|
|
|
|
195
|
|
|
self._loop.create_task(self.server.controller.buffers.conn.aput(event)) |
|
196
|
|
|
|