1 | """Kytos SDN Platform main class. |
||
2 | |||
3 | This module contains the main class of Kytos, which is |
||
4 | :class:`~.core.Controller`. |
||
5 | |||
6 | Basic usage: |
||
7 | |||
8 | .. code-block:: python3 |
||
9 | |||
10 | from kytos.config import KytosConfig |
||
11 | from kytos.core import Controller |
||
12 | config = KytosConfig() |
||
13 | controller = Controller(config.options) |
||
14 | controller.start() |
||
15 | """ |
||
16 | 1 | import asyncio |
|
17 | 1 | import atexit |
|
18 | 1 | import json |
|
19 | 1 | import logging |
|
20 | 1 | import os |
|
21 | 1 | import re |
|
22 | 1 | import sys |
|
23 | 1 | import threading |
|
24 | 1 | from concurrent.futures import ThreadPoolExecutor |
|
25 | 1 | from importlib import reload as reload_module |
|
26 | 1 | from importlib import import_module |
|
27 | 1 | from importlib.util import module_from_spec, spec_from_file_location |
|
28 | 1 | from pathlib import Path |
|
29 | |||
30 | 1 | from kytos.core.api_server import APIServer |
|
31 | # from kytos.core.tcp_server import KytosRequestHandler, KytosServer |
||
32 | 1 | from kytos.core.atcp_server import KytosServer, KytosServerProtocol |
|
33 | 1 | from kytos.core.buffers import KytosBuffers |
|
34 | 1 | from kytos.core.config import KytosConfig |
|
35 | 1 | from kytos.core.connection import ConnectionState |
|
36 | 1 | from kytos.core.events import KytosEvent |
|
37 | 1 | from kytos.core.helpers import now |
|
38 | 1 | from kytos.core.interface import Interface |
|
39 | 1 | from kytos.core.logs import LogManager |
|
40 | 1 | from kytos.core.napps.base import NApp |
|
41 | 1 | from kytos.core.napps.manager import NAppsManager |
|
42 | 1 | from kytos.core.napps.napp_dir_listener import NAppDirListener |
|
43 | from kytos.core.switch import Switch |
||
44 | 1 | ||
45 | __all__ = ('Controller',) |
||
46 | |||
47 | 1 | ||
48 | class Controller: |
||
49 | """Main class of Kytos. |
||
50 | |||
51 | The main responsibilities of this class are: |
||
52 | - start the asyncio-based :class:`~.core.atcp_server.KytosServer`; |
||
53 | - manage KytosNApps (install, load and unload); |
||
54 | - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`); |
||
55 | - manage which event should be sent to NApps methods; |
||
56 | - manage the buffers handlers, considering one asyncio task per handler. |
||
57 | """ |
||
58 | |||
59 | # Created issue #568 for the disabled checks. |
||
60 | 1 | # pylint: disable=too-many-instance-attributes,too-many-public-methods |
|
def __init__(self, options=None, loop=None):
    """Initialise the controller state; nothing is started here.

    Args:
        options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
            instance of :class:`~kytos.core.config.KytosConfig` class.
            When omitted, the ``'daemon'`` options of a new ``KytosConfig``
            are used.
        loop: asyncio event loop used for the buffers and the handler
            tasks. Defaults to ``asyncio.get_event_loop()``.
    """
    if options is None:
        options = KytosConfig().options['daemon']

    self._loop = loop or asyncio.get_event_loop()
    # Single worker: the pool only hosts the blocking API server.
    self._pool = ThreadPoolExecutor(max_workers=1)

    #: dict: keep the main threads of the controller (buffers and handler)
    self._threads = {}
    #: KytosBuffers: KytosBuffer object with Controller buffers
    self.buffers = KytosBuffers(loop=self._loop)
    #: dict: keep track of the socket connections labeled by ``(ip, port)``
    #:
    #: This dict stores all connections between the controller and the
    #: switches. The key for this dict is a tuple (ip, port). The content
    #: is another dict with the connection information.
    self.connections = {}
    #: dict: mapping of events and event listeners.
    #:
    #: The key of the dict is a KytosEvent (or a string that represent a
    #: regex to match against KytosEvents) and the value is a list of
    #: methods that will receive the referenced event
    self.events_listeners = {'kytos/core.connection.new':
                             [self.new_connection]}

    #: dict: Current loaded apps - ``(username, napp_name)``: ``napp``
    #:
    #: The key is the (username, napp_name) tuple, while the value is the
    #: napp instance itself.
    self.napps = {}
    #: Object generated by ParseArgs on config.py file
    self.options = options
    #: KytosServer: Instance of KytosServer that will be listening to TCP
    #: connections.
    self.server = None
    #: dict: Current existing switches.
    #:
    #: The key is the switch dpid, while the value is a Switch object.
    self.switches = {}  # dpid: Switch()

    #: datetime.datetime: Time when the controller finished starting.
    self.started_at = None

    #: logging.Logger: Logger instance used by Kytos.
    self.log = None

    #: API Server used to expose rest endpoints.
    self.api_server = APIServer(__name__, self.options.listen,
                                self.options.api_port,
                                napps_dir=self.options.napps)

    self._register_endpoints()

    #: Observer that handle NApps when they are enabled or disabled.
    self.napp_dir_listener = NAppDirListener(self)

    self.napps_manager = NAppsManager(self)

    #: Adding the napps 'enabled' directory into the PATH
    #: Now you can access the enabled napps with:
    #: from napps.<username>.<napp_name> import ?....
    napps_parent = os.path.join(self.options.napps, os.pardir)
    # restart() calls __init__ again; do not pile up duplicate entries.
    if napps_parent not in sys.path:
        sys.path.append(napps_parent)
