1 | """Kytos SDN Platform main class. |
||
2 | |||
3 | This module contains the main class of Kytos, which is |
||
4 | :class:`~.core.Controller`. |
||
5 | |||
6 | Basic usage: |
||
7 | |||
8 | .. code-block:: python3 |
||
9 | |||
10 | from kytos.core.config import KytosConfig |
||
11 | from kytos.core import Controller |
||
12 | config = KytosConfig() |
||
13 | controller = Controller(config.options['daemon']) |
||
14 | controller.start() |
||
15 | """ |
||
16 | 1 | import asyncio |
|
17 | 1 | import atexit |
|
18 | 1 | import json |
|
19 | 1 | import logging |
|
20 | 1 | import os |
|
21 | 1 | import re |
|
22 | 1 | import sys |
|
23 | 1 | import threading |
|
24 | 1 | from concurrent.futures import ThreadPoolExecutor |
|
25 | 1 | from importlib import import_module |
|
26 | 1 | from importlib import reload as reload_module |
|
27 | 1 | from importlib.util import module_from_spec, spec_from_file_location |
|
28 | 1 | from pathlib import Path |
|
29 | |||
30 | 1 | from kytos.core.api_server import APIServer |
|
31 | # from kytos.core.tcp_server import KytosRequestHandler, KytosServer |
||
32 | 1 | from kytos.core.atcp_server import KytosServer, KytosServerProtocol |
|
33 | 1 | from kytos.core.auth import Auth |
|
34 | 1 | from kytos.core.buffers import KytosBuffers |
|
35 | 1 | from kytos.core.config import KytosConfig |
|
36 | 1 | from kytos.core.connection import ConnectionState |
|
37 | 1 | from kytos.core.events import KytosEvent |
|
38 | 1 | from kytos.core.helpers import now |
|
39 | 1 | from kytos.core.interface import Interface |
|
40 | 1 | from kytos.core.logs import LogManager |
|
41 | 1 | from kytos.core.napps.base import NApp |
|
42 | 1 | from kytos.core.napps.manager import NAppsManager |
|
43 | 1 | from kytos.core.napps.napp_dir_listener import NAppDirListener |
|
44 | 1 | from kytos.core.switch import Switch |
|
45 | |||
46 | 1 | __all__ = ('Controller',) |
|
47 | |||
48 | |||
def exc_handler(_, exc, __):
    """Log uncaught exceptions.

    The signature matches the ``(type, value, traceback)`` triple that
    :data:`sys.excepthook` hands to its hook.

    Args:
        _ (type): exception type.
        exc (BaseException): exception instance.
        __ (types.TracebackType): traceback attached to ``exc``.
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # An excepthook runs outside of any ``except`` block, so
    # ``logging.exception`` would find no active exception and log
    # "NoneType: None" instead of the traceback. Pass the triple explicitly.
    logging.error('Uncaught Exception: %s', exc, exc_info=(_, exc, __))
||
61 | |||
62 | |||
63 | 1 | class Controller: |
|
64 | """Main class of Kytos. |
||
65 | |||
66 | The main responsibilities of this class are: |
||
67 | - start the TCP server (:class:`~.core.atcp_server.KytosServer`); |
||
68 | - manage KytosNApps (install, load and unload); |
||
69 | - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`); |
||
70 | - manage which event should be sent to NApps methods; |
||
71 | - manage the buffers handlers, considering one asyncio task per handler. |
||
72 | """ |
||
73 | |||
74 | # Created issue #568 for the disabled checks. |
||
75 | # pylint: disable=too-many-instance-attributes,too-many-public-methods |
||
def __init__(self, options=None, loop=None):
    """Build the controller state, its helper objects and REST endpoints.

    Args:
        options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
            instance of :class:`~kytos.core.config.KytosConfig` class.
            When omitted, ``KytosConfig().options['daemon']`` is used.
        loop: asyncio event loop used by the controller. When omitted (or
            falsy), ``asyncio.get_event_loop()`` is used.
    """
    if options is None:
        options = KytosConfig().options['daemon']
    if not loop:
        loop = asyncio.get_event_loop()

    self._loop = loop
    # Single-worker pool; start_controller() runs the API server in it.
    self._pool = ThreadPoolExecutor(max_workers=1)
    #: list: asyncio tasks created by start_controller().
    self._tasks = []

    #: dict: keep the main threads of the controller (buffers and handler)
    self._threads = {}
    #: KytosBuffers: KytosBuffer object with Controller buffers
    self.buffers = KytosBuffers(loop=self._loop)
    #: dict: socket connections between the controller and the switches.
    #:
    #: Filled by create_or_update_connection(), keyed by ``connection.id``.
    self.connections = {}
    #: dict: mapping of events and event listeners.
    #:
    #: Each key is an event name, or a string holding a regex to match
    #: against event names; each value is the list of callables that
    #: will receive the matching events.
    self.events_listeners = {
        'kytos/core.connection.new': [self.new_connection],
    }

    #: dict: currently loaded NApps.
    #:
    #: Keys are ``(username, napp_name)`` tuples, values are the NApp
    #: instances (see load_napp()).
    self.napps = {}
    #: Object generated by ParseArgs on config.py file
    self.options = options
    #: KytosServer: Instance of KytosServer that will be listening to TCP
    #: connections. Created by start_controller().
    self.server = None
    #: dict: Current existing switches, ``dpid -> Switch``.
    self.switches = {}

    #: datetime.datetime: Time when the controller finished starting.
    self.started_at = None

    #: logging.Logger: Logger instance used by Kytos (see enable_logs()).
    self.log = None

    #: Observer that handle NApps when they are enabled or disabled.
    self.napp_dir_listener = NAppDirListener(self)

    self.napps_manager = NAppsManager(self)

    #: API Server used to expose rest endpoints.
    self.api_server = APIServer(
        __name__,
        self.options.listen,
        self.options.api_port,
        self.napps_manager,
        self.options.napps,
    )

    self.auth = Auth(self)

    self._register_endpoints()

    # Put the parent of the napps directory on the import path, so that
    # enabled NApps can be imported as ``napps.<username>.<napp_name>``.
    napps_parent = os.path.join(self.options.napps, os.pardir)
    sys.path.append(napps_parent)
    # Route uncaught exceptions through exc_handler.
    sys.excepthook = exc_handler
|
148 | |||
def enable_logs(self):
    """Load the logging configuration and create the controller logger.

    Sets :attr:`log` to the logger named after this module.
    """
    opts = self.options
    LogManager.load_config_file(opts.logging, opts.debug)
    LogManager.enable_websocket(self.api_server.server)
    self.log = logging.getLogger(__name__)
|
154 | |||
@staticmethod
def loggers():
    """List the logging Loggers whose name contains ``"kytos"``.

    Returns:
        list: :class:`logging.Logger` objects, one per matching name known
        to the logging manager.
    """
    found = []
    for name in logging.root.manager.loggerDict:
        if "kytos" in name:
            found.append(logging.getLogger(name))
    return found
||
164 | |||
def toggle_debug(self, name=None):
    """Enable/disable logging debug messages to a given logger name.

    Without ``name``, the logging config file is reloaded with debug
    switched to the opposite of the current effective level of the
    ``'kytos'`` logger. With ``name``, only that logger is toggled: it is
    set to DEBUG, or to NOTSET when its effective level already is DEBUG.

    Args:
        name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"

    Raises:
        ValueError: if ``name`` is given but no such logger is registered.
    """
    known_loggers = logging.root.manager.loggerDict
    if name and name not in known_loggers:
        # logging.getLogger() would silently create the logger, so unknown
        # names are rejected up front.
        raise ValueError(f"Invalid logger name: {name}")

    if name:
        target = logging.getLogger(name)
        debug_is_on = target.getEffectiveLevel() == logging.DEBUG
        target.setLevel(logging.NOTSET if debug_is_on else logging.DEBUG)
        return

    # No name given: flip the default loggers through the config file.
    current = logging.getLogger('kytos').getEffectiveLevel()
    LogManager.load_config_file(self.options.logging,
                                current != logging.DEBUG)
|
201 | |||
def start(self, restart=False):
    """Enable logs, create the pidfile and start the controller.

    Args:
        restart (bool): when true, the pidfile creation is skipped.
    """
    self.enable_logs()
    first_start = not restart
    if first_start:
        self.create_pidfile()
    self.start_controller()
|
208 | |||
def create_pidfile(self):
    """Create the pidfile holding the PID of the current process.

    The parent directory is created when missing. If a pidfile already
    exists and the PID stored in it belongs to a live process, the
    controller aborts. A stale or unreadable pidfile is overwritten.

    Raises:
        SystemExit: if the pidfile belongs to a running process or if it
            cannot be written.
    """
    pid = os.getpid()
    pidfile_path = Path(self.options.pidfile)

    # Creates directory if it doesn't exist
    # System can erase /var/run's content
    pid_folder = pidfile_path.parent
    self.log.info(pid_folder)

    # Pylint incorrectly infers Path objects
    # https://github.com/PyCQA/pylint/issues/224
    # pylint: disable=no-member
    if not pid_folder.exists():
        pid_folder.mkdir()
        pid_folder.chmod(0o1777)
    # pylint: enable=no-member

    # Mode 'x' fails if the file already exists.
    try:
        pidfile = open(self.options.pidfile, mode='x')
    except OSError:
        # This happens if there is a pidfile already.
        # We shall check if the process that created the pidfile is still
        # running.
        try:
            with open(self.options.pidfile, mode='r') as existing_file:
                old_pid = int(existing_file.read())
            # Signal 0 only checks that the process exists.
            os.kill(old_pid, 0)
        except (OSError, ValueError, OverflowError):
            # Unreadable/garbage content or no such process: the pidfile
            # is stale, overwrite it.
            try:
                pidfile = open(self.options.pidfile, mode='w')
            except OSError as exception:
                error_msg = "Failed to create pidfile {}: {}."
                sys.exit(error_msg.format(self.options.pidfile, exception))
        else:
            # kill() returned no error, so a process is running with the
            # same PID. We assume it is Kytos and quit.
            error_msg = ("PID file {} exists. Delete it if Kytos is not "
                         "running. Aborting.")
            sys.exit(error_msg.format(self.options.pidfile))

    # Identifies the process that created the pidfile.
    with pidfile:
        pidfile.write(str(pid))

    def _remove_pidfile():
        """Delete the pidfile, tolerating that it is already gone."""
        try:
            pidfile_path.unlink()
        except FileNotFoundError:
            pass

    # Registered only once this process owns the pidfile, so an aborted
    # start never deletes the pidfile of the instance that is running.
    atexit.register(_remove_pidfile)
|
256 | |||
def start_controller(self):
    """Start the controller.

    Creates the KytosServer (TCP server) and calls its ``serve_forever``,
    schedules one asyncio task per buffer handler plus one task that runs
    the API server in the thread pool, then starts the NApp directory
    listener, pre-installs and loads the NApps and records the start time.
    """
    self.log.info("Starting Kytos - Kytos Controller")
    listen_address = (self.options.listen, int(self.options.port))
    self.server = KytosServer(listen_address,
                              KytosServerProtocol,
                              self,
                              self.options.protocol_name)

    self.log.info("Starting TCP server: %s", self.server)
    self.server.serve_forever()

    def _stop_loop(_):
        # Done-callback of the API server task: stop the event loop.
        loop = asyncio.get_event_loop()
        alive = threading.enumerate()
        self.log.debug("%s threads before loop.stop: %s",
                       len(alive), alive)
        loop.stop()

    async def _run_api_server_thread(executor):
        # Run the blocking API server in ``executor`` and wait for it.
        log = logging.getLogger('kytos.core.controller.api_server_thread')
        log.debug('starting')
        loop = asyncio.get_event_loop()
        blocking_tasks = [
            loop.run_in_executor(executor, self.api_server.run),
        ]
        done, not_done = await asyncio.wait(blocking_tasks)
        log.debug('completed: %d, pending: %d',
                  len(done), len(not_done))

    event_handlers = (
        self.raw_event_handler,
        self.msg_in_event_handler,
        self.msg_out_event_handler,
        self.app_event_handler,
    )
    for handler in event_handlers:
        self._tasks.append(self._loop.create_task(handler()))

    api_task = self._loop.create_task(_run_api_server_thread(self._pool))
    api_task.add_done_callback(_stop_loop)
    self._tasks.append(api_task)

    self.log.info("ThreadPool started: %s", self._pool)

    # ASYNC TODO: ensure all threads started correctly
    # This is critical, if any of them failed starting we should exit.

    self.log.info("Loading Kytos NApps...")
    self.napp_dir_listener.start()
    self.pre_install_napps(self.options.napps_pre_installed)
    self.load_napps()

    self.started_at = now()
|
321 | |||
def _register_endpoints(self):
    """Register the rest endpoints served by the kytos core.

    - Start the API of the APIServer
    - Register the core ``config/``, ``metadata/`` and ``reload/``
      endpoints
    - Register the core authentication services
    """
    api = self.api_server
    api.start_api()

    # Register controller endpoints as /api/kytos/core/...
    core_endpoints = (
        ('config/', self.configuration_endpoint),
        ('metadata/', Controller.metadata_endpoint),
        ('reload/<username>/<napp_name>/', self.rest_reload_napp),
        ('reload/all', self.rest_reload_all_napps),
    )
    for rule, handler in core_endpoints:
        api.register_core_endpoint(rule, handler)

    self.auth.register_core_auth_services()
|
341 | |||
def register_rest_endpoint(self, url, function, methods):
    """Register a REST endpoint on the API server.

    Deprecated in favor of the @rest decorator.

    Args:
        url: endpoint URL, passed through to the API server.
        function: callable that serves the endpoint.
        methods: HTTP methods, passed through to the API server.
    """
    api = self.api_server
    api.register_rest_endpoint(url, function, methods)
|
345 | |||
@classmethod
def metadata_endpoint(cls):
    """Return the Kytos metadata.

    Reads ``metadata.py`` from the directory of this module and collects
    every ``__name__ = 'value'`` assignment (lowercase dunder names,
    single-quoted values).

    Returns:
        string: Json with current kytos metadata.

    Raises:
        OSError: if ``metadata.py`` cannot be read.

    """
    meta_path = os.path.join(os.path.dirname(__file__), 'metadata.py')
    # The context manager closes the file; the bare open().read() left
    # that to the garbage collector.
    with open(meta_path) as meta_file:
        meta_source = meta_file.read()
    metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_source))
    return json.dumps(metadata)
|
358 | |||
def configuration_endpoint(self):
    """Return the configuration options used by Kytos.

    Returns:
        string: Json dump of the attributes of :attr:`options`.

    """
    current_options = vars(self.options)
    return json.dumps(current_options)
|
367 | |||
def restart(self, graceful=True):
    """Restart Kytos SDN Controller.

    A controller that has been started is stopped and re-initialized with
    its current options first; then it is started with ``restart=True``.

    Args:
        graceful(bool): Represents the way that Kytos will restart.
    """
    was_started = self.started_at is not None
    if was_started:
        self.stop(graceful)
        self.__init__(self.options)

    self.start(restart=True)
|
379 | |||
def stop(self, graceful=True):
    """Shutdown the controller if it is running.

    Does nothing when :attr:`started_at` is not set; otherwise delegates
    to :meth:`stop_controller`.

    Args:
        graceful (bool): forwarded to :meth:`stop_controller`.
    """
    if not self.started_at:
        return
    self.stop_controller(graceful)
|
390 | |||
def stop_controller(self, graceful=True):
    """Stop the controller.

    In order, this method:
        - sends the stop signal to the buffers;
        - stops the API server and the NApp directory listener;
        - shuts the thread pool down;
        - clears :attr:`started_at` and unloads the NApps;
        - replaces the buffers with a new KytosBuffers;
        - cancels every asyncio task created by the controller;
        - shuts the KytosServer down.

    Args:
        graceful (bool): passed as ``wait`` to the thread pool shutdown.
    """
    self.log.info("Stopping Kytos")

    self.buffers.send_stop_signal()
    self.api_server.stop_api_server()
    self.napp_dir_listener.stop()

    self.log.info("Stopping threadpool: %s", self._pool)

    alive = threading.enumerate()
    self.log.debug("%s threads before threadpool shutdown: %s",
                   len(alive), alive)

    try:
        # Python >= 3.9
        # pylint: disable=unexpected-keyword-arg
        self._pool.shutdown(wait=graceful, cancel_futures=True)
        # pylint: enable=unexpected-keyword-arg
    except TypeError:
        # Older signature without ``cancel_futures``.
        self._pool.shutdown(wait=graceful)

    self.started_at = None
    self.unload_napps()
    self.buffers = KytosBuffers()

    # Cancel all async tasks (event handlers and servers)
    for pending_task in self._tasks:
        pending_task.cancel()

    # ASYNC TODO: close connections

    # Shutdown the TCP server and the main asyncio loop
    self.server.shutdown()
|
448 | |||
def status(self):
    """Return status of Kytos Server.

    Returns:
        string: ``"Running since <started_at>"`` when the controller is
        running, ``"Stopped"`` otherwise.

    """
    if not self.started_at:
        return "Stopped"
    return "Running since %s" % self.started_at
|
462 | |||
def uptime(self):
    """Return the uptime of kytos server.

    Returns:
        datetime.timedelta: ``now() - started_at`` while the controller is
        running; the integer ``0`` when it is stopped.

    """
    if self.started_at:
        return now() - self.started_at
    return 0
|
475 | |||
def notify_listeners(self, event):
    """Send the event to the specified listeners.

    Loops over a snapshot of self.events_listeners, matching (by regexp)
    the name of the event with the keys of events_listeners. If a match
    occurs, the event is sent to each listener registered for that key.

    Args:
        event (~kytos.core.KytosEvent): An instance of a KytosEvent.
    """
    self.log.debug("looking for listeners for %s", event)
    for event_regex, listeners in dict(self.events_listeners).items():
        # Anchor the pattern so it does not match if the event has more
        # characters, e.g. "shutdown" won't match "shutdown.kytos/of_core".
        # A trailing "\$" is an escaped literal, so it still needs the
        # anchor. ``endswith`` is used rather than indexing so that empty
        # or one-character keys cannot raise IndexError.
        if not event_regex.endswith('$') or event_regex.endswith('\\$'):
            event_regex += '$'
        if re.match(event_regex, event.name):
            for listener in listeners:
                listener(event)
|
498 | |||
async def raw_event_handler(self):
    """Handle raw events.

    Take events from the raw buffer one at a time and hand each of them
    to :meth:`notify_listeners`. The ``kytos/core.shutdown`` event is
    delivered too, and then ends the handler.
    """
    self.log.info("Raw Event Handler started")
    shutting_down = False
    while not shutting_down:
        event = await self.buffers.raw.aget()
        self.notify_listeners(event)
        self.log.debug("Raw Event handler called")

        shutting_down = event.name == "kytos/core.shutdown"
        if shutting_down:
            self.log.debug("Raw Event handler stopped")
|
514 | |||
async def msg_in_event_handler(self):
    """Handle msg_in events.

    Take events from the msg_in buffer one at a time and hand each of
    them to :meth:`notify_listeners`. The ``kytos/core.shutdown`` event is
    delivered too, and then ends the handler.
    """
    self.log.info("Message In Event Handler started")
    shutting_down = False
    while not shutting_down:
        event = await self.buffers.msg_in.aget()
        self.notify_listeners(event)
        self.log.debug("Message In Event handler called")

        shutting_down = event.name == "kytos/core.shutdown"
        if shutting_down:
            self.log.debug("Message In Event handler stopped")
|
530 | |||
async def msg_out_event_handler(self):
    """Handle msg_out events.

    Take events from the msg_out buffer one at a time. For each event
    whose destination exists and is not in the FINISHED state, the packed
    message is sent to the destination and the listeners are notified.
    Other events are only logged. The ``kytos/core.shutdown`` event ends
    the handler without being sent or delivered.
    """
    self.log.info("Message Out Event Handler started")
    while True:
        outgoing = await self.buffers.msg_out.aget()

        if outgoing.name == "kytos/core.shutdown":
            self.log.debug("Message Out Event handler stopped")
            return

        message = outgoing.content['message']
        destination = outgoing.destination
        if not destination or destination.state == ConnectionState.FINISHED:
            self.log.info("connection closed. Cannot send message")
            continue

        packet = message.pack()
        destination.send(packet)
        header = message.header
        self.log.debug('Connection %s: OUT OFP, '
                       'version: %s, type: %s, xid: %s - %s',
                       destination.id,
                       header.version,
                       header.message_type,
                       header.xid,
                       packet.hex())
        self.notify_listeners(outgoing)
        self.log.debug("Message Out Event handler called")
||
562 | |||
async def app_event_handler(self):
    """Handle app events.

    Take events from the app buffer one at a time and hand each of them
    to :meth:`notify_listeners`. The ``kytos/core.shutdown`` event is
    delivered too, and then ends the handler.
    """
    self.log.info("App Event Handler started")
    shutting_down = False
    while not shutting_down:
        event = await self.buffers.app.aget()
        self.notify_listeners(event)
        self.log.debug("App Event handler called")

        shutting_down = event.name == "kytos/core.shutdown"
        if shutting_down:
            self.log.debug("App Event handler stopped")
|
578 | |||
def get_interface_by_id(self, interface_id):
    """Find a Interface with interface_id.

    The id is split at its last ``":"``: the part before it is the switch
    id, the part after it is the interface number.

    Args:
        interface_id(str): Interface Identifier.

    Returns:
        Interface: Instance of Interface with the id given, or None when
        the id is None or the switch/interface is unknown.

    Raises:
        ValueError: if the part after the last ``":"`` is not an integer.

    """
    if interface_id is None:
        return None

    switch_id, _, number_text = interface_id.rpartition(":")
    interface_number = int(number_text)

    switch = self.switches.get(switch_id)
    if switch:
        return switch.interfaces.get(interface_number, None)
    return None
|
601 | |||
def get_switch_by_dpid(self, dpid):
    """Return a specific switch by dpid.

    Args:
        dpid (|DPID|): dpid object used to identify a switch.

    Returns:
        :class:`~kytos.core.switch.Switch`: Switch with dpid specified, or
        None when no such switch is known.

    """
    known_switches = self.switches
    return known_switches.get(dpid)
|
613 | |||
def get_switch_or_create(self, dpid, connection):
    """Return switch or create it if necessary.

    The connection is stored, the switch gets its options applied and its
    connection replaced, and a ``kytos/core.switch.new`` or
    ``kytos/core.switch.reconnected`` event is put in the app buffer.

    Args:
        dpid (|DPID|): dpid object used to identify a switch.
        connection (:class:`~kytos.core.connection.Connection`):
            connection used by switch. If the switch had a different
            connection, that one is removed.

    Returns:
        :class:`~kytos.core.switch.Switch`: new or existent switch.

    """
    self.create_or_update_connection(connection)
    switch = self.get_switch_by_dpid(dpid)

    if switch is None:
        switch = Switch(dpid=dpid)
        self.add_new_switch(switch)
        suffix = 'new'
    else:
        suffix = 'reconnected'

    self.set_switch_options(dpid=dpid)
    event = KytosEvent(name='kytos/core.switch.' + suffix,
                       content={'switch': switch})

    previous_connection = switch.connection
    switch.update_connection(connection)

    if previous_connection is not connection:
        self.remove_connection(previous_connection)

    self.buffers.app.put(event)

    return switch
|
650 | |||
def set_switch_options(self, dpid):
    """Update the switch settings based on kytos.conf options.

    For every interface listed in the ``vlan_pool`` option of this dpid
    and not found on the switch, an Interface is created with the vlan
    ids of the configured ranges as available tags.

    Args:
        dpid (str): dpid used to identify a switch.

    """
    switch = self.switches.get(dpid)
    if not switch:
        return

    vlan_pool = self.options.vlan_pool
    if not vlan_pool or not vlan_pool.get(dpid):
        return

    self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
    for intf_num, port_list in vlan_pool[dpid].items():
        # NOTE(review): this lookup uses the key exactly as stored in
        # vlan_pool, before the int() conversion below - confirm it has
        # the same type as the keys of switch.interfaces.
        if switch.interfaces.get(intf_num):
            continue

        vlan_ids = set()
        for vlan_range in port_list:
            # Each range is used as range(begin, end): end is excluded.
            vlan_begin, vlan_end = vlan_range[0:2]
            vlan_ids.update(range(vlan_begin, vlan_end))

        port_number = int(intf_num)
        intf = Interface(name=port_number, port_number=port_number,
                         switch=switch)
        intf.set_available_tags(vlan_ids)
        switch.update_interface(intf)
|
681 | |||
def create_or_update_connection(self, connection):
    """Store a connection under its id, replacing any previous entry.

    Args:
        connection (:class:`~kytos.core.connection.Connection`):
            Instance of connection that will be updated.
    """
    registry = self.connections
    registry[connection.id] = connection
|
690 | |||
def get_connection_by_id(self, conn_id):
    """Return a existent connection by id.

    Args:
        conn_id: id of a connection, as used to key :attr:`connections`.

    Returns:
        :class:`~kytos.core.connection.Connection`: Instance of connection
            or None Type.

    """
    registry = self.connections
    return registry.get(conn_id)
|
703 | |||
def remove_connection(self, connection):
    """Close a existent connection and remove it.

    Args:
        connection (:class:`~kytos.core.connection.Connection`):
            Instance of connection that will be removed.

    Returns:
        bool: True when the connection was closed and removed; False when
        ``connection`` is None or a KeyError was raised (e.g. its id is
        not registered).
    """
    if connection is None:
        return False

    try:
        # The connection is closed before the lookup, so an unregistered
        # connection is closed too.
        connection.close()
        self.connections.pop(connection.id)
    except KeyError:
        return False
    else:
        return True
|
720 | |||
def remove_switch(self, switch):
    """Remove an existent switch.

    Args:
        switch (:class:`~kytos.core.switch.Switch`):
            Instance of switch that will be removed.

    Returns:
        bool: True when the switch was removed, False when its dpid was
        not registered.
    """
    try:
        self.switches.pop(switch.dpid)
    except KeyError:
        return False
    else:
        return True
|
733 | |||
def new_connection(self, event):
    """Handle a kytos/core.connection.new event.

    This method will read new connection event and store the connection
    (socket) into the connections attribute on the controller.

    A different connection already stored under the same id is closed and
    removed first, since the new one replaces it.

    Args:
        event (~kytos.core.KytosEvent):
            The received event (``kytos/core.connection.new``) with the
            needed info.
    """
    self.log.info("Handling %s...", event)

    connection = event.source
    self.log.debug("Event source: %s", event.source)

    # Remove old connection (aka cleanup) if it exists.
    # remove_connection() needs the connection object itself (it calls
    # ``close()`` and reads ``id``), not the connection id.
    old_connection = self.get_connection_by_id(connection.id)
    if old_connection and old_connection is not connection:
        self.remove_connection(old_connection)

    # Update connections with the new connection
    self.create_or_update_connection(connection)
|
759 | |||
def add_new_switch(self, switch):
    """Register a switch on the controller under its dpid.

    Args:
        switch (Switch): A Switch object
    """
    known_switches = self.switches
    known_switches[switch.dpid] = switch
|
767 | |||
def _import_napp(self, username, napp_name):
    """Import a NApp module.

    The NApp's ``main.py`` is loaded from the napps directory as module
    ``napps.<username>.<napp_name>.main``. The module is put in
    ``sys.modules`` before it is executed.

    Returns:
        module: the executed NApp ``main`` module.

    Raises:
        FileNotFoundError: if NApp's main.py is not found.
        ModuleNotFoundError: if any NApp requirement is not installed.

    """
    module_name = '.'.join(('napps', username, napp_name, 'main'))
    main_path = os.path.join(self.options.napps, username, napp_name,
                             'main.py')

    spec = spec_from_file_location(module_name, main_path)
    module = module_from_spec(spec)
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module
|
784 | |||
def load_napp(self, username, napp_name):
    """Load a single NApp.

    The NApp module is imported, its ``Main`` class is instantiated with
    this controller and started, its endpoints are registered on the API
    server and its listeners are added to :attr:`events_listeners`.
    Failures are logged and the NApp is skipped; nothing is raised for
    them.

    Args:
        username (str): NApp username (makes up NApp's path).
        napp_name (str): Name of the NApp to be loaded.
    """
    napp_key = (username, napp_name)
    if napp_key in self.napps:
        self.log.warning('NApp %s/%s was already loaded',
                         username, napp_name)
        return

    try:
        napp_module = self._import_napp(username, napp_name)
    except ModuleNotFoundError as err:
        self.log.error("Error loading NApp '%s/%s': %s",
                       username, napp_name, err)
        return
    except FileNotFoundError as err:
        self.log.warning(
            "NApp module not found, assuming it's a meta-napp: %s",
            err.filename)
        return

    try:
        napp = napp_module.Main(controller=self)
    except:  # noqa pylint: disable=bare-except
        self.log.critical("NApp initialization failed: %s/%s",
                          username, napp_name, exc_info=True)
        return

    # The NApp is recorded as loaded before it is started.
    self.napps[napp_key] = napp

    napp.start()
    self.api_server.authenticate_endpoints(napp)
    self.api_server.register_napp_endpoints(napp)

    # pylint: disable=protected-access
    registry = self.events_listeners
    for event_name, callbacks in napp._listeners.items():
        registry.setdefault(event_name, []).extend(callbacks)
    # pylint: enable=protected-access
||
825 | |||
def pre_install_napps(self, napps, enable=True):
    """Install the requested NApps that are not installed yet.

    Each requested name is compared with the string form of every
    NApp reported by ``self.napps_manager.get_installed_napps()``;
    names without a match are handed to the manager's ``install``.

    Args:
        napps ([str]): List of NApps to be pre-installed.
        enable (bool): Forwarded as ``enable`` to each ``install``
            call. Defaults to True.
    """
    manager = self.napps_manager
    known_ids = list(map(str, manager.get_installed_napps()))
    missing = [wanted for wanted in napps if wanted not in known_ids]
    for wanted in missing:
        manager.install(wanted, enable=enable)
|
839 | |||
def load_napps(self):
    """Load every NApp the NApps manager reports as enabled.

    A ``FileNotFoundError`` raised while loading one NApp is logged
    and the remaining NApps are still processed.
    """
    enabled_napps = self.napps_manager.get_enabled_napps()
    for enabled in enabled_napps:
        try:
            self.log.info("Loading NApp %s", enabled.id)
            self.load_napp(enabled.username, enabled.name)
        except FileNotFoundError as not_found:
            self.log.error("Could not load NApp %s: %s",
                           enabled.id, not_found)
||
849 | |||
def unload_napp(self, username, napp_name):
    """Shut down a loaded NApp and detach it from the controller.

    The NApp is removed from ``self.napps``. If it was not there, a
    warning is logged and nothing else happens. Otherwise the first
    listener registered for the NApp's ``kytos/core.shutdown.<id>``
    event is called, the NApp's endpoints are removed from the API
    server and its listeners are removed from
    ``self.events_listeners``.

    Args:
        username (str): NApp username.
        napp_name (str): Name of the NApp to be unloaded.
    """
    napp = self.napps.pop((username, napp_name), None)
    if napp is None:
        self.log.warning('NApp %s/%s was not loaded', username, napp_name)
        return

    self.log.info("Shutting down NApp %s/%s...", username, napp_name)
    napp_id = NApp(username, napp_name).id
    shutdown_event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
    # The shutdown listener must run while it is still registered in
    # events_listeners, i.e. before the clean-up loop below.
    shutdown_listener = self.events_listeners[shutdown_event.name][0]
    shutdown_listener(shutdown_event)

    # Drop the REST endpoints exposed by this NApp.
    self.api_server.remove_napp_endpoints(napp)

    # Drop this NApp's listeners; an event left without listeners is
    # removed from the table altogether.
    # pylint: disable=protected-access
    for event_name, own_listeners in napp._listeners.items():
        remaining = self.events_listeners[event_name]
        for listener in own_listeners:
            remaining.remove(listener)
        if not remaining:
            del self.events_listeners[event_name]
    # pylint: enable=protected-access
||
881 | |||
def unload_napps(self):
    """Unload every NApp currently present in ``self.napps``."""
    # unload_napp() removes entries from self.napps, so the keys are
    # copied first; looping over the live dict would fail with
    # 'RuntimeError: dictionary changed size during iteration'.
    loaded_keys = list(self.napps.keys())
    for (owner, name) in loaded_keys:
        self.unload_napp(owner, name)
||
890 | |||
def reload_napp_module(self, username, napp_name, napp_file):
    """Reload one module of a NApp.

    The module ``napps.<username>.<napp_name>.<napp_file>`` is imported
    and then reloaded.

    Args:
        username (str): NApp username.
        napp_name (str): Name of the NApp.
        napp_file (str): Name of the module inside the NApp package,
            without the ``.py`` extension.

    Raises:
        ModuleNotFoundError: if the module cannot be imported.
        ImportError: if reloading the module fails.
    """
    module_path = '.'.join(('napps', username, napp_name, napp_file))
    try:
        current_module = import_module(module_path)
    except ModuleNotFoundError:
        self.log.error("Module '%s' not found", module_path)
        raise
    try:
        reload_module(current_module)
    except ImportError as reload_error:
        self.log.error("Error reloading NApp '%s/%s': %s",
                       username, napp_name, reload_error)
        raise
|
905 | |||
def reload_napp(self, username, napp_name):
    """Unload a NApp, reload its modules and load it again.

    The NApp is unloaded first; then its ``settings`` and ``main``
    modules are reloaded, in that order. If either reload raises
    ``ImportError`` (which includes ``ModuleNotFoundError``) the NApp
    is not loaded again.

    Args:
        username (str): NApp username.
        napp_name (str): Name of the NApp to be reloaded.

    Returns:
        int: 200 when the modules were reloaded and ``load_napp`` was
        called, 400 when reloading a module failed.
    """
    self.unload_napp(username, napp_name)
    try:
        for napp_file in ('settings', 'main'):
            self.reload_napp_module(username, napp_name, napp_file)
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError.
        return 400
    self.log.info("NApp '%s/%s' successfully reloaded",
                  username, napp_name)
    self.load_napp(username, napp_name)
    return 200
|
918 | |||
def rest_reload_napp(self, username, napp_name):
    """Reload one NApp on request.

    Args:
        username (str): NApp username.
        napp_name (str): Name of the NApp to be reloaded.

    Returns:
        tuple: The string ``'reloaded'`` and the value returned by
        ``reload_napp``.
    """
    return 'reloaded', self.reload_napp(username, napp_name)
|
923 | |||
def rest_reload_all_napps(self):
    """Request reload all NApps.

    ``reload_napp`` is called once for every NApp that is in
    ``self.napps`` when the request starts.

    Returns:
        tuple: ``('reloaded', 200)``. The values returned by the
        individual ``reload_napp`` calls are not reflected in it.
    """
    # reload_napp() unloads and loads each NApp again, which removes
    # and re-inserts keys of self.napps. Looping over the live dict
    # would raise 'RuntimeError: dictionary changed size during
    # iteration' (or revisit re-inserted keys), so loop over a copy.
    for napp in list(self.napps):
        self.reload_napp(*napp)
    return 'reloaded', 200
||
929 |