1 | """Kytos SDN Platform main class. |
||
2 | |||
3 | This module contains the main class of Kytos, which is |
||
4 | :class:`~.core.Controller`. |
||
5 | |||
6 | Basic usage: |
||
7 | |||
8 | .. code-block:: python3 |
||
9 | |||
10 | from kytos.config import KytosConfig |
||
11 | from kytos.core import Controller |
||
12 | config = KytosConfig() |
||
13 | controller = Controller(config.options) |
||
14 | controller.start() |
||
15 | """ |
||
16 | 1 | import asyncio |
|
17 | 1 | import atexit |
|
18 | 1 | import json |
|
19 | 1 | import logging |
|
20 | 1 | import os |
|
21 | 1 | import re |
|
22 | 1 | import sys |
|
23 | 1 | import threading |
|
24 | 1 | from concurrent.futures import ThreadPoolExecutor |
|
25 | 1 | from importlib import import_module |
|
26 | 1 | from importlib import reload as reload_module |
|
27 | 1 | from importlib.util import module_from_spec, spec_from_file_location |
|
28 | 1 | from pathlib import Path |
|
29 | |||
30 | 1 | from kytos.core.api_server import APIServer |
|
31 | # from kytos.core.tcp_server import KytosRequestHandler, KytosServer |
||
32 | 1 | from kytos.core.atcp_server import KytosServer, KytosServerProtocol |
|
33 | 1 | from kytos.core.buffers import KytosBuffers |
|
34 | 1 | from kytos.core.config import KytosConfig |
|
35 | 1 | from kytos.core.connection import ConnectionState |
|
36 | 1 | from kytos.core.events import KytosEvent |
|
37 | 1 | from kytos.core.helpers import now |
|
38 | 1 | from kytos.core.interface import Interface |
|
39 | 1 | from kytos.core.logs import LogManager |
|
40 | 1 | from kytos.core.napps.base import NApp |
|
41 | 1 | from kytos.core.napps.manager import NAppsManager |
|
42 | 1 | from kytos.core.napps.napp_dir_listener import NAppDirListener |
|
43 | 1 | from kytos.core.switch import Switch |
|
44 | |||
45 | 1 | __all__ = ('Controller',) |
|
46 | |||
47 | |||
48 | 1 | class Controller: |
|
49 | """Main class of Kytos. |
||
50 | |||
51 | The main responsibilities of this class are: |
||
52 | - start a thread with :class:`~.core.tcp_server.KytosServer`; |
||
53 | - manage KytosNApps (install, load and unload); |
||
54 | - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`); |
||
55 | - manage which event should be sent to NApps methods; |
||
56 | - manage the buffers handlers, considering one thread per handler. |
||
57 | """ |
||
58 | |||
59 | # Created issue #568 for the disabled checks. |
||
60 | # pylint: disable=too-many-instance-attributes,too-many-public-methods |
||
61 | 1 | def __init__(self, options=None, loop=None): |
|
62 | """Init method of Controller class takes the parameters below. |
||
63 | |||
64 | Args: |
||
65 | options (:attr:`ParseArgs.args`): :attr:`options` attribute from an |
||
66 | instance of :class:`~kytos.core.config.KytosConfig` class. |
||
67 | """ |
||
68 | 1 | if options is None: |
|
69 | options = KytosConfig().options['daemon'] |
||
70 | |||
71 | 1 | self._loop = loop or asyncio.get_event_loop() |
|
72 | 1 | self._pool = ThreadPoolExecutor(max_workers=1) |
|
73 | |||
74 | #: dict: keep the main threads of the controller (buffers and handler) |
||
75 | 1 | self._threads = {} |
|
76 | #: KytosBuffers: KytosBuffer object with Controller buffers |
||
77 | 1 | self.buffers = KytosBuffers(loop=self._loop) |
|
78 | #: dict: keep track of the socket connections labeled by ``(ip, port)`` |
||
79 | #: |
||
80 | #: This dict stores all connections between the controller and the |
||
81 | #: switches. The key for this dict is a tuple (ip, port). The content |
||
82 | #: is another dict with the connection information. |
||
83 | 1 | self.connections = {} |
|
84 | #: dict: mapping of events and event listeners. |
||
85 | #: |
||
86 | #: The key of the dict is a KytosEvent (or a string that represent a |
||
87 | #: regex to match agains KytosEvents) and the value is a list of |
||
88 | #: methods that will receive the referenced event |
||
89 | 1 | self.events_listeners = {'kytos/core.connection.new': |
|
90 | [self.new_connection]} |
||
91 | |||
92 | #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance) |
||
93 | #: |
||
94 | #: The key is the napp name (string), while the value is the napp |
||
95 | #: instance itself. |
||
96 | 1 | self.napps = {} |
|
97 | #: Object generated by ParseArgs on config.py file |
||
98 | 1 | self.options = options |
|
99 | #: KytosServer: Instance of KytosServer that will be listening to TCP |
||
100 | #: connections. |
||
101 | 1 | self.server = None |
|
102 | #: dict: Current existing switches. |
||
103 | #: |
||
104 | #: The key is the switch dpid, while the value is a Switch object. |
||
105 | 1 | self.switches = {} # dpid: Switch() |
|
106 | |||
107 | #: datetime.datetime: Time when the controller finished starting. |
||
108 | 1 | self.started_at = None |
|
109 | |||
110 | #: logging.Logger: Logger instance used by Kytos. |
||
111 | 1 | self.log = None |
|
112 | |||
113 | #: Observer that handle NApps when they are enabled or disabled. |
||
114 | 1 | self.napp_dir_listener = NAppDirListener(self) |
|
115 | |||
116 | 1 | self.napps_manager = NAppsManager(self) |
|
117 | |||
118 | #: API Server used to expose rest endpoints. |
||
119 | 1 | self.api_server = APIServer(__name__, self.options.listen, |
|
120 | self.options.api_port, |
||
121 | self.napps_manager, self.options.napps) |
||
122 | |||
123 | 1 | self._register_endpoints() |
|
124 | #: Adding the napps 'enabled' directory into the PATH |
||
125 | #: Now you can access the enabled napps with: |
||
126 | #: from napps.<username>.<napp_name> import ?.... |
||
127 | 1 | sys.path.append(os.path.join(self.options.napps, os.pardir)) |
|
128 | |||
129 | 1 | def enable_logs(self): |
|
130 | """Register kytos log and enable the logs.""" |
||
131 | 1 | LogManager.load_config_file(self.options.logging, self.options.debug) |
|
132 | 1 | LogManager.enable_websocket(self.api_server.server) |
|
133 | 1 | self.log = logging.getLogger("controller") |
|
134 | |||
135 | 1 | def start(self, restart=False): |
|
136 | """Create pidfile and call start_controller method.""" |
||
137 | self.enable_logs() |
||
138 | if not restart: |
||
139 | self.create_pidfile() |
||
140 | self.start_controller() |
||
141 | |||
142 | 1 | def create_pidfile(self): |
|
143 | """Create a pidfile.""" |
||
144 | pid = os.getpid() |
||
145 | |||
146 | # Creates directory if it doesn't exist |
||
147 | # System can erase /var/run's content |
||
148 | pid_folder = Path(self.options.pidfile).parent |
||
149 | self.log.info(pid_folder) |
||
150 | |||
151 | # Pylint incorrectly infers Path objects |
||
152 | # https://github.com/PyCQA/pylint/issues/224 |
||
153 | # pylint: disable=no-member |
||
154 | if not pid_folder.exists(): |
||
155 | pid_folder.mkdir() |
||
156 | pid_folder.chmod(0o1777) |
||
157 | # pylint: enable=no-member |
||
158 | |||
159 | # Make sure the file is deleted when controller stops |
||
160 | atexit.register(Path(self.options.pidfile).unlink) |
||
161 | |||
162 | # Checks if a pidfile exists. Creates a new file. |
||
163 | try: |
||
164 | pidfile = open(self.options.pidfile, mode='x') |
||
165 | except OSError: |
||
166 | # This happens if there is a pidfile already. |
||
167 | # We shall check if the process that created the pidfile is still |
||
168 | # running. |
||
169 | try: |
||
170 | existing_file = open(self.options.pidfile, mode='r') |
||
171 | old_pid = int(existing_file.read()) |
||
172 | os.kill(old_pid, 0) |
||
173 | # If kill() doesn't return an error, there's a process running |
||
174 | # with the same PID. We assume it is Kytos and quit. |
||
175 | # Otherwise, overwrite the file and proceed. |
||
176 | error_msg = ("PID file {} exists. Delete it if Kytos is not " |
||
177 | "running. Aborting.") |
||
178 | sys.exit(error_msg.format(self.options.pidfile)) |
||
179 | except OSError: |
||
180 | try: |
||
181 | pidfile = open(self.options.pidfile, mode='w') |
||
182 | except OSError as exception: |
||
183 | error_msg = "Failed to create pidfile {}: {}." |
||
184 | sys.exit(error_msg.format(self.options.pidfile, exception)) |
||
185 | |||
186 | # Identifies the process that created the pidfile. |
||
187 | pidfile.write(str(pid)) |
||
188 | pidfile.close() |
||
189 | |||
190 | 1 | def start_controller(self): |
|
191 | """Start the controller. |
||
192 | |||
193 | Starts the KytosServer (TCP Server) coroutine. |
||
194 | Starts a thread for each buffer handler. |
||
195 | Load the installed apps. |
||
196 | """ |
||
197 | self.log.info("Starting Kytos - Kytos Controller") |
||
198 | self.server = KytosServer((self.options.listen, |
||
199 | int(self.options.port)), |
||
200 | KytosServerProtocol, |
||
201 | self, |
||
202 | self.options.protocol_name) |
||
203 | |||
204 | self.log.info("Starting TCP server: %s", self.server) |
||
205 | self.server.serve_forever() |
||
206 | |||
207 | def _stop_loop(_): |
||
208 | loop = asyncio.get_event_loop() |
||
209 | # print(_.result()) |
||
210 | threads = threading.enumerate() |
||
211 | self.log.debug("%s threads before loop.stop: %s", |
||
212 | len(threads), threads) |
||
213 | loop.stop() |
||
214 | |||
215 | async def _run_api_server_thread(executor): |
||
216 | log = logging.getLogger('controller.api_server_thread') |
||
217 | log.debug('starting') |
||
218 | # log.debug('creating tasks') |
||
219 | loop = asyncio.get_event_loop() |
||
220 | blocking_tasks = [ |
||
221 | loop.run_in_executor(executor, self.api_server.run) |
||
222 | ] |
||
223 | # log.debug('waiting for tasks') |
||
224 | completed, pending = await asyncio.wait(blocking_tasks) |
||
225 | # results = [t.result() for t in completed] |
||
226 | # log.debug('results: {!r}'.format(results)) |
||
227 | log.debug('completed: %d, pending: %d', |
||
228 | len(completed), len(pending)) |
||
229 | |||
230 | task = self._loop.create_task(self.raw_event_handler()) |
||
231 | task = self._loop.create_task(self.msg_in_event_handler()) |
||
232 | task = self._loop.create_task(self.msg_out_event_handler()) |
||
233 | task = self._loop.create_task(self.app_event_handler()) |
||
234 | task = self._loop.create_task(_run_api_server_thread(self._pool)) |
||
235 | task.add_done_callback(_stop_loop) |
||
236 | |||
237 | self.log.info("ThreadPool started: %s", self._pool) |
||
238 | |||
239 | # ASYNC TODO: ensure all threads started correctly |
||
240 | # This is critical, if any of them failed starting we should exit. |
||
241 | # sys.exit(error_msg.format(thread, exception)) |
||
242 | |||
243 | self.log.info("Loading Kytos NApps...") |
||
244 | self.napp_dir_listener.start() |
||
245 | self.pre_install_napps(self.options.napps_pre_installed) |
||
246 | self.load_napps() |
||
247 | |||
248 | self.started_at = now() |
||
249 | |||
250 | 1 | def _register_endpoints(self): |
|
251 | """Register all rest endpoint served by kytos. |
||
252 | |||
253 | - Register APIServer endpoints |
||
254 | - Register WebUI endpoints |
||
255 | - Register ``/api/kytos/core/config`` endpoint |
||
256 | """ |
||
257 | 1 | self.api_server.start_api() |
|
258 | # Register controller endpoints as /api/kytos/core/... |
||
259 | 1 | self.api_server.register_core_endpoint('config/', |
|
260 | self.configuration_endpoint) |
||
261 | |||
262 | 1 | self.api_server.register_core_endpoint( |
|
263 | 'reload/<username>/<napp_name>/', |
||
264 | self.rest_reload_napp) |
||
265 | 1 | self.api_server.register_core_endpoint('reload/all', |
|
266 | self.rest_reload_all_napps) |
||
267 | |||
268 | 1 | def register_rest_endpoint(self, url, function, methods): |
|
269 | """Deprecate in favor of @rest decorator.""" |
||
270 | 1 | self.api_server.register_rest_endpoint(url, function, methods) |
|
271 | |||
272 | 1 | def configuration_endpoint(self): |
|
273 | """Return the configuration options used by Kytos. |
||
274 | |||
275 | Returns: |
||
276 | string: Json with current configurations used by kytos. |
||
277 | |||
278 | """ |
||
279 | 1 | return json.dumps(self.options.__dict__) |
|
280 | |||
281 | 1 | def restart(self, graceful=True): |
|
282 | """Restart Kytos SDN Controller. |
||
283 | |||
284 | Args: |
||
285 | graceful(bool): Represents the way that Kytos will restart. |
||
286 | """ |
||
287 | if self.started_at is not None: |
||
288 | self.stop(graceful) |
||
289 | self.__init__(self.options) |
||
290 | |||
291 | self.start(restart=True) |
||
292 | |||
293 | 1 | def stop(self, graceful=True): |
|
294 | """Shutdown all services used by kytos. |
||
295 | |||
296 | This method should: |
||
297 | - stop all Websockets |
||
298 | - stop the API Server |
||
299 | - stop the Controller |
||
300 | """ |
||
301 | if self.started_at: |
||
302 | self.stop_controller(graceful) |
||
303 | |||
304 | 1 | def stop_controller(self, graceful=True): |
|
305 | """Stop the controller. |
||
306 | |||
307 | This method should: |
||
308 | - announce on the network that the controller will shutdown; |
||
309 | - stop receiving incoming packages; |
||
310 | - call the 'shutdown' method of each KytosNApp that is running; |
||
311 | - finish reading the events on all buffers; |
||
312 | - stop each running handler; |
||
313 | - stop all running threads; |
||
314 | - stop the KytosServer; |
||
315 | """ |
||
316 | self.log.info("Stopping Kytos") |
||
317 | |||
318 | self.buffers.send_stop_signal() |
||
319 | self.api_server.stop_api_server() |
||
320 | self.napp_dir_listener.stop() |
||
321 | |||
322 | self.log.info("Stopping threadpool: %s", self._pool) |
||
323 | |||
324 | threads = threading.enumerate() |
||
325 | self.log.debug("%s threads before threadpool shutdown: %s", |
||
326 | len(threads), threads) |
||
327 | |||
328 | self._pool.shutdown(wait=graceful) |
||
329 | |||
330 | # self.server.socket.shutdown() |
||
331 | # self.server.socket.close() |
||
332 | |||
333 | # for thread in self._threads.values(): |
||
334 | # self.log.info("Stopping thread: %s", thread.name) |
||
335 | # thread.join() |
||
336 | |||
337 | # for thread in self._threads.values(): |
||
338 | # while thread.is_alive(): |
||
339 | # self.log.info("Thread is alive: %s", thread.name) |
||
340 | # pass |
||
341 | |||
342 | self.started_at = None |
||
343 | self.unload_napps() |
||
344 | self.buffers = KytosBuffers() |
||
345 | |||
346 | # ASYNC TODO: close connections |
||
347 | # self.server.server_close() |
||
348 | |||
349 | # Shutdown the TCP server and the main asyncio loop |
||
350 | self.server.shutdown() |
||
351 | |||
352 | 1 | def status(self): |
|
353 | """Return status of Kytos Server. |
||
354 | |||
355 | If the controller kytos is running this method will be returned |
||
356 | "Running since 'Started_At'", otherwise "Stopped". |
||
357 | |||
358 | Returns: |
||
359 | string: String with kytos status. |
||
360 | |||
361 | """ |
||
362 | if self.started_at: |
||
363 | return "Running since %s" % self.started_at |
||
364 | return "Stopped" |
||
365 | |||
366 | 1 | def uptime(self): |
|
367 | """Return the uptime of kytos server. |
||
368 | |||
369 | This method should return: |
||
370 | - 0 if Kytos Server is stopped. |
||
371 | - (kytos.start_at - datetime.now) if Kytos Server is running. |
||
372 | |||
373 | Returns: |
||
374 | datetime.timedelta: The uptime interval. |
||
375 | |||
376 | """ |
||
377 | return now() - self.started_at if self.started_at else 0 |
||
378 | |||
379 | 1 | def notify_listeners(self, event): |
|
380 | """Send the event to the specified listeners. |
||
381 | |||
382 | Loops over self.events_listeners matching (by regexp) the attribute |
||
383 | name of the event with the keys of events_listeners. If a match occurs, |
||
384 | then send the event to each registered listener. |
||
385 | |||
386 | Args: |
||
387 | event (~kytos.core.KytosEvent): An instance of a KytosEvent. |
||
388 | """ |
||
389 | self.log.debug("looking for listeners for %s", event) |
||
390 | for event_regex, listeners in dict(self.events_listeners).items(): |
||
391 | # self.log.debug("listeners found for %s: %r => %s", event, |
||
392 | # event_regex, [l.__qualname__ for l in listeners]) |
||
393 | # Do not match if the event has more characters |
||
394 | # e.g. "shutdown" won't match "shutdown.kytos/of_core" |
||
395 | if event_regex[-1] != '$' or event_regex[-2] == '\\': |
||
396 | event_regex += '$' |
||
397 | if re.match(event_regex, event.name): |
||
398 | # self.log.debug('Calling listeners for %s', event) |
||
399 | for listener in listeners: |
||
400 | listener(event) |
||
401 | |||
402 | 1 | async def raw_event_handler(self): |
|
403 | """Handle raw events. |
||
404 | |||
405 | Listen to the raw_buffer and send all its events to the |
||
406 | corresponding listeners. |
||
407 | """ |
||
408 | self.log.info("Raw Event Handler started") |
||
409 | while True: |
||
410 | event = await self.buffers.raw.aget() |
||
411 | self.notify_listeners(event) |
||
412 | self.log.debug("Raw Event handler called") |
||
413 | |||
414 | if event.name == "kytos/core.shutdown": |
||
415 | self.log.debug("Raw Event handler stopped") |
||
416 | break |
||
417 | |||
418 | 1 | async def msg_in_event_handler(self): |
|
419 | """Handle msg_in events. |
||
420 | |||
421 | Listen to the msg_in buffer and send all its events to the |
||
422 | corresponding listeners. |
||
423 | """ |
||
424 | self.log.info("Message In Event Handler started") |
||
425 | while True: |
||
426 | event = await self.buffers.msg_in.aget() |
||
427 | self.notify_listeners(event) |
||
428 | self.log.debug("Message In Event handler called") |
||
429 | |||
430 | if event.name == "kytos/core.shutdown": |
||
431 | self.log.debug("Message In Event handler stopped") |
||
432 | break |
||
433 | |||
434 | 1 | async def msg_out_event_handler(self): |
|
435 | """Handle msg_out events. |
||
436 | |||
437 | Listen to the msg_out buffer and send all its events to the |
||
438 | corresponding listeners. |
||
439 | """ |
||
440 | self.log.info("Message Out Event Handler started") |
||
441 | while True: |
||
442 | triggered_event = await self.buffers.msg_out.aget() |
||
443 | |||
444 | if triggered_event.name == "kytos/core.shutdown": |
||
445 | self.log.debug("Message Out Event handler stopped") |
||
446 | break |
||
447 | |||
448 | message = triggered_event.content['message'] |
||
449 | destination = triggered_event.destination |
||
450 | if (destination and |
||
451 | not destination.state == ConnectionState.FINISHED): |
||
452 | packet = message.pack() |
||
453 | destination.send(packet) |
||
454 | self.log.debug('Connection %s: OUT OFP, ' |
||
455 | 'version: %s, type: %s, xid: %s - %s', |
||
456 | destination.id, |
||
457 | message.header.version, |
||
458 | message.header.message_type, |
||
459 | message.header.xid, |
||
460 | packet.hex()) |
||
461 | self.notify_listeners(triggered_event) |
||
462 | self.log.debug("Message Out Event handler called") |
||
463 | else: |
||
464 | self.log.info("connection closed. Cannot send message") |
||
465 | |||
466 | 1 | async def app_event_handler(self): |
|
467 | """Handle app events. |
||
468 | |||
469 | Listen to the app buffer and send all its events to the |
||
470 | corresponding listeners. |
||
471 | """ |
||
472 | self.log.info("App Event Handler started") |
||
473 | while True: |
||
474 | event = await self.buffers.app.aget() |
||
475 | self.notify_listeners(event) |
||
476 | self.log.debug("App Event handler called") |
||
477 | |||
478 | if event.name == "kytos/core.shutdown": |
||
479 | self.log.debug("App Event handler stopped") |
||
480 | break |
||
481 | |||
482 | 1 | def get_interface_by_id(self, interface_id): |
|
483 | """Find a Interface with interface_id. |
||
484 | |||
485 | Args: |
||
486 | interface_id(str): Interface Identifier. |
||
487 | |||
488 | Returns: |
||
489 | Interface: Instance of Interface with the id given. |
||
490 | |||
491 | """ |
||
492 | if interface_id is None: |
||
493 | return None |
||
494 | |||
495 | switch_id = ":".join(interface_id.split(":")[:-1]) |
||
496 | interface_number = int(interface_id.split(":")[-1]) |
||
497 | |||
498 | switch = self.switches.get(switch_id) |
||
499 | |||
500 | if not switch: |
||
501 | return None |
||
502 | |||
503 | return switch.interfaces.get(interface_number, None) |
||
504 | |||
505 | 1 | def get_switch_by_dpid(self, dpid): |
|
506 | """Return a specific switch by dpid. |
||
507 | |||
508 | Args: |
||
509 | dpid (|DPID|): dpid object used to identify a switch. |
||
510 | |||
511 | Returns: |
||
512 | :class:`~kytos.core.switch.Switch`: Switch with dpid specified. |
||
513 | |||
514 | """ |
||
515 | 1 | return self.switches.get(dpid) |
|
516 | |||
517 | 1 | def get_switch_or_create(self, dpid, connection): |
|
518 | """Return switch or create it if necessary. |
||
519 | |||
520 | Args: |
||
521 | dpid (|DPID|): dpid object used to identify a switch. |
||
522 | connection (:class:`~kytos.core.connection.Connection`): |
||
523 | connection used by switch. If a switch has a connection that |
||
524 | will be updated. |
||
525 | |||
526 | Returns: |
||
527 | :class:`~kytos.core.switch.Switch`: new or existent switch. |
||
528 | |||
529 | """ |
||
530 | 1 | self.create_or_update_connection(connection) |
|
531 | 1 | switch = self.get_switch_by_dpid(dpid) |
|
532 | 1 | event_name = 'kytos/core.switch.' |
|
533 | |||
534 | 1 | if switch is None: |
|
535 | switch = Switch(dpid=dpid) |
||
536 | self.add_new_switch(switch) |
||
537 | event_name += 'new' |
||
538 | else: |
||
539 | 1 | event_name += 'reconnected' |
|
540 | |||
541 | 1 | self.set_switch_options(dpid=dpid) |
|
542 | 1 | event = KytosEvent(name=event_name, content={'switch': switch}) |
|
543 | |||
544 | 1 | old_connection = switch.connection |
|
545 | 1 | switch.update_connection(connection) |
|
546 | |||
547 | 1 | if old_connection is not connection: |
|
548 | self.remove_connection(old_connection) |
||
549 | |||
550 | 1 | self.buffers.app.put(event) |
|
551 | |||
552 | 1 | return switch |
|
553 | |||
554 | 1 | def set_switch_options(self, dpid): |
|
555 | """Update the switch settings based on kytos.conf options. |
||
556 | |||
557 | Args: |
||
558 | dpid (str): dpid used to identify a switch. |
||
559 | |||
560 | """ |
||
561 | 1 | switch = self.switches.get(dpid) |
|
562 | 1 | if not switch: |
|
563 | return |
||
564 | |||
565 | 1 | vlan_pool = {} |
|
566 | 1 | try: |
|
567 | 1 | vlan_pool = json.loads(self.options.vlan_pool) |
|
568 | 1 | if not vlan_pool: |
|
569 | return |
||
570 | except (TypeError, json.JSONDecodeError) as err: |
||
571 | self.log.error("Invalid vlan_pool settings: %s", err) |
||
572 | |||
573 | 1 | if vlan_pool.get(dpid): |
|
574 | 1 | self.log.info("Loading vlan_pool configuration for dpid %s", dpid) |
|
575 | 1 | for intf_num, port_list in vlan_pool[dpid].items(): |
|
576 | 1 | if not switch.interfaces.get((intf_num)): |
|
577 | 1 | vlan_ids = set() |
|
578 | 1 | for vlan_range in port_list: |
|
579 | 1 | (vlan_begin, vlan_end) = (vlan_range[0:2]) |
|
580 | 1 | for vlan_id in range(vlan_begin, vlan_end): |
|
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
Loading history...
|
|||
581 | 1 | vlan_ids.add(vlan_id) |
|
582 | 1 | intf_num = int(intf_num) |
|
583 | 1 | intf = Interface(name=intf_num, port_number=intf_num, |
|
584 | switch=switch) |
||
585 | 1 | intf.set_available_tags(vlan_ids) |
|
586 | 1 | switch.update_interface(intf) |
|
587 | |||
588 | 1 | def create_or_update_connection(self, connection): |
|
589 | """Update a connection. |
||
590 | |||
591 | Args: |
||
592 | connection (:class:`~kytos.core.connection.Connection`): |
||
593 | Instance of connection that will be updated. |
||
594 | """ |
||
595 | 1 | self.connections[connection.id] = connection |
|
596 | |||
597 | 1 | def get_connection_by_id(self, conn_id): |
|
598 | """Return a existent connection by id. |
||
599 | |||
600 | Args: |
||
601 | id (int): id from a connection. |
||
602 | |||
603 | Returns: |
||
604 | :class:`~kytos.core.connection.Connection`: Instance of connection |
||
605 | or None Type. |
||
606 | |||
607 | """ |
||
608 | return self.connections.get(conn_id) |
||
609 | |||
610 | 1 | def remove_connection(self, connection): |
|
611 | """Close a existent connection and remove it. |
||
612 | |||
613 | Args: |
||
614 | connection (:class:`~kytos.core.connection.Connection`): |
||
615 | Instance of connection that will be removed. |
||
616 | """ |
||
617 | if connection is None: |
||
618 | return False |
||
619 | |||
620 | try: |
||
621 | connection.close() |
||
622 | del self.connections[connection.id] |
||
623 | except KeyError: |
||
624 | return False |
||
625 | return True |
||
626 | |||
627 | 1 | def remove_switch(self, switch): |
|
628 | """Remove an existent switch. |
||
629 | |||
630 | Args: |
||
631 | switch (:class:`~kytos.core.switch.Switch`): |
||
632 | Instance of switch that will be removed. |
||
633 | """ |
||
634 | try: |
||
635 | del self.switches[switch.dpid] |
||
636 | except KeyError: |
||
637 | return False |
||
638 | return True |
||
639 | |||
640 | 1 | def new_connection(self, event): |
|
641 | """Handle a kytos/core.connection.new event. |
||
642 | |||
643 | This method will read new connection event and store the connection |
||
644 | (socket) into the connections attribute on the controller. |
||
645 | |||
646 | It also clear all references to the connection since it is a new |
||
647 | connection on the same ip:port. |
||
648 | |||
649 | Args: |
||
650 | event (~kytos.core.KytosEvent): |
||
651 | The received event (``kytos/core.connection.new``) with the |
||
652 | needed info. |
||
653 | """ |
||
654 | self.log.info("Handling %s...", event) |
||
655 | |||
656 | connection = event.source |
||
657 | self.log.debug("Event source: %s", event.source) |
||
658 | |||
659 | # Remove old connection (aka cleanup) if it exists |
||
660 | if self.get_connection_by_id(connection.id): |
||
661 | self.remove_connection(connection.id) |
||
662 | |||
663 | # Update connections with the new connection |
||
664 | self.create_or_update_connection(connection) |
||
665 | |||
666 | 1 | def add_new_switch(self, switch): |
|
667 | """Add a new switch on the controller. |
||
668 | |||
669 | Args: |
||
670 | switch (Switch): A Switch object |
||
671 | """ |
||
672 | self.switches[switch.dpid] = switch |
||
673 | |||
674 | 1 | def _import_napp(self, username, napp_name): |
|
675 | """Import a NApp module. |
||
676 | |||
677 | Raises: |
||
678 | FileNotFoundError: if NApp's main.py is not found. |
||
679 | ModuleNotFoundError: if any NApp requirement is not installed. |
||
680 | |||
681 | """ |
||
682 | mod_name = '.'.join(['napps', username, napp_name, 'main']) |
||
683 | path = os.path.join(self.options.napps, username, napp_name, |
||
684 | 'main.py') |
||
685 | napp_spec = spec_from_file_location(mod_name, path) |
||
686 | napp_module = module_from_spec(napp_spec) |
||
687 | sys.modules[napp_spec.name] = napp_module |
||
688 | napp_spec.loader.exec_module(napp_module) |
||
689 | return napp_module |
||
690 | |||
691 | 1 | def load_napp(self, username, napp_name): |
|
692 | """Load a single NApp. |
||
693 | |||
694 | Args: |
||
695 | username (str): NApp username (makes up NApp's path). |
||
696 | napp_name (str): Name of the NApp to be loaded. |
||
697 | """ |
||
698 | if (username, napp_name) in self.napps: |
||
699 | message = 'NApp %s/%s was already loaded' |
||
700 | self.log.warning(message, username, napp_name) |
||
701 | return |
||
702 | |||
703 | try: |
||
704 | napp_module = self._import_napp(username, napp_name) |
||
705 | except ModuleNotFoundError as err: |
||
706 | self.log.error("Error loading NApp '%s/%s': %s", |
||
707 | username, napp_name, err) |
||
708 | return |
||
709 | except FileNotFoundError as err: |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
|
|||
710 | msg = "NApp module not found, assuming it's a meta napp: %s" |
||
711 | self.log.warning(msg, err.filename) |
||
712 | return |
||
713 | |||
714 | napp = napp_module.Main(controller=self) |
||
715 | |||
716 | self.napps[(username, napp_name)] = napp |
||
717 | |||
718 | napp.start() |
||
719 | self.api_server.register_napp_endpoints(napp) |
||
720 | |||
721 | # pylint: disable=protected-access |
||
722 | for event, listeners in napp._listeners.items(): |
||
723 | self.events_listeners.setdefault(event, []).extend(listeners) |
||
724 | # pylint: enable=protected-access |
||
725 | |||
726 | 1 | def pre_install_napps(self, napps, enable=True): |
|
727 | """Pre install and enable NApps. |
||
728 | |||
729 | Before installing, it'll check if it's installed yet. |
||
730 | |||
731 | Args: |
||
732 | napps ([str]): List of NApps to be pre-installed and enabled. |
||
733 | """ |
||
734 | all_napps = self.napps_manager.get_installed_napps() |
||
735 | installed = [str(napp) for napp in all_napps] |
||
736 | napps_diff = [napp for napp in napps if napp not in installed] |
||
737 | for napp in napps_diff: |
||
738 | self.napps_manager.install(napp, enable=enable) |
||
739 | |||
740 | 1 | def load_napps(self): |
|
741 | """Load all NApps enabled on the NApps dir.""" |
||
742 | for napp in self.napps_manager.get_enabled_napps(): |
||
743 | try: |
||
744 | self.log.info("Loading NApp %s", napp.id) |
||
745 | self.load_napp(napp.username, napp.name) |
||
746 | except FileNotFoundError as exception: |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
|
|||
747 | self.log.error("Could not load NApp %s: %s", |
||
748 | napp.id, exception) |
||
749 | |||
750 | 1 | def unload_napp(self, username, napp_name): |
|
751 | """Unload a specific NApp. |
||
752 | |||
753 | Args: |
||
754 | username (str): NApp username. |
||
755 | napp_name (str): Name of the NApp to be unloaded. |
||
756 | """ |
||
757 | 1 | napp = self.napps.pop((username, napp_name), None) |
|
758 | |||
759 | 1 | if napp is None: |
|
760 | self.log.warning('NApp %s/%s was not loaded', username, napp_name) |
||
761 | else: |
||
762 | 1 | self.log.info("Shutting down NApp %s/%s...", username, napp_name) |
|
763 | 1 | napp_id = NApp(username, napp_name).id |
|
764 | 1 | event = KytosEvent(name='kytos/core.shutdown.' + napp_id) |
|
765 | 1 | napp_shutdown_fn = self.events_listeners[event.name][0] |
|
766 | # Call listener before removing it from events_listeners |
||
767 | 1 | napp_shutdown_fn(event) |
|
768 | |||
769 | # Remove rest endpoints from that napp |
||
770 | 1 | self.api_server.remove_napp_endpoints(napp) |
|
771 | |||
772 | # Removing listeners from that napp |
||
773 | # pylint: disable=protected-access |
||
774 | 1 | for event_type, napp_listeners in napp._listeners.items(): |
|
775 | event_listeners = self.events_listeners[event_type] |
||
776 | for listener in napp_listeners: |
||
777 | event_listeners.remove(listener) |
||
778 | if not event_listeners: |
||
779 | del self.events_listeners[event_type] |
||
780 | # pylint: enable=protected-access |
||
781 | |||
782 | 1 | def unload_napps(self): |
|
783 | """Unload all loaded NApps that are not core NApps.""" |
||
784 | # list() is used here to avoid the error: |
||
785 | # 'RuntimeError: dictionary changed size during iteration' |
||
786 | # This is caused by looping over an dictionary while removing |
||
787 | # items from it. |
||
788 | for (username, napp_name) in list(self.napps.keys()): # noqa |
||
789 | self.unload_napp(username, napp_name) |
||
790 | |||
791 | 1 | def reload_napp_module(self, username, napp_name, napp_file): |
|
792 | """Reload a NApp Module.""" |
||
793 | mod_name = '.'.join(['napps', username, napp_name, napp_file]) |
||
794 | try: |
||
795 | napp_module = import_module(mod_name) |
||
796 | except ModuleNotFoundError as err: |
||
797 | self.log.error("Module '%s' not found", mod_name) |
||
798 | raise |
||
799 | try: |
||
800 | napp_module = reload_module(napp_module) |
||
801 | except ImportError as err: |
||
802 | self.log.error("Error reloading NApp '%s/%s': %s", |
||
803 | username, napp_name, err) |
||
804 | raise |
||
805 | |||
806 | 1 | def reload_napp(self, username, napp_name): |
|
807 | """Reload a NApp.""" |
||
808 | self.unload_napp(username, napp_name) |
||
809 | try: |
||
810 | self.reload_napp_module(username, napp_name, 'settings') |
||
811 | self.reload_napp_module(username, napp_name, 'main') |
||
812 | except (ModuleNotFoundError, ImportError): |
||
813 | return 400 |
||
814 | self.log.info("NApp '%s/%s' successfully reloaded", |
||
815 | username, napp_name) |
||
816 | self.load_napp(username, napp_name) |
||
817 | return 200 |
||
818 | |||
819 | 1 | def rest_reload_napp(self, username, napp_name): |
|
820 | """Request reload a NApp.""" |
||
821 | res = self.reload_napp(username, napp_name) |
||
822 | return 'reloaded', res |
||
823 | |||
824 | 1 | def rest_reload_all_napps(self): |
|
825 | """Request reload all NApps.""" |
||
826 | for napp in self.napps: |
||
827 | self.reload_napp(*napp) |
||
828 | return 'reloaded', 200 |
||
829 |