||
129 | 1 | ||
def enable_logs(self):
    """Load the logging configuration and bind the controller logger.

    Reads ``options.logging`` and ``options.debug``, attaches the websocket
    log handler to the API server and stores the ``"controller"`` logger in
    ``self.log``.
    """
    opts = self.options
    LogManager.load_config_file(opts.logging, opts.debug)

    websocket_server = self.api_server.server
    LogManager.enable_websocket(websocket_server)

    self.log = logging.getLogger("controller")
||
135 | |||
def start(self, restart=False):
    """Enable logging, write the pidfile and start the controller.

    Args:
        restart (bool): when True the pidfile step is skipped, since the
            process already owns one.
    """
    self.enable_logs()

    first_start = not restart
    if first_start:
        self.create_pidfile()

    self.start_controller()
||
142 | |||
def create_pidfile(self):
    """Create the pidfile, aborting if another live process owns it.

    The parent directory of ``self.options.pidfile`` is created when it is
    missing. If a pidfile already exists and the PID stored in it answers
    to signal 0, this process exits. A stale, empty or corrupt pidfile is
    overwritten. Removal of the pidfile at interpreter exit is registered
    only after this process has written it, so an aborted start never
    deletes the pidfile of the instance that is still running.

    Raises:
        SystemExit: if a running process already owns the pidfile, or if
            the pidfile cannot be written.
    """
    pid = os.getpid()
    pidfile_path = Path(self.options.pidfile)

    # Creates directory if it doesn't exist
    # System can erase /var/run's content
    pid_folder = pidfile_path.parent
    self.log.info(pid_folder)

    # Pylint incorrectly infers Path objects
    # https://github.com/PyCQA/pylint/issues/224
    # pylint: disable=no-member
    if not pid_folder.exists():
        pid_folder.mkdir(parents=True, exist_ok=True)
        pid_folder.chmod(0o1777)
    # pylint: enable=no-member

    try:
        # Mode 'x' fails when the file already exists.
        pidfile = open(self.options.pidfile, mode='x')
    except OSError:
        # Most likely there is a pidfile already: check whether the
        # process that created it is still running.
        try:
            with open(self.options.pidfile, mode='r') as existing_file:
                old_pid = int(existing_file.read())
            os.kill(old_pid, 0)
        except (OSError, ValueError):
            # Unreadable/corrupt file or no such process: the pidfile is
            # stale, overwrite it and proceed.
            try:
                pidfile = open(self.options.pidfile, mode='w')
            except OSError as exception:
                error_msg = "Failed to create pidfile {}: {}."
                sys.exit(error_msg.format(self.options.pidfile, exception))
        else:
            # kill() returned no error, so a process with that PID is
            # running. We assume it is Kytos and quit.
            error_msg = ("PID file {} exists. Delete it if Kytos is not "
                         "running. Aborting.")
            sys.exit(error_msg.format(self.options.pidfile))

    # Identifies the process that created the pidfile.
    with pidfile:
        pidfile.write(str(pid))

    def _remove_pidfile():
        """Delete the pidfile, ignoring the case where it is already gone."""
        try:
            pidfile_path.unlink()
        except FileNotFoundError:
            pass

    # Make sure the file is deleted when controller stops
    atexit.register(_remove_pidfile)
