1 | """Kytos SDN Platform main class. |
||
2 | |||
3 | This module contains the main class of Kytos, which is |
||
4 | :class:`~.core.Controller`. |
||
5 | |||
6 | Basic usage: |
||
7 | |||
8 | .. code-block:: python3 |
||
9 | |||
10 | from kytos.config import KytosConfig |
||
11 | from kytos.core import Controller |
||
12 | config = KytosConfig() |
||
13 | controller = Controller(config.options) |
||
14 | controller.start() |
||
15 | """ |
||
16 | 1 | import asyncio |
|
17 | 1 | import atexit |
|
18 | 1 | import json |
|
19 | 1 | import logging |
|
20 | 1 | import os |
|
21 | 1 | import re |
|
22 | 1 | import sys |
|
23 | 1 | import threading |
|
24 | 1 | from concurrent.futures import ThreadPoolExecutor |
|
25 | 1 | from importlib import import_module |
|
26 | 1 | from importlib import reload as reload_module |
|
27 | 1 | from importlib.util import module_from_spec, spec_from_file_location |
|
28 | 1 | from pathlib import Path |
|
29 | |||
30 | 1 | from kytos.core.api_server import APIServer |
|
31 | # from kytos.core.tcp_server import KytosRequestHandler, KytosServer |
||
32 | 1 | from kytos.core.atcp_server import KytosServer, KytosServerProtocol |
|
33 | 1 | from kytos.core.auth import Auth |
|
34 | 1 | from kytos.core.buffers import KytosBuffers |
|
35 | 1 | from kytos.core.config import KytosConfig |
|
36 | 1 | from kytos.core.connection import ConnectionState |
|
37 | 1 | from kytos.core.events import KytosEvent |
|
38 | 1 | from kytos.core.helpers import now |
|
39 | 1 | from kytos.core.interface import Interface |
|
40 | 1 | from kytos.core.logs import LogManager |
|
41 | 1 | from kytos.core.napps.base import NApp |
|
42 | 1 | from kytos.core.napps.manager import NAppsManager |
|
43 | 1 | from kytos.core.napps.napp_dir_listener import NAppDirListener |
|
44 | 1 | from kytos.core.switch import Switch |
|
45 | |||
46 | 1 | __all__ = ('Controller',) |
|
47 | |||
48 | |||
49 | 1 | class Controller: |
|
50 | """Main class of Kytos. |
||
51 | |||
52 | The main responsibilities of this class are: |
||
53 | - start a thread with :class:`~.core.tcp_server.KytosServer`; |
||
54 | - manage KytosNApps (install, load and unload); |
||
55 | - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`); |
||
56 | - manage which event should be sent to NApps methods; |
||
57 | - manage the buffers handlers, considering one thread per handler. |
||
58 | """ |
||
59 | |||
60 | # Created issue #568 for the disabled checks. |
||
61 | # pylint: disable=too-many-instance-attributes,too-many-public-methods |
||
62 | 1 | def __init__(self, options=None, loop=None): |
|
63 | """Init method of Controller class takes the parameters below. |
||
64 | |||
65 | Args: |
||
66 | options (:attr:`ParseArgs.args`): :attr:`options` attribute from an |
||
67 | instance of :class:`~kytos.core.config.KytosConfig` class. |
||
68 | """ |
||
69 | 1 | if options is None: |
|
70 | options = KytosConfig().options['daemon'] |
||
71 | |||
72 | 1 | self._loop = loop or asyncio.get_event_loop() |
|
73 | 1 | self._pool = ThreadPoolExecutor(max_workers=1) |
|
74 | |||
75 | #: dict: keep the main threads of the controller (buffers and handler) |
||
76 | 1 | self._threads = {} |
|
77 | #: KytosBuffers: KytosBuffer object with Controller buffers |
||
78 | 1 | self.buffers = KytosBuffers(loop=self._loop) |
|
79 | #: dict: keep track of the socket connections labeled by ``(ip, port)`` |
||
80 | #: |
||
81 | #: This dict stores all connections between the controller and the |
||
82 | #: switches. The key for this dict is a tuple (ip, port). The content |
||
83 | #: is another dict with the connection information. |
||
84 | 1 | self.connections = {} |
|
85 | #: dict: mapping of events and event listeners. |
||
86 | #: |
||
87 | #: The key of the dict is a KytosEvent (or a string that represent a |
||
88 | #: regex to match against KytosEvents) and the value is a list of |
||
89 | #: methods that will receive the referenced event |
||
90 | 1 | self.events_listeners = {'kytos/core.connection.new': |
|
91 | [self.new_connection]} |
||
92 | |||
93 | #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance) |
||
94 | #: |
||
95 | #: The key is the napp name (string), while the value is the napp |
||
96 | #: instance itself. |
||
97 | 1 | self.napps = {} |
|
98 | #: Object generated by ParseArgs on config.py file |
||
99 | 1 | self.options = options |
|
100 | #: KytosServer: Instance of KytosServer that will be listening to TCP |
||
101 | #: connections. |
||
102 | 1 | self.server = None |
|
103 | #: dict: Current existing switches. |
||
104 | #: |
||
105 | #: The key is the switch dpid, while the value is a Switch object. |
||
106 | 1 | self.switches = {} # dpid: Switch() |
|
107 | |||
108 | #: datetime.datetime: Time when the controller finished starting. |
||
109 | 1 | self.started_at = None |
|
110 | |||
111 | #: logging.Logger: Logger instance used by Kytos. |
||
112 | 1 | self.log = None |
|
113 | |||
114 | #: Observer that handle NApps when they are enabled or disabled. |
||
115 | 1 | self.napp_dir_listener = NAppDirListener(self) |
|
116 | |||
117 | 1 | self.napps_manager = NAppsManager(self) |
|
118 | |||
119 | #: API Server used to expose rest endpoints. |
||
120 | 1 | self.api_server = APIServer(__name__, self.options.listen, |
|
121 | self.options.api_port, |
||
122 | self.napps_manager, self.options.napps) |
||
123 | |||
124 | 1 | self.auth = Auth(self) |
|
125 | |||
126 | 1 | self._register_endpoints() |
|
127 | #: Adding the napps 'enabled' directory into the PATH |
||
128 | #: Now you can access the enabled napps with: |
||
129 | #: from napps.<username>.<napp_name> import ?.... |
||
130 | 1 | sys.path.append(os.path.join(self.options.napps, os.pardir)) |
|
131 | |||
132 | 1 | def enable_logs(self): |
|
133 | """Register kytos log and enable the logs.""" |
||
134 | 1 | LogManager.load_config_file(self.options.logging, self.options.debug) |
|
135 | 1 | LogManager.enable_websocket(self.api_server.server) |
|
136 | 1 | self.log = logging.getLogger(__name__) |
|
137 | |||
138 | 1 | @staticmethod |
|
139 | def loggers(): |
||
140 | """List all logging Loggers. |
||
141 | |||
142 | Return a list of Logger objects, with name and logging level. |
||
143 | """ |
||
144 | 1 | return [logging.getLogger(name) |
|
145 | for name in logging.root.manager.loggerDict |
||
146 | if "kytos" in name] |
||
147 | |||
148 | 1 | def toggle_debug(self, name=None): |
|
149 | """Enable/disable logging debug messages to a given logger name. |
||
150 | |||
151 | If the name parameter is not specified the debug will be |
||
152 | enabled/disabled following the initial config file. It will decide |
||
153 | to enable/disable using the 'kytos' name to find the current |
||
154 | logger level. |
||
155 | Obs: To disable the debug the logging will be set to NOTSET |
||
156 | |||
157 | Args: |
||
158 | name(text): Full hierarchy Logger name. Ex: "kytos.core.controller" |
||
159 | """ |
||
160 | 1 | if name and name not in logging.root.manager.loggerDict: |
|
161 | # A Logger name that is not declared in logging will raise an error |
||
162 | # otherwise logging would create a new Logger. |
||
163 | 1 | raise ValueError(f"Invalid logger name: {name}") |
|
164 | |||
165 | 1 | if not name: |
|
166 | # Logger name not specified. |
||
167 | 1 | level = logging.getLogger('kytos').getEffectiveLevel() |
|
168 | 1 | enable_debug = level != logging.DEBUG |
|
169 | |||
170 | # Enable/disable default Loggers |
||
171 | 1 | LogManager.load_config_file(self.options.logging, enable_debug) |
|
172 | 1 | return |
|
173 | |||
174 | # Get effective logger level for the name |
||
175 | 1 | level = logging.getLogger(name).getEffectiveLevel() |
|
176 | 1 | logger = logging.getLogger(name) |
|
177 | |||
178 | 1 | if level == logging.DEBUG: |
|
179 | # disable debug |
||
180 | 1 | logger.setLevel(logging.NOTSET) |
|
181 | else: |
||
182 | # enable debug |
||
183 | 1 | logger.setLevel(logging.DEBUG) |
|
184 | |||
185 | 1 | def start(self, restart=False): |
|
186 | """Create pidfile and call start_controller method.""" |
||
187 | 1 | self.enable_logs() |
|
188 | 1 | if not restart: |
|
189 | 1 | self.create_pidfile() |
|
190 | 1 | self.start_controller() |
|
191 | |||
192 | 1 | def create_pidfile(self): |
|
193 | """Create a pidfile.""" |
||
194 | 1 | pid = os.getpid() |
|
195 | |||
196 | # Creates directory if it doesn't exist |
||
197 | # System can erase /var/run's content |
||
198 | 1 | pid_folder = Path(self.options.pidfile).parent |
|
199 | 1 | self.log.info(pid_folder) |
|
200 | |||
201 | # Pylint incorrectly infers Path objects |
||
202 | # https://github.com/PyCQA/pylint/issues/224 |
||
203 | # pylint: disable=no-member |
||
204 | 1 | if not pid_folder.exists(): |
|
205 | pid_folder.mkdir() |
||
206 | pid_folder.chmod(0o1777) |
||
207 | # pylint: enable=no-member |
||
208 | |||
209 | # Make sure the file is deleted when controller stops |
||
210 | 1 | atexit.register(Path(self.options.pidfile).unlink) |
|
211 | |||
212 | # Checks if a pidfile exists. Creates a new file. |
||
213 | 1 | try: |
|
214 | 1 | pidfile = open(self.options.pidfile, mode='x') |
|
215 | 1 | except OSError: |
|
216 | # This happens if there is a pidfile already. |
||
217 | # We shall check if the process that created the pidfile is still |
||
218 | # running. |
||
219 | 1 | try: |
|
220 | 1 | existing_file = open(self.options.pidfile, mode='r') |
|
221 | 1 | old_pid = int(existing_file.read()) |
|
222 | 1 | os.kill(old_pid, 0) |
|
223 | # If kill() doesn't return an error, there's a process running |
||
224 | # with the same PID. We assume it is Kytos and quit. |
||
225 | # Otherwise, overwrite the file and proceed. |
||
226 | error_msg = ("PID file {} exists. Delete it if Kytos is not " |
||
227 | "running. Aborting.") |
||
228 | sys.exit(error_msg.format(self.options.pidfile)) |
||
229 | 1 | except OSError: |
|
230 | 1 | try: |
|
231 | 1 | pidfile = open(self.options.pidfile, mode='w') |
|
232 | except OSError as exception: |
||
233 | error_msg = "Failed to create pidfile {}: {}." |
||
234 | sys.exit(error_msg.format(self.options.pidfile, exception)) |
||
235 | |||
236 | # Identifies the process that created the pidfile. |
||
237 | 1 | pidfile.write(str(pid)) |
|
238 | 1 | pidfile.close() |
|
239 | |||
240 | 1 | def start_controller(self): |
|
241 | """Start the controller. |
||
242 | |||
243 | Starts the KytosServer (TCP Server) coroutine. |
||
244 | Starts a thread for each buffer handler. |
||
245 | Load the installed apps. |
||
246 | """ |
||
247 | 1 | self.log.info("Starting Kytos - Kytos Controller") |
|
248 | 1 | self.server = KytosServer((self.options.listen, |
|
249 | int(self.options.port)), |
||
250 | KytosServerProtocol, |
||
251 | self, |
||
252 | self.options.protocol_name) |
||
253 | |||
254 | 1 | self.log.info("Starting TCP server: %s", self.server) |
|
255 | 1 | self.server.serve_forever() |
|
256 | |||
257 | 1 | def _stop_loop(_): |
|
258 | loop = asyncio.get_event_loop() |
||
259 | # print(_.result()) |
||
260 | threads = threading.enumerate() |
||
261 | self.log.debug("%s threads before loop.stop: %s", |
||
262 | len(threads), threads) |
||
263 | loop.stop() |
||
264 | |||
265 | 1 | async def _run_api_server_thread(executor): |
|
266 | log = logging.getLogger('kytos.core.controller.api_server_thread') |
||
267 | log.debug('starting') |
||
268 | # log.debug('creating tasks') |
||
269 | loop = asyncio.get_event_loop() |
||
270 | blocking_tasks = [ |
||
271 | loop.run_in_executor(executor, self.api_server.run) |
||
272 | ] |
||
273 | # log.debug('waiting for tasks') |
||
274 | completed, pending = await asyncio.wait(blocking_tasks) |
||
275 | # results = [t.result() for t in completed] |
||
276 | # log.debug('results: {!r}'.format(results)) |
||
277 | log.debug('completed: %d, pending: %d', |
||
278 | len(completed), len(pending)) |
||
279 | |||
280 | 1 | task = self._loop.create_task(self.raw_event_handler()) |
|
281 | 1 | task = self._loop.create_task(self.msg_in_event_handler()) |
|
282 | 1 | task = self._loop.create_task(self.msg_out_event_handler()) |
|
283 | 1 | task = self._loop.create_task(self.app_event_handler()) |
|
284 | 1 | task = self._loop.create_task(_run_api_server_thread(self._pool)) |
|
285 | 1 | task.add_done_callback(_stop_loop) |
|
286 | |||
287 | 1 | self.log.info("ThreadPool started: %s", self._pool) |
|
288 | |||
289 | # ASYNC TODO: ensure all threads started correctly |
||
290 | # This is critical, if any of them failed starting we should exit. |
||
291 | # sys.exit(error_msg.format(thread, exception)) |
||
292 | |||
293 | 1 | self.log.info("Loading Kytos NApps...") |
|
294 | 1 | self.napp_dir_listener.start() |
|
295 | 1 | self.pre_install_napps(self.options.napps_pre_installed) |
|
296 | 1 | self.load_napps() |
|
297 | |||
298 | 1 | self.started_at = now() |
|
299 | |||
300 | 1 | def _register_endpoints(self): |
|
301 | """Register all rest endpoint served by kytos. |
||
302 | |||
303 | - Register APIServer endpoints |
||
304 | - Register WebUI endpoints |
||
305 | - Register ``/api/kytos/core/config`` endpoint |
||
306 | """ |
||
307 | 1 | self.api_server.start_api() |
|
308 | # Register controller endpoints as /api/kytos/core/... |
||
309 | 1 | self.api_server.register_core_endpoint('config/', |
|
310 | self.configuration_endpoint) |
||
311 | 1 | self.api_server.register_core_endpoint('metadata/', |
|
312 | Controller.metadata_endpoint) |
||
313 | 1 | self.api_server.register_core_endpoint( |
|
314 | 'reload/<username>/<napp_name>/', |
||
315 | self.rest_reload_napp) |
||
316 | 1 | self.api_server.register_core_endpoint('reload/all', |
|
317 | self.rest_reload_all_napps) |
||
318 | 1 | self.auth.register_core_auth_services() |
|
319 | |||
320 | 1 | def register_rest_endpoint(self, url, function, methods): |
|
321 | """Deprecate in favor of @rest decorator.""" |
||
322 | 1 | self.api_server.register_rest_endpoint(url, function, methods) |
|
323 | |||
324 | 1 | @classmethod |
|
325 | def metadata_endpoint(cls): |
||
326 | """Return the Kytos metadata. |
||
327 | |||
328 | Returns: |
||
329 | string: Json with current kytos metadata. |
||
330 | |||
331 | """ |
||
332 | 1 | meta_path = "%s/metadata.py" % os.path.dirname(__file__) |
|
333 | 1 | meta_file = open(meta_path).read() |
|
334 | 1 | metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file)) |
|
335 | 1 | return json.dumps(metadata) |
|
336 | |||
337 | 1 | def configuration_endpoint(self): |
|
338 | """Return the configuration options used by Kytos. |
||
339 | |||
340 | Returns: |
||
341 | string: Json with current configurations used by kytos. |
||
342 | |||
343 | """ |
||
344 | 1 | return json.dumps(self.options.__dict__) |
|
345 | |||
346 | 1 | def restart(self, graceful=True): |
|
347 | """Restart Kytos SDN Controller. |
||
348 | |||
349 | Args: |
||
350 | graceful(bool): Represents the way that Kytos will restart. |
||
351 | """ |
||
352 | 1 | if self.started_at is not None: |
|
353 | 1 | self.stop(graceful) |
|
354 | 1 | self.__init__(self.options) |
|
355 | |||
356 | 1 | self.start(restart=True) |
|
357 | |||
358 | 1 | def stop(self, graceful=True): |
|
359 | """Shutdown all services used by kytos. |
||
360 | |||
361 | This method should: |
||
362 | - stop all Websockets |
||
363 | - stop the API Server |
||
364 | - stop the Controller |
||
365 | """ |
||
366 | 1 | if self.started_at: |
|
367 | 1 | self.stop_controller(graceful) |
|
368 | |||
369 | 1 | def stop_controller(self, graceful=True): |
|
370 | """Stop the controller. |
||
371 | |||
372 | This method should: |
||
373 | - announce on the network that the controller will shutdown; |
||
374 | - stop receiving incoming packages; |
||
375 | - call the 'shutdown' method of each KytosNApp that is running; |
||
376 | - finish reading the events on all buffers; |
||
377 | - stop each running handler; |
||
378 | - stop all running threads; |
||
379 | - stop the KytosServer; |
||
380 | """ |
||
381 | 1 | self.log.info("Stopping Kytos") |
|
382 | |||
383 | 1 | self.buffers.send_stop_signal() |
|
384 | 1 | self.api_server.stop_api_server() |
|
385 | 1 | self.napp_dir_listener.stop() |
|
386 | |||
387 | 1 | self.log.info("Stopping threadpool: %s", self._pool) |
|
388 | |||
389 | 1 | threads = threading.enumerate() |
|
390 | 1 | self.log.debug("%s threads before threadpool shutdown: %s", |
|
391 | len(threads), threads) |
||
392 | |||
393 | 1 | self._pool.shutdown(wait=graceful) |
|
394 | |||
395 | # self.server.socket.shutdown() |
||
396 | # self.server.socket.close() |
||
397 | |||
398 | # for thread in self._threads.values(): |
||
399 | # self.log.info("Stopping thread: %s", thread.name) |
||
400 | # thread.join() |
||
401 | |||
402 | # for thread in self._threads.values(): |
||
403 | # while thread.is_alive(): |
||
404 | # self.log.info("Thread is alive: %s", thread.name) |
||
405 | # pass |
||
406 | |||
407 | 1 | self.started_at = None |
|
408 | 1 | self.unload_napps() |
|
409 | 1 | self.buffers = KytosBuffers() |
|
410 | |||
411 | # ASYNC TODO: close connections |
||
412 | # self.server.server_close() |
||
413 | |||
414 | # Shutdown the TCP server and the main asyncio loop |
||
415 | 1 | self.server.shutdown() |
|
416 | |||
417 | 1 | def status(self): |
|
418 | """Return status of Kytos Server. |
||
419 | |||
420 | If the controller kytos is running this method will be returned |
||
421 | "Running since 'Started_At'", otherwise "Stopped". |
||
422 | |||
423 | Returns: |
||
424 | string: String with kytos status. |
||
425 | |||
426 | """ |
||
427 | 1 | if self.started_at: |
|
428 | 1 | return "Running since %s" % self.started_at |
|
429 | 1 | return "Stopped" |
|
430 | |||
431 | 1 | def uptime(self): |
|
432 | """Return the uptime of kytos server. |
||
433 | |||
434 | This method should return: |
||
435 | - 0 if Kytos Server is stopped. |
||
436 | - (kytos.start_at - datetime.now) if Kytos Server is running. |
||
437 | |||
438 | Returns: |
||
439 | datetime.timedelta: The uptime interval. |
||
440 | |||
441 | """ |
||
442 | 1 | return now() - self.started_at if self.started_at else 0 |
|
443 | |||
444 | 1 | def notify_listeners(self, event): |
|
445 | """Send the event to the specified listeners. |
||
446 | |||
447 | Loops over self.events_listeners matching (by regexp) the attribute |
||
448 | name of the event with the keys of events_listeners. If a match occurs, |
||
449 | then send the event to each registered listener. |
||
450 | |||
451 | Args: |
||
452 | event (~kytos.core.KytosEvent): An instance of a KytosEvent. |
||
453 | """ |
||
454 | 1 | self.log.debug("looking for listeners for %s", event) |
|
455 | 1 | for event_regex, listeners in dict(self.events_listeners).items(): |
|
456 | # self.log.debug("listeners found for %s: %r => %s", event, |
||
457 | # event_regex, [l.__qualname__ for l in listeners]) |
||
458 | # Do not match if the event has more characters |
||
459 | # e.g. "shutdown" won't match "shutdown.kytos/of_core" |
||
460 | 1 | if event_regex[-1] != '$' or event_regex[-2] == '\\': |
|
461 | 1 | event_regex += '$' |
|
462 | 1 | if re.match(event_regex, event.name): |
|
463 | # self.log.debug('Calling listeners for %s', event) |
||
464 | 1 | for listener in listeners: |
|
465 | 1 | listener(event) |
|
466 | |||
467 | 1 | async def raw_event_handler(self): |
|
468 | """Handle raw events. |
||
469 | |||
470 | Listen to the raw_buffer and send all its events to the |
||
471 | corresponding listeners. |
||
472 | """ |
||
473 | 1 | self.log.info("Raw Event Handler started") |
|
474 | 1 | while True: |
|
475 | 1 | event = await self.buffers.raw.aget() |
|
476 | 1 | self.notify_listeners(event) |
|
477 | 1 | self.log.debug("Raw Event handler called") |
|
478 | |||
479 | 1 | if event.name == "kytos/core.shutdown": |
|
480 | 1 | self.log.debug("Raw Event handler stopped") |
|
481 | 1 | break |
|
482 | |||
483 | 1 | async def msg_in_event_handler(self): |
|
484 | """Handle msg_in events. |
||
485 | |||
486 | Listen to the msg_in buffer and send all its events to the |
||
487 | corresponding listeners. |
||
488 | """ |
||
489 | 1 | self.log.info("Message In Event Handler started") |
|
490 | 1 | while True: |
|
491 | 1 | event = await self.buffers.msg_in.aget() |
|
492 | 1 | self.notify_listeners(event) |
|
493 | 1 | self.log.debug("Message In Event handler called") |
|
494 | |||
495 | 1 | if event.name == "kytos/core.shutdown": |
|
496 | 1 | self.log.debug("Message In Event handler stopped") |
|
497 | 1 | break |
|
498 | |||
499 | 1 | async def msg_out_event_handler(self): |
|
500 | """Handle msg_out events. |
||
501 | |||
502 | Listen to the msg_out buffer and send all its events to the |
||
503 | corresponding listeners. |
||
504 | """ |
||
505 | 1 | self.log.info("Message Out Event Handler started") |
|
506 | 1 | while True: |
|
507 | 1 | triggered_event = await self.buffers.msg_out.aget() |
|
508 | |||
509 | 1 | if triggered_event.name == "kytos/core.shutdown": |
|
510 | 1 | self.log.debug("Message Out Event handler stopped") |
|
511 | 1 | break |
|
512 | |||
513 | 1 | message = triggered_event.content['message'] |
|
514 | 1 | destination = triggered_event.destination |
|
515 | 1 | if (destination and |
|
516 | not destination.state == ConnectionState.FINISHED): |
||
517 | 1 | packet = message.pack() |
|
518 | 1 | destination.send(packet) |
|
519 | 1 | self.log.debug('Connection %s: OUT OFP, ' |
|
520 | 'version: %s, type: %s, xid: %s - %s', |
||
521 | destination.id, |
||
522 | message.header.version, |
||
523 | message.header.message_type, |
||
524 | message.header.xid, |
||
525 | packet.hex()) |
||
526 | 1 | self.notify_listeners(triggered_event) |
|
527 | 1 | self.log.debug("Message Out Event handler called") |
|
528 | else: |
||
529 | self.log.info("connection closed. Cannot send message") |
||
530 | |||
531 | 1 | async def app_event_handler(self): |
|
532 | """Handle app events. |
||
533 | |||
534 | Listen to the app buffer and send all its events to the |
||
535 | corresponding listeners. |
||
536 | """ |
||
537 | 1 | self.log.info("App Event Handler started") |
|
538 | 1 | while True: |
|
539 | 1 | event = await self.buffers.app.aget() |
|
540 | 1 | self.notify_listeners(event) |
|
541 | 1 | self.log.debug("App Event handler called") |
|
542 | |||
543 | 1 | if event.name == "kytos/core.shutdown": |
|
544 | 1 | self.log.debug("App Event handler stopped") |
|
545 | 1 | break |
|
546 | |||
547 | 1 | def get_interface_by_id(self, interface_id): |
|
548 | """Find a Interface with interface_id. |
||
549 | |||
550 | Args: |
||
551 | interface_id(str): Interface Identifier. |
||
552 | |||
553 | Returns: |
||
554 | Interface: Instance of Interface with the id given. |
||
555 | |||
556 | """ |
||
557 | 1 | if interface_id is None: |
|
558 | 1 | return None |
|
559 | |||
560 | 1 | switch_id = ":".join(interface_id.split(":")[:-1]) |
|
561 | 1 | interface_number = int(interface_id.split(":")[-1]) |
|
562 | |||
563 | 1 | switch = self.switches.get(switch_id) |
|
564 | |||
565 | 1 | if not switch: |
|
566 | 1 | return None |
|
567 | |||
568 | 1 | return switch.interfaces.get(interface_number, None) |
|
569 | |||
570 | 1 | def get_switch_by_dpid(self, dpid): |
|
571 | """Return a specific switch by dpid. |
||
572 | |||
573 | Args: |
||
574 | dpid (|DPID|): dpid object used to identify a switch. |
||
575 | |||
576 | Returns: |
||
577 | :class:`~kytos.core.switch.Switch`: Switch with dpid specified. |
||
578 | |||
579 | """ |
||
580 | 1 | return self.switches.get(dpid) |
|
581 | |||
582 | 1 | def get_switch_or_create(self, dpid, connection): |
|
583 | """Return switch or create it if necessary. |
||
584 | |||
585 | Args: |
||
586 | dpid (|DPID|): dpid object used to identify a switch. |
||
587 | connection (:class:`~kytos.core.connection.Connection`): |
||
588 | connection used by switch. If a switch has a connection that |
||
589 | will be updated. |
||
590 | |||
591 | Returns: |
||
592 | :class:`~kytos.core.switch.Switch`: new or existent switch. |
||
593 | |||
594 | """ |
||
595 | 1 | self.create_or_update_connection(connection) |
|
596 | 1 | switch = self.get_switch_by_dpid(dpid) |
|
597 | 1 | event_name = 'kytos/core.switch.' |
|
598 | |||
599 | 1 | if switch is None: |
|
600 | 1 | switch = Switch(dpid=dpid) |
|
601 | 1 | self.add_new_switch(switch) |
|
602 | 1 | event_name += 'new' |
|
603 | else: |
||
604 | 1 | event_name += 'reconnected' |
|
605 | |||
606 | 1 | self.set_switch_options(dpid=dpid) |
|
607 | 1 | event = KytosEvent(name=event_name, content={'switch': switch}) |
|
608 | |||
609 | 1 | old_connection = switch.connection |
|
610 | 1 | switch.update_connection(connection) |
|
611 | |||
612 | 1 | if old_connection is not connection: |
|
613 | 1 | self.remove_connection(old_connection) |
|
614 | |||
615 | 1 | self.buffers.app.put(event) |
|
616 | |||
617 | 1 | return switch |
|
618 | |||
619 | 1 | def set_switch_options(self, dpid): |
|
620 | """Update the switch settings based on kytos.conf options. |
||
621 | |||
622 | Args: |
||
623 | dpid (str): dpid used to identify a switch. |
||
624 | |||
625 | """ |
||
626 | 1 | switch = self.switches.get(dpid) |
|
627 | 1 | if not switch: |
|
628 | return |
||
629 | |||
630 | 1 | vlan_pool = {} |
|
631 | 1 | try: |
|
632 | 1 | vlan_pool = json.loads(self.options.vlan_pool) |
|
633 | 1 | if not vlan_pool: |
|
634 | 1 | return |
|
635 | except (TypeError, json.JSONDecodeError) as err: |
||
636 | self.log.error("Invalid vlan_pool settings: %s", err) |
||
637 | |||
638 | 1 | if vlan_pool.get(dpid): |
|
639 | 1 | self.log.info("Loading vlan_pool configuration for dpid %s", dpid) |
|
640 | 1 | for intf_num, port_list in vlan_pool[dpid].items(): |
|
641 | 1 | if not switch.interfaces.get((intf_num)): |
|
642 | 1 | vlan_ids = set() |
|
643 | 1 | for vlan_range in port_list: |
|
644 | 1 | (vlan_begin, vlan_end) = (vlan_range[0:2]) |
|
645 | 1 | for vlan_id in range(vlan_begin, vlan_end): |
|
646 | 1 | vlan_ids.add(vlan_id) |
|
647 | 1 | intf_num = int(intf_num) |
|
648 | 1 | intf = Interface(name=intf_num, port_number=intf_num, |
|
649 | switch=switch) |
||
650 | 1 | intf.set_available_tags(vlan_ids) |
|
651 | 1 | switch.update_interface(intf) |
|
652 | |||
653 | 1 | def create_or_update_connection(self, connection): |
|
654 | """Update a connection. |
||
655 | |||
656 | Args: |
||
657 | connection (:class:`~kytos.core.connection.Connection`): |
||
658 | Instance of connection that will be updated. |
||
659 | """ |
||
660 | 1 | self.connections[connection.id] = connection |
|
661 | |||
662 | 1 | def get_connection_by_id(self, conn_id): |
|
663 | """Return a existent connection by id. |
||
664 | |||
665 | Args: |
||
666 | id (int): id from a connection. |
||
667 | |||
668 | Returns: |
||
669 | :class:`~kytos.core.connection.Connection`: Instance of connection |
||
670 | or None Type. |
||
671 | |||
672 | """ |
||
673 | 1 | return self.connections.get(conn_id) |
|
674 | |||
675 | 1 | def remove_connection(self, connection): |
|
676 | """Close a existent connection and remove it. |
||
677 | |||
678 | Args: |
||
679 | connection (:class:`~kytos.core.connection.Connection`): |
||
680 | Instance of connection that will be removed. |
||
681 | """ |
||
682 | 1 | if connection is None: |
|
683 | 1 | return False |
|
684 | |||
685 | 1 | try: |
|
686 | 1 | connection.close() |
|
687 | 1 | del self.connections[connection.id] |
|
688 | 1 | except KeyError: |
|
689 | 1 | return False |
|
690 | 1 | return True |
|
691 | |||
692 | 1 | def remove_switch(self, switch): |
|
693 | """Remove an existent switch. |
||
694 | |||
695 | Args: |
||
696 | switch (:class:`~kytos.core.switch.Switch`): |
||
697 | Instance of switch that will be removed. |
||
698 | """ |
||
699 | 1 | try: |
|
700 | 1 | del self.switches[switch.dpid] |
|
701 | 1 | except KeyError: |
|
702 | 1 | return False |
|
703 | 1 | return True |
|
704 | |||
705 | 1 | def new_connection(self, event): |
|
706 | """Handle a kytos/core.connection.new event. |
||
707 | |||
708 | This method will read new connection event and store the connection |
||
709 | (socket) into the connections attribute on the controller. |
||
710 | |||
711 | It also clear all references to the connection since it is a new |
||
712 | connection on the same ip:port. |
||
713 | |||
714 | Args: |
||
715 | event (~kytos.core.KytosEvent): |
||
716 | The received event (``kytos/core.connection.new``) with the |
||
717 | needed info. |
||
718 | """ |
||
719 | 1 | self.log.info("Handling %s...", event) |
|
720 | |||
721 | 1 | connection = event.source |
|
722 | 1 | self.log.debug("Event source: %s", event.source) |
|
723 | |||
724 | # Remove old connection (aka cleanup) if it exists |
||
725 | 1 | if self.get_connection_by_id(connection.id): |
|
726 | self.remove_connection(connection.id) |
||
727 | |||
728 | # Update connections with the new connection |
||
729 | 1 | self.create_or_update_connection(connection) |
|
730 | |||
731 | 1 | def add_new_switch(self, switch): |
|
732 | """Add a new switch on the controller. |
||
733 | |||
734 | Args: |
||
735 | switch (Switch): A Switch object |
||
736 | """ |
||
737 | 1 | self.switches[switch.dpid] = switch |
|
738 | |||
739 | 1 | def _import_napp(self, username, napp_name): |
|
740 | """Import a NApp module. |
||
741 | |||
742 | Raises: |
||
743 | FileNotFoundError: if NApp's main.py is not found. |
||
744 | ModuleNotFoundError: if any NApp requirement is not installed. |
||
745 | |||
746 | """ |
||
747 | 1 | mod_name = '.'.join(['napps', username, napp_name, 'main']) |
|
748 | 1 | path = os.path.join(self.options.napps, username, napp_name, |
|
749 | 'main.py') |
||
750 | 1 | napp_spec = spec_from_file_location(mod_name, path) |
|
751 | 1 | napp_module = module_from_spec(napp_spec) |
|
752 | 1 | sys.modules[napp_spec.name] = napp_module |
|
753 | 1 | napp_spec.loader.exec_module(napp_module) |
|
754 | 1 | return napp_module |
|
755 | |||
756 | 1 | def load_napp(self, username, napp_name): |
|
757 | """Load a single NApp. |
||
758 | |||
759 | Args: |
||
760 | username (str): NApp username (makes up NApp's path). |
||
761 | napp_name (str): Name of the NApp to be loaded. |
||
762 | """ |
||
763 | 1 | if (username, napp_name) in self.napps: |
|
764 | 1 | message = 'NApp %s/%s was already loaded' |
|
765 | 1 | self.log.warning(message, username, napp_name) |
|
766 | 1 | return |
|
767 | |||
768 | 1 | try: |
|
769 | 1 | napp_module = self._import_napp(username, napp_name) |
|
770 | 1 | except ModuleNotFoundError as err: |
|
771 | 1 | self.log.error("Error loading NApp '%s/%s': %s", |
|
772 | username, napp_name, err) |
||
773 | 1 | return |
|
774 | 1 | except FileNotFoundError as err: |
|
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
Loading history...
|
|||
775 | 1 | msg = "NApp module not found, assuming it's a meta-napp: %s" |
|
776 | 1 | self.log.warning(msg, err.filename) |
|
777 | 1 | return |
|
778 | |||
779 | 1 | try: |
|
780 | 1 | napp = napp_module.Main(controller=self) |
|
781 | 1 | except: # noqa pylint: disable=bare-except |
|
782 | 1 | self.log.critical("NApp initialization failed: %s/%s", |
|
783 | username, napp_name, exc_info=True) |
||
784 | 1 | return |
|
785 | |||
786 | 1 | self.napps[(username, napp_name)] = napp |
|
787 | |||
788 | 1 | napp.start() |
|
789 | 1 | self.api_server.register_napp_endpoints(napp) |
|
790 | |||
791 | # pylint: disable=protected-access |
||
792 | 1 | for event, listeners in napp._listeners.items(): |
|
793 | self.events_listeners.setdefault(event, []).extend(listeners) |
||
794 | # pylint: enable=protected-access |
||
795 | |||
796 | 1 | def pre_install_napps(self, napps, enable=True): |
|
797 | """Pre install and enable NApps. |
||
798 | |||
799 | Before installing, it'll check if it's installed yet. |
||
800 | |||
801 | Args: |
||
802 | napps ([str]): List of NApps to be pre-installed and enabled. |
||
803 | """ |
||
804 | 1 | all_napps = self.napps_manager.get_installed_napps() |
|
805 | 1 | installed = [str(napp) for napp in all_napps] |
|
806 | 1 | napps_diff = [napp for napp in napps if napp not in installed] |
|
807 | 1 | for napp in napps_diff: |
|
808 | 1 | self.napps_manager.install(napp, enable=enable) |
|
809 | |||
810 | 1 | def load_napps(self): |
|
811 | """Load all NApps enabled on the NApps dir.""" |
||
812 | 1 | for napp in self.napps_manager.get_enabled_napps(): |
|
813 | 1 | try: |
|
814 | 1 | self.log.info("Loading NApp %s", napp.id) |
|
815 | 1 | self.load_napp(napp.username, napp.name) |
|
816 | except FileNotFoundError as exception: |
||
817 | self.log.error("Could not load NApp %s: %s", |
||
818 | napp.id, exception) |
||
819 | |||
820 | 1 | def unload_napp(self, username, napp_name): |
|
821 | """Unload a specific NApp. |
||
822 | |||
823 | Args: |
||
824 | username (str): NApp username. |
||
825 | napp_name (str): Name of the NApp to be unloaded. |
||
826 | """ |
||
827 | 1 | napp = self.napps.pop((username, napp_name), None) |
|
828 | |||
829 | 1 | if napp is None: |
|
830 | self.log.warning('NApp %s/%s was not loaded', username, napp_name) |
||
831 | else: |
||
832 | 1 | self.log.info("Shutting down NApp %s/%s...", username, napp_name) |
|
833 | 1 | napp_id = NApp(username, napp_name).id |
|
834 | 1 | event = KytosEvent(name='kytos/core.shutdown.' + napp_id) |
|
835 | 1 | napp_shutdown_fn = self.events_listeners[event.name][0] |
|
836 | # Call listener before removing it from events_listeners |
||
837 | 1 | napp_shutdown_fn(event) |
|
838 | |||
839 | # Remove rest endpoints from that napp |
||
840 | 1 | self.api_server.remove_napp_endpoints(napp) |
|
841 | |||
842 | # Removing listeners from that napp |
||
843 | # pylint: disable=protected-access |
||
844 | 1 | for event_type, napp_listeners in napp._listeners.items(): |
|
845 | event_listeners = self.events_listeners[event_type] |
||
846 | for listener in napp_listeners: |
||
847 | event_listeners.remove(listener) |
||
848 | if not event_listeners: |
||
849 | del self.events_listeners[event_type] |
||
850 | # pylint: enable=protected-access |
||
851 | |||
852 | 1 | def unload_napps(self): |
|
853 | """Unload all loaded NApps that are not core NApps.""" |
||
854 | # list() is used here to avoid the error: |
||
855 | # 'RuntimeError: dictionary changed size during iteration' |
||
856 | # This is caused by looping over an dictionary while removing |
||
857 | # items from it. |
||
858 | for (username, napp_name) in list(self.napps.keys()): # noqa |
||
859 | self.unload_napp(username, napp_name) |
||
860 | |||
861 | 1 | def reload_napp_module(self, username, napp_name, napp_file): |
|
862 | """Reload a NApp Module.""" |
||
863 | 1 | mod_name = '.'.join(['napps', username, napp_name, napp_file]) |
|
864 | 1 | try: |
|
865 | 1 | napp_module = import_module(mod_name) |
|
866 | 1 | except ModuleNotFoundError as err: |
|
867 | 1 | self.log.error("Module '%s' not found", mod_name) |
|
868 | 1 | raise |
|
869 | 1 | try: |
|
870 | 1 | napp_module = reload_module(napp_module) |
|
871 | 1 | except ImportError as err: |
|
872 | 1 | self.log.error("Error reloading NApp '%s/%s': %s", |
|
873 | username, napp_name, err) |
||
874 | 1 | raise |
|
875 | |||
876 | 1 | def reload_napp(self, username, napp_name): |
|
877 | """Reload a NApp.""" |
||
878 | 1 | self.unload_napp(username, napp_name) |
|
879 | 1 | try: |
|
880 | 1 | self.reload_napp_module(username, napp_name, 'settings') |
|
881 | 1 | self.reload_napp_module(username, napp_name, 'main') |
|
882 | 1 | except (ModuleNotFoundError, ImportError): |
|
883 | 1 | return 400 |
|
884 | 1 | self.log.info("NApp '%s/%s' successfully reloaded", |
|
885 | username, napp_name) |
||
886 | 1 | self.load_napp(username, napp_name) |
|
887 | 1 | return 200 |
|
888 | |||
889 | 1 | def rest_reload_napp(self, username, napp_name): |
|
890 | """Request reload a NApp.""" |
||
891 | 1 | res = self.reload_napp(username, napp_name) |
|
892 | 1 | return 'reloaded', res |
|
893 | |||
894 | 1 | def rest_reload_all_napps(self): |
|
895 | """Request reload all NApps.""" |
||
896 | 1 | for napp in self.napps: |
|
897 | 1 | self.reload_napp(*napp) |
|
898 | return 'reloaded', 200 |
||
899 |