||
190 | |||
def start_controller(self):
    """Start the controller.

    Creates the KytosServer (TCP server) and asks it to serve, schedules
    one task per buffer handler on the controller loop, runs the API
    server in the thread pool, then loads the NApps and records the start
    time. When the API server task finishes, the event loop is stopped.
    """
    self.log.info("Starting Kytos - Kytos Controller")
    listen_address = (self.options.listen, int(self.options.port))
    self.server = KytosServer(listen_address,
                              KytosServerProtocol,
                              self,
                              self.options.protocol_name)

    self.log.info("Starting TCP server: %s", self.server)
    self.server.serve_forever()

    def _stop_loop(_):
        # Done-callback of the API server task: stop the running loop.
        current_loop = asyncio.get_event_loop()
        alive = threading.enumerate()
        self.log.debug("%s threads before loop.stop: %s",
                       len(alive), alive)
        current_loop.stop()

    async def _run_api_server_thread(executor):
        # Runs the blocking API server in ``executor`` and waits for it.
        log = logging.getLogger('controller.api_server_thread')
        log.debug('starting')
        current_loop = asyncio.get_event_loop()
        blocking_tasks = [
            current_loop.run_in_executor(executor, self.api_server.run),
        ]
        completed, pending = await asyncio.wait(blocking_tasks)
        log.debug('completed: %d, pending: %d',
                  len(completed), len(pending))

    buffer_handlers = (
        self.raw_event_handler,
        self.msg_in_event_handler,
        self.msg_out_event_handler,
        self.app_event_handler,
    )
    for handler in buffer_handlers:
        self._loop.create_task(handler())

    api_task = self._loop.create_task(_run_api_server_thread(self._pool))
    api_task.add_done_callback(_stop_loop)

    self.log.info("ThreadPool started: %s", self._pool)

    # ASYNC TODO: ensure all threads started correctly
    # This is critical, if any of them failed starting we should exit.
    # sys.exit(error_msg.format(thread, exception))

    self.log.info("Loading Kytos NApps...")
    self.napp_dir_listener.start()
    self.pre_install_napps(self.options.napps_pre_installed)
    self.load_napps()

    self.started_at = now()
||
250 | |||
def _register_endpoints(self):
    """Register all rest endpoints served by kytos.

    - Start the APIServer endpoints (``api_server.start_api()``)
    - Register ``config/``, ``reload/<username>/<napp_name>/`` and
      ``reload/all`` as core endpoints
    """
    api = self.api_server
    api.start_api()

    # Register controller endpoints as /api/kytos/core/...
    core_endpoints = (
        ('config/', self.configuration_endpoint),
        ('reload/<username>/<napp_name>/', self.rest_reload_napp),
        ('reload/all', self.rest_reload_all_napps),
    )
    for rule, handler in core_endpoints:
        api.register_core_endpoint(rule, handler)
||
268 | 1 | ||
def register_rest_endpoint(self, url, function, methods):
    """Forward to the API server; deprecated in favor of the @rest decorator.

    Args:
        url: endpoint URL handed to the API server.
        function: callable that serves the endpoint.
        methods: HTTP methods handed to the API server.
    """
    api = self.api_server
    api.register_rest_endpoint(url, function, methods)
||
272 | |||
def configuration_endpoint(self):
    """Return the configuration options used by Kytos.

    Returns:
        string: JSON dump of the attributes of ``self.options``.

    """
    current_options = self.options.__dict__
    return json.dumps(current_options)
||
281 | |||
def restart(self, graceful=True):
    """Restart Kytos SDN Controller.

    A running controller is stopped and re-initialised with its current
    options before being started again; a stopped one is just started.

    Args:
        graceful(bool): Represents the way that Kytos will restart.
    """
    was_running = self.started_at is not None
    if was_running:
        self.stop(graceful)
        self.__init__(self.options)

    self.start(restart=True)
||
293 | |||
def stop(self, graceful=True):
    """Shutdown all services used by kytos.

    Does nothing unless the controller has been started; otherwise it
    delegates to :meth:`stop_controller`.

    Args:
        graceful (bool): forwarded to :meth:`stop_controller`.
    """
    if not self.started_at:
        return
    self.stop_controller(graceful)
||
304 | |||
def stop_controller(self, graceful=True):
    """Stop the controller.

    In order, this method:
        - sends the stop signal through the buffers;
        - stops the API server and the NApp directory listener;
        - shuts the thread pool down (waiting for it when ``graceful``);
        - unloads every NApp and replaces the buffers with new ones;
        - shuts the KytosServer down, if one was created.

    Args:
        graceful (bool): wait for the thread pool to finish its work.
    """
    self.log.info("Stopping Kytos")

    self.buffers.send_stop_signal()
    self.api_server.stop_api_server()
    self.napp_dir_listener.stop()

    self.log.info("Stopping threadpool: %s", self._pool)

    threads = threading.enumerate()
    self.log.debug("%s threads before threadpool shutdown: %s",
                   len(threads), threads)

    self._pool.shutdown(wait=graceful)

    self.started_at = None
    self.unload_napps()
    # Keep the new buffers on the controller's loop, as __init__ does.
    self.buffers = KytosBuffers(loop=self._loop)

    # ASYNC TODO: close connections

    # Shutdown the TCP server and the main asyncio loop
    if self.server is not None:
        self.server.shutdown()
||
352 | |||
def status(self):
    """Return status of Kytos Server.

    Returns:
        string: ``"Running since <started_at>"`` while the controller is
        running, ``"Stopped"`` otherwise.

    """
    if not self.started_at:
        return "Stopped"
    return "Running since %s" % self.started_at
||
366 | |||
def uptime(self):
    """Return the uptime of kytos server.

    This method returns:
        - 0 if Kytos Server is stopped.
        - (now - self.started_at) if Kytos Server is running.

    Returns:
        datetime.timedelta: The uptime interval (the integer 0 when the
        controller is stopped).

    """
    if not self.started_at:
        return 0
    return now() - self.started_at
||
379 | |||
def notify_listeners(self, event):
    """Send the event to the specified listeners.

    Loops over self.events_listeners matching (by regexp) the attribute
    name of the event with the keys of events_listeners. If a match occurs,
    then send the event to each registered listener.

    Each key is anchored with a trailing ``$`` unless it already ends with
    an unescaped one, so a key never matches an event whose name merely
    starts with it.

    Args:
        event (~kytos.core.KytosEvent): An instance of a KytosEvent.
    """
    self.log.debug("looking for listeners for %s", event)
    # Iterate over a copy of the mapping, as the original code did.
    for event_regex, listeners in dict(self.events_listeners).items():
        # Do not match if the event has more characters
        # e.g. "shutdown" won't match "shutdown.kytos/of_core"
        # endswith() also copes with empty and one-character patterns,
        # which used to raise IndexError when indexed with [-1]/[-2].
        anchored = (event_regex.endswith('$') and
                    not event_regex.endswith('\\$'))
        if not anchored:
            event_regex += '$'
        if re.match(event_regex, event.name):
            for listener in listeners:
                listener(event)
||
402 | |||
async def raw_event_handler(self):
    """Handle raw events.

    Takes events from the raw buffer one at a time and hands each to
    :meth:`notify_listeners`. The ``kytos/core.shutdown`` event is
    dispatched too and then ends the handler.
    """
    self.log.info("Raw Event Handler started")
    running = True
    while running:
        raw_event = await self.buffers.raw.aget()
        self.notify_listeners(raw_event)
        self.log.debug("Raw Event handler called")

        if raw_event.name == "kytos/core.shutdown":
            self.log.debug("Raw Event handler stopped")
            running = False
||
418 | |||
async def msg_in_event_handler(self):
    """Handle msg_in events.

    Takes events from the msg_in buffer one at a time and hands each to
    :meth:`notify_listeners`. The ``kytos/core.shutdown`` event is
    dispatched too and then ends the handler.
    """
    self.log.info("Message In Event Handler started")
    running = True
    while running:
        in_event = await self.buffers.msg_in.aget()
        self.notify_listeners(in_event)
        self.log.debug("Message In Event handler called")

        if in_event.name == "kytos/core.shutdown":
            self.log.debug("Message In Event handler stopped")
            running = False
||
434 | |||
async def msg_out_event_handler(self):
    """Handle msg_out events.

    Takes events from the msg_out buffer. For each one, the message in
    ``content['message']`` is packed and sent to the event destination,
    and the listeners are notified afterwards. Events whose destination is
    missing or in the FINISHED state are only logged. The
    ``kytos/core.shutdown`` event ends the handler without being
    dispatched.
    """
    self.log.info("Message Out Event Handler started")
    while True:
        out_event = await self.buffers.msg_out.aget()

        if out_event.name == "kytos/core.shutdown":
            self.log.debug("Message Out Event handler stopped")
            return

        message = out_event.content['message']
        destination = out_event.destination

        if not destination or destination.state == ConnectionState.FINISHED:
            self.log.info("connection closed. Cannot send message")
            continue

        packet = message.pack()
        destination.send(packet)
        header = message.header
        self.log.debug('Connection %s: OUT OFP, '
                       'version: %s, type: %s, xid: %s - %s',
                       destination.id,
                       header.version,
                       header.message_type,
                       header.xid,
                       packet.hex())
        self.notify_listeners(out_event)
        self.log.debug("Message Out Event handler called")
||
466 | |||
async def app_event_handler(self):
    """Handle app events.

    Takes events from the app buffer one at a time and hands each to
    :meth:`notify_listeners`. The ``kytos/core.shutdown`` event is
    dispatched too and then ends the handler.
    """
    self.log.info("App Event Handler started")
    running = True
    while running:
        app_event = await self.buffers.app.aget()
        self.notify_listeners(app_event)
        self.log.debug("App Event handler called")

        if app_event.name == "kytos/core.shutdown":
            self.log.debug("App Event handler stopped")
            running = False
||
482 | |||
def get_interface_by_id(self, interface_id):
    """Find an Interface with interface_id.

    The text after the last ``:`` of the identifier is the interface
    number; everything before it is the switch id.

    Args:
        interface_id(str): Interface Identifier.

    Returns:
        Interface: Instance of Interface with the id given, or None when
        the id is None or the switch/interface is unknown.

    """
    if interface_id is None:
        return None

    switch_id, _, number_text = interface_id.rpartition(":")
    interface_number = int(number_text)

    switch = self.switches.get(switch_id)
    if switch:
        return switch.interfaces.get(interface_number, None)
    return None
||
505 | |||
def get_switch_by_dpid(self, dpid):
    """Return a specific switch by dpid.

    Args:
        dpid (|DPID|): dpid object used to identify a switch.

    Returns:
        :class:`~kytos.core.switch.Switch`: Switch with dpid specified, or
        None when no switch is stored under that dpid.

    """
    known_switches = self.switches
    return known_switches.get(dpid)
||
517 | |||
def get_switch_or_create(self, dpid, connection):
    """Return switch or create it if necessary.

    The connection is stored first. A ``kytos/core.switch.new`` or
    ``kytos/core.switch.reconnected`` event is put on the app buffer after
    the switch connection has been updated.

    Args:
        dpid (|DPID|): dpid object used to identify a switch.
        connection (:class:`~kytos.core.connection.Connection`):
            connection used by switch. If the switch already had a
            different connection, that one is removed.

    Returns:
        :class:`~kytos.core.switch.Switch`: new or existent switch.

    """
    self.create_or_update_connection(connection)
    switch = self.get_switch_by_dpid(dpid)

    if switch is None:
        switch = Switch(dpid=dpid)
        self.add_new_switch(switch)
        suffix = 'new'
    else:
        suffix = 'reconnected'

    self.set_switch_options(dpid=dpid)
    event = KytosEvent(name='kytos/core.switch.' + suffix,
                       content={'switch': switch})

    previous_connection = switch.connection
    switch.update_connection(connection)

    if previous_connection is not connection:
        self.remove_connection(previous_connection)

    self.buffers.app.put(event)

    return switch
||
554 | |||
def set_switch_options(self, dpid):
    """Update the switch settings based on kytos.conf options.

    ``self.options.vlan_pool`` is parsed as JSON of the form
    ``{dpid: {interface_number: [[vlan_begin, vlan_end], ...]}}``. For
    every listed interface the switch does not have yet, an Interface is
    created with the VLAN ids of ``range(vlan_begin, vlan_end)`` as its
    available tags. Existing interfaces are left untouched.

    Args:
        dpid (str): dpid used to identify a switch.

    """
    switch = self.switches.get(dpid)
    if not switch:
        return

    try:
        vlan_pool = json.loads(self.options.vlan_pool)
    except (TypeError, json.JSONDecodeError) as err:
        self.log.error("Invalid vlan_pool settings: %s", err)
        return

    if not vlan_pool:
        return
    if not isinstance(vlan_pool, dict):
        self.log.error("Invalid vlan_pool settings: %s",
                       "expected a JSON object keyed by dpid")
        return

    switch_pool = vlan_pool.get(dpid)
    if not switch_pool:
        return

    self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
    for intf_key, vlan_ranges in switch_pool.items():
        # JSON object keys are strings; convert before the lookup so an
        # interface that already exists under its integer number is found.
        intf_num = int(intf_key)
        if switch.interfaces.get(intf_num):
            continue

        vlan_ids = set()
        for vlan_range in vlan_ranges:
            vlan_begin, vlan_end = vlan_range[0:2]
            vlan_ids.update(range(vlan_begin, vlan_end))

        intf = Interface(name=intf_num, port_number=intf_num,
                         switch=switch)
        intf.set_available_tags(vlan_ids)
        switch.update_interface(intf)
||
588 | |||
def create_or_update_connection(self, connection):
    """Store a connection under its id, replacing any previous entry.

    Args:
        connection (:class:`~kytos.core.connection.Connection`):
            Instance of connection that will be stored.
    """
    conn_id = connection.id
    self.connections[conn_id] = connection
||
597 | |||
def get_connection_by_id(self, conn_id):
    """Return an existent connection by id.

    Args:
        conn_id: id of the connection, as used for the keys of
            ``self.connections``.

    Returns:
        :class:`~kytos.core.connection.Connection`: Instance of connection
        or None Type.

    """
    stored = self.connections
    return stored.get(conn_id)
||
610 | |||
def remove_connection(self, connection):
    """Close an existent connection and remove it.

    Args:
        connection (:class:`~kytos.core.connection.Connection`):
            Instance of connection that will be removed.

    Returns:
        bool: True when the connection was closed and removed; False when
        ``connection`` is None or a KeyError was raised (for instance
        because its id is not stored).
    """
    if connection is None:
        return False

    try:
        connection.close()
        self.connections.pop(connection.id)
    except KeyError:
        return False
    else:
        return True
||
627 | |||
def remove_switch(self, switch):
    """Remove an existent switch.

    Args:
        switch (:class:`~kytos.core.switch.Switch`):
            Instance of switch that will be removed.

    Returns:
        bool: True when the switch was stored and has been removed, False
        when no switch is stored under its dpid.
    """
    try:
        self.switches.pop(switch.dpid)
    except KeyError:
        return False
    else:
        return True
||
640 | |||
def new_connection(self, event):
    """Handle a kytos/core.connection.new event.

    This method will read new connection event and store the connection
    (socket) into the connections attribute on the controller.

    A different connection object already stored under the same id is
    closed and removed first, since the event describes a new connection
    on the same ip:port.

    Args:
        event (~kytos.core.KytosEvent):
            The received event (``kytos/core.connection.new``) with the
            needed info.
    """
    self.log.info("Handling %s...", event)

    connection = event.source
    self.log.debug("Event source: %s", event.source)

    # Remove old connection (aka cleanup) if it exists.
    # remove_connection() needs the connection object, not its id, and the
    # connection being registered must not be closed by its own cleanup.
    old_connection = self.get_connection_by_id(connection.id)
    if old_connection is not None and old_connection is not connection:
        self.remove_connection(old_connection)

    # Update connections with the new connection
    self.create_or_update_connection(connection)
||
666 | |||
def add_new_switch(self, switch):
    """Store a switch on the controller, keyed by its dpid.

    Args:
        switch (Switch): A Switch object
    """
    dpid = switch.dpid
    self.switches[dpid] = switch
||
674 | |||
def _import_napp(self, username, napp_name):
    """Import a NApp module.

    Loads ``<options.napps>/<username>/<napp_name>/main.py`` as the module
    ``napps.<username>.<napp_name>.main`` and registers it in
    ``sys.modules``. If executing the module fails, the entry is removed
    again so no half-initialised module is left behind.

    Args:
        username (str): NApp username (makes up NApp's path).
        napp_name (str): Name of the NApp to be imported.

    Returns:
        module: the executed ``main`` module of the NApp.

    Raises:
        FileNotFoundError: if NApp's main.py is not found.
        ModuleNotFoundError: if any NApp requirement is not installed.

    """
    mod_name = '.'.join(['napps', username, napp_name, 'main'])
    path = os.path.join(self.options.napps, username, napp_name,
                        'main.py')
    napp_spec = spec_from_file_location(mod_name, path)
    napp_module = module_from_spec(napp_spec)
    # Registered before execution so the module is importable by name
    # while its own code runs.
    sys.modules[napp_spec.name] = napp_module
    try:
        napp_spec.loader.exec_module(napp_module)
    except BaseException:
        sys.modules.pop(napp_spec.name, None)
        raise
    return napp_module
||
691 | |||
def load_napp(self, username, napp_name):
    """Load a single NApp.

    Imports the NApp's main module, instantiates its ``Main`` class,
    starts it, registers its REST endpoints and adds its listeners to
    ``self.events_listeners``. Nothing is loaded when the NApp is already
    loaded or when importing it fails (the failure is logged).

    Args:
        username (str): NApp username (makes up NApp's path).
        napp_name (str): Name of the NApp to be loaded.
    """
    napp_key = (username, napp_name)
    if napp_key in self.napps:
        self.log.warning('NApp %s/%s was already loaded',
                         username, napp_name)
        return

    try:
        napp_module = self._import_napp(username, napp_name)
    except ModuleNotFoundError as err:
        self.log.error("Error loading NApp '%s/%s': %s",
                       username, napp_name, err)
        return
    except FileNotFoundError as err:
        self.log.warning(
            "NApp module not found, assuming it's a meta napp: %s",
            err.filename)
        return

    napp = napp_module.Main(controller=self)
    self.napps[napp_key] = napp

    napp.start()
    self.api_server.register_napp_endpoints(napp)

    # pylint: disable=protected-access
    for event, listeners in napp._listeners.items():
        registered = self.events_listeners.setdefault(event, [])
        registered.extend(listeners)
    # pylint: enable=protected-access
|
726 | |||
def pre_install_napps(self, napps, enable=True):
    """Pre install and enable NApps.

    Only the NApps that the NApps manager does not report as installed
    are installed.

    Args:
        napps ([str]): List of NApps to be pre-installed and enabled.
        enable (bool): forwarded to ``napps_manager.install``.
    """
    manager = self.napps_manager
    installed = [str(napp) for napp in manager.get_installed_napps()]
    missing = [napp for napp in napps if napp not in installed]
    for napp_id in missing:
        manager.install(napp_id, enable=enable)
||
740 | |||
def load_napps(self):
    """Load all NApps enabled on the NApps dir.

    A FileNotFoundError raised while loading one NApp is logged and the
    remaining NApps are still loaded.
    """
    enabled_napps = self.napps_manager.get_enabled_napps()
    for napp in enabled_napps:
        try:
            self.log.info("Loading NApp %s", napp.id)
            self.load_napp(napp.username, napp.name)
        except FileNotFoundError as exception:
            self.log.error("Could not load NApp %s: %s",
                           napp.id, exception)
||
750 | |||
def unload_napp(self, username, napp_name):
    """Unload a specific NApp.

    The NApp is removed from ``self.napps``, its shutdown listener is
    called with a ``kytos/core.shutdown.<napp id>`` event, its REST
    endpoints are removed and its listeners are dropped from
    ``self.events_listeners``. A NApp that is not loaded only produces a
    warning.

    Args:
        username (str): NApp username.
        napp_name (str): Name of the NApp to be unloaded.
    """
    napp = self.napps.pop((username, napp_name), None)
    if napp is None:
        self.log.warning('NApp %s/%s was not loaded', username, napp_name)
        return

    self.log.info("Shutting down NApp %s/%s...", username, napp_name)
    napp_id = NApp(username, napp_name).id
    shutdown_event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
    # Call listener before removing it from events_listeners
    shutdown_listener = self.events_listeners[shutdown_event.name][0]
    shutdown_listener(shutdown_event)

    # Remove rest endpoints from that napp
    self.api_server.remove_napp_endpoints(napp)

    # Removing listeners from that napp
    # pylint: disable=protected-access
    for event_type, napp_listeners in napp._listeners.items():
        registered = self.events_listeners[event_type]
        for listener in napp_listeners:
            registered.remove(listener)
        if not registered:
            del self.events_listeners[event_type]
    # pylint: enable=protected-access
||
782 | |||
def unload_napps(self):
    """Unload every NApp currently stored in ``self.napps``."""
    # Work on a snapshot of the keys: unload_napp() removes entries from
    # self.napps, and looping over the dict itself would raise
    # 'RuntimeError: dictionary changed size during iteration'.
    loaded_keys = tuple(self.napps)
    for username, napp_name in loaded_keys:
        self.unload_napp(username, napp_name)
||
791 | |||
def reload_napp_module(self, username, napp_name, napp_file):
    """Reload a NApp Module.

    Imports ``napps.<username>.<napp_name>.<napp_file>`` and reloads it.

    Args:
        username (str): NApp username.
        napp_name (str): Name of the NApp.
        napp_file (str): module inside the NApp, without extension.

    Raises:
        ModuleNotFoundError: if the module cannot be imported.
        ImportError: if reloading the module fails.
    """
    mod_name = '.'.join(('napps', username, napp_name, napp_file))

    try:
        napp_module = import_module(mod_name)
    except ModuleNotFoundError:
        self.log.error("Module '%s' not found", mod_name)
        raise

    try:
        reload_module(napp_module)
    except ImportError as err:
        self.log.error("Error reloading NApp '%s/%s': %s",
                       username, napp_name, err)
        raise
||
806 | |||
def reload_napp(self, username, napp_name):
    """Reload a NApp.

    The NApp is unloaded, its ``settings`` and ``main`` modules are
    reloaded and it is loaded again.

    Args:
        username (str): NApp username.
        napp_name (str): Name of the NApp to be reloaded.

    Returns:
        int: 200 on success, 400 when a module could not be reloaded (in
        which case the NApp is left unloaded).
    """
    self.unload_napp(username, napp_name)

    try:
        for napp_file in ('settings', 'main'):
            self.reload_napp_module(username, napp_name, napp_file)
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError.
        return 400

    self.log.info("NApp '%s/%s' successfully reloaded",
                  username, napp_name)
    self.load_napp(username, napp_name)
    return 200
||
819 | |||
def rest_reload_napp(self, username, napp_name):
    """Request reload a NApp.

    Returns:
        tuple: ``('reloaded', status)`` where ``status`` is the value
        returned by :meth:`reload_napp`.
    """
    status_code = self.reload_napp(username, napp_name)
    return 'reloaded', status_code
||
824 | |||
def rest_reload_all_napps(self):
    """Request reload all NApps.

    Returns:
        tuple: always ``('reloaded', 200)``; the result of the individual
        reloads is not reported.
    """
    # reload_napp() unloads and re-loads each NApp, which removes and
    # re-inserts entries of self.napps; iterate over a snapshot of the
    # keys so the dictionary is not mutated while it is being iterated.
    for username, napp_name in list(self.napps):
        self.reload_napp(username, napp_name)
    return 'reloaded', 200
||
830 |