Passed
Pull Request — master (#336)
by Vinicius
04:39
created

kytos.core.controller   F

Complexity

Total Complexity 121

Size/Duplication

Total Lines 1004
Duplicated Lines 0 %

Test Coverage

Coverage 87.85%

Importance

Changes 0
Metric Value
eloc 511
dl 0
loc 1004
rs 2
c 0
b 0
f 0
ccs 412
cts 469
cp 0.8785
wmc 121

45 Methods

Rating   Name   Duplication   Size   Complexity  
A Controller.rest_reload_napp() 0 4 1
A Controller.publish_connection_error() 0 11 1
A Controller.start() 0 11 4
B Controller.notify_listeners() 0 28 7
A Controller.restart() 0 11 2
A Controller.create_or_update_connection() 0 8 1
B Controller.create_pidfile() 0 49 5
A Controller.get_switch_by_dpid() 0 11 1
A Controller.add_new_switch() 0 7 1
A Controller.load_napps() 0 9 3
A Controller.event_handler() 0 11 3
A Controller.init_apm_or_core_shutdown() 0 9 3
A Controller._import_napp() 0 16 1
A Controller.get_connection_by_id() 0 12 1
A Controller.unload_napps() 0 8 2
B Controller.get_switch_or_create() 0 40 6
A Controller.configuration_endpoint() 0 8 1
A Controller.loggers() 0 10 1
B Controller.start_controller() 0 71 1
A Controller.get_interface_by_id() 0 22 3
A Controller.uptime() 0 12 2
A Controller.register_rest_endpoint() 0 3 1
A Controller.rest_reload_all_napps() 0 5 2
A Controller.__init__() 0 75 2
A Controller.new_connection() 0 25 2
A Controller.toggle_debug() 0 37 5
B Controller.set_switch_options() 0 30 8
A Controller.metadata_endpoint() 0 13 2
A Controller.enable_logs() 0 16 3
A Controller.remove_switch() 0 12 2
A Controller.db_conn_or_core_shutdown() 0 6 2
B Controller.msg_out_event_handler() 0 42 7
A Controller.stop_controller() 0 56 4
A Controller.reload_napp_module() 0 14 3
A Controller.remove_connection() 0 16 3
A Controller.stop() 0 10 2
A Controller._patch_core_loggers() 0 9 2
A Controller.status() 0 13 2
A Controller.pre_install_napps() 0 13 2
A Controller._register_endpoints() 0 19 1
A Controller.reload_napp() 0 12 2
A Controller.start_auth() 0 4 1
B Controller.load_napp() 0 39 6
A Controller.unload_napp() 0 30 5
A Controller._resolve() 0 6 1

1 Function

Rating   Name   Duplication   Size   Complexity  
A exc_handler() 0 12 1

How to fix   Complexity   

Complexity

Complex classes like kytos.core.controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import importlib
19 1
import json
20 1
import logging
21 1
import os
22 1
import re
23 1
import sys
24 1
import threading
25 1
from concurrent.futures import ThreadPoolExecutor
26 1
from importlib import import_module
27 1
from importlib import reload as reload_module
28 1
from importlib.util import module_from_spec, spec_from_file_location
29 1
from pathlib import Path
30 1
from socket import error as SocketError
31
32 1
from pyof.foundation.exceptions import PackException
33
34 1
from kytos.core.api_server import APIServer
35 1
from kytos.core.apm import init_apm
36 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
37 1
from kytos.core.auth import Auth
38 1
from kytos.core.buffers import KytosBuffers
39 1
from kytos.core.config import KytosConfig
40 1
from kytos.core.connection import ConnectionState
41 1
from kytos.core.db import db_conn_wait
42 1
from kytos.core.dead_letter import DeadLetter
43 1
from kytos.core.events import KytosEvent
44 1
from kytos.core.exceptions import KytosAPMInitException, KytosDBInitException
45 1
from kytos.core.helpers import executors, now
46 1
from kytos.core.interface import Interface
47 1
from kytos.core.logs import LogManager
48 1
from kytos.core.napps.base import NApp
49 1
from kytos.core.napps.manager import NAppsManager
50 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
51 1
from kytos.core.switch import Switch
52
53 1
__all__ = ('Controller',)
54
55
56 1
def exc_handler(_, exc, __):
57
    """Log uncaught exceptions.
58
59
    Args:
60
        exc_type (ignored): exception type
61
        exc: exception instance
62
        tb (ignored): traceback
63
    """
64
    logging.basicConfig(filename='errlog.log',
65
                        format='%(asctime)s:%(pathname)'
66
                        's:%(levelname)s:%(message)s')
67
    logging.exception('Uncaught Exception: %s', exc)
68
69
70 1
class Controller:
71
    """Main class of Kytos.
72
73
    The main responsibilities of this class are:
74
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
75
        - manage KytosNApps (install, load and unload);
76
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
77
        - manage which event should be sent to NApps methods;
78
        - manage the buffers handlers, considering one thread per handler.
79
    """
80
81
    # Created issue #568 for the disabled checks.
82
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
83
    # pylint: disable=consider-using-with,unnecessary-dunder-call
84
    # pylint: disable=too-many-lines
85 1
    def __init__(self, options=None):
86
        """Init method of Controller class takes the parameters below.
87
88
        Args:
89
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
90
                instance of :class:`~kytos.core.config.KytosConfig` class.
91
        """
92 1
        if options is None:
93
            options = KytosConfig().options['daemon']
94
95 1
        self._pool = ThreadPoolExecutor(max_workers=1)
96
97
        # asyncio tasks
98 1
        self._tasks = []
99
100
        #: dict: keep the main threads of the controller (buffers and handler)
101 1
        self._threads = {}
102
        #: KytosBuffers: KytosBuffer object with Controller buffers
103 1
        self.buffers: KytosBuffers = None
104
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
105
        #:
106
        #: This dict stores all connections between the controller and the
107
        #: switches. The key for this dict is a tuple (ip, port). The content
108
        #: is a Connection
109 1
        self.connections = {}
110
        #: dict: mapping of events and event listeners.
111
        #:
112
        #: The key of the dict is a KytosEvent (or a string that represent a
113
        #: regex to match against KytosEvents) and the value is a list of
114
        #: methods that will receive the referenced event
115 1
        self.events_listeners = {'kytos/core.connection.new':
116
                                 [self.new_connection]}
117
118
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
119
        #:
120
        #: The key is the napp name (string), while the value is the napp
121
        #: instance itself.
122 1
        self.napps = {}
123
        #: Object generated by ParseArgs on config.py file
124 1
        self.options = options
125
        #: KytosServer: Instance of KytosServer that will be listening to TCP
126
        #: connections.
127 1
        self.server = None
128
        #: dict: Current existing switches.
129
        #:
130
        #: The key is the switch dpid, while the value is a Switch object.
131 1
        self.switches = {}  # dpid: Switch()
132 1
        self._switches_lock = threading.Lock()
133
134
        #: datetime.datetime: Time when the controller finished starting.
135 1
        self.started_at = None
136
137
        #: logging.Logger: Logger instance used by Kytos.
138 1
        self.log = None
139
140
        #: Observer that handle NApps when they are enabled or disabled.
141 1
        self.napp_dir_listener = NAppDirListener(self)
142
143 1
        self.napps_manager = NAppsManager(self)
144
145
        #: API Server used to expose rest endpoints.
146 1
        self.api_server = APIServer(__name__, self.options.listen,
147
                                    self.options.api_port,
148
                                    self.napps_manager, self.options.napps)
149
150 1
        self.auth = None
151 1
        self.dead_letter = DeadLetter(self)
152 1
        self._alisten_tasks = set()
153
154 1
        self._register_endpoints()
155
        #: Adding the napps 'enabled' directory into the PATH
156
        #: Now you can access the enabled napps with:
157
        #: from napps.<username>.<napp_name> import ?....
158 1
        sys.path.append(os.path.join(self.options.napps, os.pardir))
159 1
        sys.excepthook = exc_handler
160
161 1
    def start_auth(self):
162
        """Initialize Auth() and its services"""
163 1
        self.auth = Auth(self)
164 1
        self.auth.register_core_auth_services()
165
166 1
    def enable_logs(self):
167
        """Register kytos log and enable the logs."""
168 1
        decorators = self.options.logger_decorators
169 1
        try:
170 1
            decorators = [self._resolve(deco) for deco in decorators]
171
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
172
            print(f'Failed to resolve decorator module: {err.name}')
173
            sys.exit(1)
174
        except AttributeError:
175
            print(f'Failed to resolve decorator name: {decorators}')
176
            sys.exit(1)
177 1
        LogManager.decorate_logger_class(*decorators)
178 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
179 1
        LogManager.enable_websocket(self.api_server.server)
180 1
        self.log = logging.getLogger(__name__)
181 1
        self._patch_core_loggers()
182
183 1
    @staticmethod
184 1
    def _resolve(name):
185
        """Resolve a dotted name to a global object."""
186
        mod, _, attr = name.rpartition('.')
187
        mod = importlib.import_module(mod)
188
        return getattr(mod, attr)
189
190 1
    @staticmethod
191 1
    def _patch_core_loggers():
192
        """Patch in updated loggers to 'kytos.core.*' modules"""
193 1
        match_str = 'kytos.core.'
194 1
        str_len = len(match_str)
195 1
        reloadable_mods = [module for mod_name, module in sys.modules.items()
196
                           if mod_name[:str_len] == match_str]
197 1
        for module in reloadable_mods:
198 1
            module.LOG = logging.getLogger(module.__name__)
199
200 1
    @staticmethod
201 1
    def loggers():
202
        """List all logging Loggers.
203
204
        Return a list of Logger objects, with name and logging level.
205
        """
206
        # pylint: disable=no-member
207 1
        return [logging.getLogger(name)
208
                for name in logging.root.manager.loggerDict
209
                if "kytos" in name]
210
211 1
    def toggle_debug(self, name=None):
212
        """Enable/disable logging debug messages to a given logger name.
213
214
        If the name parameter is not specified the debug will be
215
        enabled/disabled following the initial config file. It will decide
216
        to enable/disable using the 'kytos' name to find the current
217
        logger level.
218
        Obs: To disable the debug the logging will be set to NOTSET
219
220
        Args:
221
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
222
        """
223
        # pylint: disable=no-member
224 1
        if name and name not in logging.root.manager.loggerDict:
225
            # A Logger name that is not declared in logging will raise an error
226
            # otherwise logging would create a new Logger.
227 1
            raise ValueError(f"Invalid logger name: {name}")
228
229 1
        if not name:
230
            # Logger name not specified.
231 1
            level = logging.getLogger('kytos').getEffectiveLevel()
232 1
            enable_debug = level != logging.DEBUG
233
234
            # Enable/disable default Loggers
235 1
            LogManager.load_config_file(self.options.logging, enable_debug)
236 1
            return
237
238
        # Get effective logger level for the name
239 1
        level = logging.getLogger(name).getEffectiveLevel()
240 1
        logger = logging.getLogger(name)
241
242 1
        if level == logging.DEBUG:
243
            # disable debug
244 1
            logger.setLevel(logging.NOTSET)
245
        else:
246
            # enable debug
247 1
            logger.setLevel(logging.DEBUG)
248
249 1
    def start(self, restart=False):
250
        """Create pidfile and call start_controller method."""
251 1
        self.enable_logs()
252 1
        if self.options.database:
253 1
            self.db_conn_or_core_shutdown()
254 1
            self.start_auth()
255 1
        if self.options.apm:
256 1
            self.init_apm_or_core_shutdown()
257 1
        if not restart:
258 1
            self.create_pidfile()
259 1
        self.start_controller()
260
261 1
    def create_pidfile(self):
262
        """Create a pidfile."""
263 1
        pid = os.getpid()
264
265
        # Creates directory if it doesn't exist
266
        # System can erase /var/run's content
267 1
        pid_folder = Path(self.options.pidfile).parent
268 1
        self.log.info(pid_folder)
269
270
        # Pylint incorrectly infers Path objects
271
        # https://github.com/PyCQA/pylint/issues/224
272
        # pylint: disable=no-member
273 1
        if not pid_folder.exists():
274
            pid_folder.mkdir()
275
            pid_folder.chmod(0o1777)
276
        # pylint: enable=no-member
277
278
        # Make sure the file is deleted when controller stops
279 1
        atexit.register(Path(self.options.pidfile).unlink)
280
281
        # Checks if a pidfile exists. Creates a new file.
282 1
        try:
283 1
            pidfile = open(self.options.pidfile, mode='x', encoding="utf8")
284 1
        except OSError:
285
            # This happens if there is a pidfile already.
286
            # We shall check if the process that created the pidfile is still
287
            # running.
288 1
            try:
289 1
                existing_file = open(self.options.pidfile, mode='r',
290
                                     encoding="utf8")
291 1
                old_pid = int(existing_file.read())
292 1
                os.kill(old_pid, 0)
293
                # If kill() doesn't return an error, there's a process running
294
                # with the same PID. We assume it is Kytos and quit.
295
                # Otherwise, overwrite the file and proceed.
296
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
297
                             "running. Aborting.")
298
                sys.exit(error_msg.format(self.options.pidfile))
299 1
            except OSError:
300 1
                try:
301 1
                    pidfile = open(self.options.pidfile, mode='w',
302
                                   encoding="utf8")
303
                except OSError as exception:
304
                    error_msg = "Failed to create pidfile {}: {}."
305
                    sys.exit(error_msg.format(self.options.pidfile, exception))
306
307
        # Identifies the process that created the pidfile.
308 1
        pidfile.write(str(pid))
309 1
        pidfile.close()
310
311 1
    def start_controller(self):
312
        """Start the controller.
313
314
        Starts the KytosServer (TCP Server) coroutine.
315
        Starts a thread for each buffer handler.
316
        Load the installed apps.
317
        """
318 1
        self.log.info("Starting Kytos - Kytos Controller")
319 1
        self.buffers = KytosBuffers()
320 1
        self.server = KytosServer((self.options.listen,
321
                                   int(self.options.port)),
322
                                  KytosServerProtocol,
323
                                  self,
324
                                  self.options.protocol_name)
325
326 1
        self.log.info("Starting TCP server: %s", self.server)
327 1
        self.server.serve_forever()
328
329 1
        def _stop_loop(_):
330
            loop = asyncio.get_running_loop()
331
            # print(_.result())
332
            threads = threading.enumerate()
333
            self.log.debug("%s threads before loop.stop: %s",
334
                           len(threads), threads)
335
            loop.stop()
336
337 1
        async def _run_api_server_thread(executor):
338
            log = logging.getLogger('kytos.core.controller.api_server_thread')
339
            log.debug('starting')
340
            # log.debug('creating tasks')
341
            loop = asyncio.get_running_loop()
342
            blocking_tasks = [
343
                loop.run_in_executor(executor, self.api_server.run)
344
            ]
345
            # log.debug('waiting for tasks')
346
            completed, pending = await asyncio.wait(blocking_tasks)
347
            # results = [t.result() for t in completed]
348
            # log.debug('results: {!r}'.format(results))
349
            log.debug('completed: %d, pending: %d',
350
                      len(completed), len(pending))
351
352 1
        loop = asyncio.get_running_loop()
353 1
        task = loop.create_task(_run_api_server_thread(self._pool))
354 1
        task.add_done_callback(_stop_loop)
355 1
        self._tasks.append(task)
356
357 1
        self.log.info("ThreadPool started: %s", self._pool)
358
359
        # ASYNC TODO: ensure all threads started correctly
360
        # This is critical, if any of them failed starting we should exit.
361
        # sys.exit(error_msg.format(thread, exception))
362
363 1
        self.log.info("Loading Kytos NApps...")
364 1
        self.napp_dir_listener.start()
365 1
        self.pre_install_napps(self.options.napps_pre_installed)
366 1
        self.load_napps()
367
368
        # Start task handlers consumers after NApps to potentially
369
        # avoid discarding an event if a consumer isn't ready yet
370 1
        task = loop.create_task(self.event_handler("conn"))
371 1
        self._tasks.append(task)
372 1
        task = loop.create_task(self.event_handler("raw"))
373 1
        self._tasks.append(task)
374 1
        task = loop.create_task(self.event_handler("msg_in"))
375 1
        self._tasks.append(task)
376 1
        task = loop.create_task(self.msg_out_event_handler())
377 1
        self._tasks.append(task)
378 1
        task = loop.create_task(self.event_handler("app"))
379 1
        self._tasks.append(task)
380
381 1
        self.started_at = now()
382
383 1
    def db_conn_or_core_shutdown(self):
384
        """Ensure db connection or core shutdown."""
385 1
        try:
386 1
            db_conn_wait(db_backend=self.options.database)
387 1
        except KytosDBInitException as exc:
388 1
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
389
390 1
    def init_apm_or_core_shutdown(self, **kwargs):
391
        """Init APM instrumentation or core shutdown."""
392
        if not self.options.apm:
393
            return
394
        try:
395
            init_apm(self.options.apm, app=self.api_server.app,
396
                     **kwargs)
397
        except KytosAPMInitException as exc:
398
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
399
400 1
    def _register_endpoints(self):
401
        """Register all rest endpoint served by kytos.
402
403
        -   Register APIServer endpoints
404
        -   Register WebUI endpoints
405
        -   Register ``/api/kytos/core/config`` endpoint
406
        """
407 1
        self.api_server.start_api()
408
        # Register controller endpoints as /api/kytos/core/...
409 1
        self.api_server.register_core_endpoint('config/',
410
                                               self.configuration_endpoint)
411 1
        self.api_server.register_core_endpoint('metadata/',
412
                                               Controller.metadata_endpoint)
413 1
        self.api_server.register_core_endpoint(
414
            'reload/<username>/<napp_name>/',
415
            self.rest_reload_napp)
416 1
        self.api_server.register_core_endpoint('reload/all',
417
                                               self.rest_reload_all_napps)
418 1
        self.dead_letter.register_endpoints()
419
420 1
    def register_rest_endpoint(self, url, function, methods):
421
        """Deprecate in favor of @rest decorator."""
422 1
        self.api_server.register_rest_endpoint(url, function, methods)
423
424 1
    @classmethod
425 1
    def metadata_endpoint(cls):
426
        """Return the Kytos metadata.
427
428
        Returns:
429
            string: Json with current kytos metadata.
430
431
        """
432 1
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
433 1
        with open(meta_path, encoding="utf8") as file:
434 1
            meta_file = file.read()
435 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
436 1
        return json.dumps(metadata)
437
438 1
    def configuration_endpoint(self):
439
        """Return the configuration options used by Kytos.
440
441
        Returns:
442
            string: Json with current configurations used by kytos.
443
444
        """
445 1
        return json.dumps(self.options.__dict__)
446
447 1
    def restart(self, graceful=True):
448
        """Restart Kytos SDN Controller.
449
450
        Args:
451
            graceful(bool): Represents the way that Kytos will restart.
452
        """
453 1
        if self.started_at is not None:
454 1
            self.stop(graceful)
455 1
            self.__init__(self.options)
456
457 1
        self.start(restart=True)
458
459 1
    def stop(self, graceful=True):
460
        """Shutdown all services used by kytos.
461
462
        This method should:
463
            - stop all Websockets
464
            - stop the API Server
465
            - stop the Controller
466
        """
467 1
        if self.started_at:
468 1
            self.stop_controller(graceful)
469
470 1
    def stop_controller(self, graceful=True):
471
        """Stop the controller.
472
473
        This method should:
474
            - announce on the network that the controller will shutdown;
475
            - stop receiving incoming packages;
476
            - call the 'shutdown' method of each KytosNApp that is running;
477
            - finish reading the events on all buffers;
478
            - stop each running handler;
479
            - stop all running threads;
480
            - stop the KytosServer;
481
        """
482 1
        self.log.info("Stopping Kytos")
483
484 1
        self.buffers.send_stop_signal()
485 1
        self.napp_dir_listener.stop()
486
487 1
        for pool_name in executors:
488 1
            self.log.info("Stopping threadpool: %s", pool_name)
489
490 1
        threads = threading.enumerate()
491 1
        self.log.debug("%s threads before threadpool shutdown: %s",
492
                       len(threads), threads)
493
494 1
        for executor_pool in executors.values():
495 1
            executor_pool.shutdown(wait=graceful, cancel_futures=True)
496
497
        # self.server.socket.shutdown()
498
        # self.server.socket.close()
499
500
        # for thread in self._threads.values():
501
        #     self.log.info("Stopping thread: %s", thread.name)
502
        #     thread.join()
503
504
        # for thread in self._threads.values():
505
        #     while thread.is_alive():
506
        #         self.log.info("Thread is alive: %s", thread.name)
507
        #         pass
508
509 1
        self.started_at = None
510 1
        self.unload_napps()
511 1
        self.buffers = KytosBuffers()
512
513
        # Cancel all async tasks (event handlers and servers)
514 1
        for task in self._tasks:
515
            task.cancel()
516
517
        # ASYNC TODO: close connections
518
        # self.server.server_close()
519
520 1
        self.log.info("Stopping API Server: %s", self._pool)
521 1
        self.api_server.stop_api_server()
522 1
        self.log.info("Stopping API Server threadpool: %s", self._pool)
523 1
        self._pool.shutdown(wait=graceful, cancel_futures=True)
524
        # Shutdown the TCP server and the main asyncio loop
525 1
        self.server.shutdown()
526
527 1
    def status(self):
528
        """Return status of Kytos Server.
529
530
        If the controller kytos is running this method will be returned
531
        "Running since 'Started_At'", otherwise "Stopped".
532
533
        Returns:
534
            string: String with kytos status.
535
536
        """
537 1
        if self.started_at:
538 1
            return f"Running since {self.started_at}"
539 1
        return "Stopped"
540
541 1
    def uptime(self):
542
        """Return the uptime of kytos server.
543
544
        This method should return:
545
            - 0 if Kytos Server is stopped.
546
            - (kytos.start_at - datetime.now) if Kytos Server is running.
547
548
        Returns:
549
           datetime.timedelta: The uptime interval.
550
551
        """
552 1
        return now() - self.started_at if self.started_at else 0
553
554 1
    def notify_listeners(self, event):
555
        """Send the event to the specified listeners.
556
557
        Loops over self.events_listeners matching (by regexp) the attribute
558
        name of the event with the keys of events_listeners. If a match occurs,
559
        then send the event to each registered listener.
560
561
        Args:
562
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
563
        """
564
565 1
        self.log.debug("looking for listeners for %s", event)
566 1
        for event_regex, listeners in dict(self.events_listeners).items():
567
            # self.log.debug("listeners found for %s: %r => %s", event,
568
            #                event_regex, [l.__qualname__ for l in listeners])
569
            # Do not match if the event has more characters
570
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
571 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
572 1
                event_regex += '$'
573 1
            if re.match(event_regex, event.name):
574 1
                self.log.debug('Calling listeners for %s', event)
575 1
                for listener in listeners:
576 1
                    if asyncio.iscoroutinefunction(listener):
577
                        task = asyncio.create_task(listener(event))
578
                        self._alisten_tasks.add(task)
579
                        task.add_done_callback(self._alisten_tasks.discard)
580
                    else:
581 1
                        listener(event)
582
583 1
    async def event_handler(self, buffer_name: str):
584
        """Default event handler that gets from an event buffer."""
585 1
        event_buffer = getattr(self.buffers, buffer_name)
586 1
        self.log.info(f"Event handler {buffer_name} started")
587
        while True:
588 1
            event = await event_buffer.aget()
589 1
            self.notify_listeners(event)
590
591 1
            if event.name == "kytos/core.shutdown":
592 1
                self.log.debug(f"Event handler {buffer_name} stopped")
593 1
                break
594
595 1
    async def publish_connection_error(self, event):
596
        """Publish connection error event.
597
598
        Args:
599
            event (KytosEvent): Event that triggered for error propagation.
600
        """
601
        event.name = \
602
            f"kytos/core.{event.destination.protocol.name}.connection.error"
603
        error_msg = f"Connection state: {event.destination.state}"
604
        event.content["exception"] = error_msg
605
        await self.buffers.app.aput(event)
606
607 1
    async def msg_out_event_handler(self):
608
        """Handle msg_out events.
609
610
        Listen to the msg_out buffer and send all its events to the
611
        corresponding listeners.
612
        """
613 1
        self.log.info("Event handler msg_out started")
614
        while True:
615 1
            triggered_event = await self.buffers.msg_out.aget()
616
617 1
            if triggered_event.name == "kytos/core.shutdown":
618 1
                self.log.debug("Message Out Event handler stopped")
619 1
                break
620
621 1
            message = triggered_event.content['message']
622 1
            destination = triggered_event.destination
623 1
            try:
624 1
                if (destination and
625
                        not destination.state == ConnectionState.FINISHED):
626 1
                    packet = message.pack()
627 1
                    destination.send(packet)
628 1
                    self.log.debug('Connection %s: OUT OFP, '
629
                                   'version: %s, type: %s, xid: %s - %s',
630
                                   destination.id,
631
                                   message.header.version,
632
                                   message.header.message_type,
633
                                   message.header.xid,
634
                                   packet.hex())
635 1
                    self.notify_listeners(triggered_event)
636 1
                    continue
637
638 1
            except (OSError, SocketError):
639
                pass
640 1
            except PackException as err:
641 1
                self.log.error(
642
                    f"Discarding message: {message}, event: {triggered_event} "
643
                    f"because of PackException {err}"
644
                )
645 1
                continue
646
647
            await self.publish_connection_error(triggered_event)
648
            self.log.info("connection closed. Cannot send message")
649
650 1
    def get_interface_by_id(self, interface_id):
651
        """Find a Interface  with interface_id.
652
653
        Args:
654
            interface_id(str): Interface Identifier.
655
656
        Returns:
657
            Interface: Instance of Interface with the id given.
658
659
        """
660 1
        if interface_id is None:
661 1
            return None
662
663 1
        switch_id = ":".join(interface_id.split(":")[:-1])
664 1
        interface_number = int(interface_id.split(":")[-1])
665
666 1
        switch = self.switches.get(switch_id)
667
668 1
        if not switch:
669 1
            return None
670
671 1
        return switch.interfaces.get(interface_number, None)
672
673 1
    def get_switch_by_dpid(self, dpid):
674
        """Return a specific switch by dpid.
675
676
        Args:
677
            dpid (|DPID|): dpid object used to identify a switch.
678
679
        Returns:
680
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
681
682
        """
683 1
        return self.switches.get(dpid)
684
685 1
    def get_switch_or_create(self, dpid, connection=None):
686
        """Return switch or create it if necessary.
687
688
        Args:
689
            dpid (|DPID|): dpid object used to identify a switch.
690
            connection (:class:`~kytos.core.connection.Connection`):
691
                connection used by switch. If a switch has a connection that
692
                will be updated.
693
694
        Returns:
695
            :class:`~kytos.core.switch.Switch`: new or existent switch.
696
697
        """
698 1
        with self._switches_lock:
699 1
            if connection:
700 1
                self.create_or_update_connection(connection)
701
702 1
            switch = self.get_switch_by_dpid(dpid)
703 1
            event_name = 'kytos/core.switch.'
704
705 1
            if switch is None:
706 1
                switch = Switch(dpid=dpid)
707 1
                self.add_new_switch(switch)
708 1
                event_name += 'new'
709
            else:
710 1
                event_name += 'reconnected'
711
712 1
            self.set_switch_options(dpid=dpid)
713 1
            event = KytosEvent(name=event_name, content={'switch': switch})
714
715 1
            if connection:
716 1
                old_connection = switch.connection
717 1
                switch.update_connection(connection)
718
719 1
                if old_connection is not connection:
720 1
                    self.remove_connection(old_connection)
721
722 1
            self.buffers.app.put(event)
723
724 1
            return switch
725
726 1
    def set_switch_options(self, dpid):
727
        """Update the switch settings based on kytos.conf options.
728
729
        Args:
730
            dpid (str): dpid used to identify a switch.
731
732
        """
733 1
        switch = self.switches.get(dpid)
734 1
        if not switch:
735
            return
736
737 1
        vlan_pool = {}
738 1
        vlan_pool = self.options.vlan_pool
739 1
        if not vlan_pool:
740 1
            return
741
742 1
        if vlan_pool.get(dpid):
743 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
744 1
            for intf_num, port_list in vlan_pool[dpid].items():
745 1
                if not switch.interfaces.get((intf_num)):
746 1
                    vlan_ids = set()
747 1
                    for vlan_range in port_list:
748 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
749 1
                        for vlan_id in range(vlan_begin, vlan_end):
750 1
                            vlan_ids.add(vlan_id)
751 1
                    intf_num = int(intf_num)
752 1
                    intf = Interface(name=intf_num, port_number=intf_num,
753
                                     switch=switch)
754 1
                    intf.set_available_tags(vlan_ids)
755 1
                    switch.update_interface(intf)
756
757 1
    def create_or_update_connection(self, connection):
758
        """Update a connection.
759
760
        Args:
761
            connection (:class:`~kytos.core.connection.Connection`):
762
                Instance of connection that will be updated.
763
        """
764 1
        self.connections[connection.id] = connection
765
766 1
    def get_connection_by_id(self, conn_id):
767
        """Return a existent connection by id.
768
769
        Args:
770
            id (int): id from a connection.
771
772
        Returns:
773
            :class:`~kytos.core.connection.Connection`: Instance of connection
774
                or None Type.
775
776
        """
777 1
        return self.connections.get(conn_id)
778
779 1
    def remove_connection(self, connection):
780
        """Close a existent connection and remove it.
781
782
        Args:
783
            connection (:class:`~kytos.core.connection.Connection`):
784
                Instance of connection that will be removed.
785
        """
786 1
        if connection is None:
787 1
            return False
788
789 1
        try:
790 1
            connection.close()
791 1
            del self.connections[connection.id]
792 1
        except KeyError:
793 1
            return False
794 1
        return True
795
796 1
    def remove_switch(self, switch):
797
        """Remove an existent switch.
798
799
        Args:
800
            switch (:class:`~kytos.core.switch.Switch`):
801
                Instance of switch that will be removed.
802
        """
803 1
        try:
804 1
            del self.switches[switch.dpid]
805 1
        except KeyError:
806 1
            return False
807 1
        return True
808
809 1
    def new_connection(self, event):
810
        """Handle a kytos/core.connection.new event.
811
812
        This method will read new connection event and store the connection
813
        (socket) into the connections attribute on the controller.
814
815
        It also clear all references to the connection since it is a new
816
        connection on the same ip:port.
817
818
        Args:
819
            event (~kytos.core.KytosEvent):
820
                The received event (``kytos/core.connection.new``) with the
821
                needed info.
822
        """
823 1
        self.log.info("Handling %s...", event)
824
825 1
        connection = event.source
826 1
        self.log.debug("Event source: %s", event.source)
827
828
        # Remove old connection (aka cleanup) if it exists
829 1
        if self.get_connection_by_id(connection.id):
830
            self.remove_connection(connection.id)
831
832
        # Update connections with the new connection
833 1
        self.create_or_update_connection(connection)
834
835 1
    def add_new_switch(self, switch):
836
        """Add a new switch on the controller.
837
838
        Args:
839
            switch (Switch): A Switch object
840
        """
841 1
        self.switches[switch.dpid] = switch
842
843 1
    def _import_napp(self, username, napp_name):
844
        """Import a NApp module.
845
846
        Raises:
847
            FileNotFoundError: if NApp's main.py is not found.
848
            ModuleNotFoundError: if any NApp requirement is not installed.
849
850
        """
851 1
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
852 1
        path = os.path.join(self.options.napps, username, napp_name,
853
                            'main.py')
854 1
        napp_spec = spec_from_file_location(mod_name, path)
855 1
        napp_module = module_from_spec(napp_spec)
856 1
        sys.modules[napp_spec.name] = napp_module
857 1
        napp_spec.loader.exec_module(napp_module)
858 1
        return napp_module
859
860 1
    def load_napp(self, username, napp_name):
861
        """Load a single NApp.
862
863
        Args:
864
            username (str): NApp username (makes up NApp's path).
865
            napp_name (str): Name of the NApp to be loaded.
866
        """
867 1
        if (username, napp_name) in self.napps:
868 1
            message = 'NApp %s/%s was already loaded'
869 1
            self.log.warning(message, username, napp_name)
870 1
            return
871
872 1
        try:
873 1
            napp_module = self._import_napp(username, napp_name)
874 1
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
875 1
            self.log.error("Error loading NApp '%s/%s': %s",
876
                           username, napp_name, err)
877 1
            return
878 1
        except FileNotFoundError as err:
879 1
            msg = "NApp module not found, assuming it's a meta-napp: %s"
880 1
            self.log.warning(msg, err.filename)
881 1
            return
882
883 1
        try:
884 1
            napp = napp_module.Main(controller=self)
885 1
        except:  # noqa pylint: disable=bare-except
886 1
            self.log.critical("NApp initialization failed: %s/%s",
887
                              username, napp_name, exc_info=True)
888 1
            return
889
890 1
        self.napps[(username, napp_name)] = napp
891
892 1
        napp.start()
893 1
        self.api_server.authenticate_endpoints(napp)
894 1
        self.api_server.register_napp_endpoints(napp)
895
896
        # pylint: disable=protected-access
897 1
        for event, listeners in napp._listeners.items():
898
            self.events_listeners.setdefault(event, []).extend(listeners)
899
        # pylint: enable=protected-access
900
901 1
    def pre_install_napps(self, napps, enable=True):
902
        """Pre install and enable NApps.
903
904
        Before installing, it'll check if it's installed yet.
905
906
        Args:
907
            napps ([str]): List of NApps to be pre-installed and enabled.
908
        """
909 1
        all_napps = self.napps_manager.get_installed_napps()
910 1
        installed = [str(napp) for napp in all_napps]
911 1
        napps_diff = [napp for napp in napps if napp not in installed]
912 1
        for napp in napps_diff:
913 1
            self.napps_manager.install(napp, enable=enable)
914
915 1
    def load_napps(self):
916
        """Load all NApps enabled on the NApps dir."""
917 1
        for napp in self.napps_manager.get_enabled_napps():
918 1
            try:
919 1
                self.log.info("Loading NApp %s", napp.id)
920 1
                self.load_napp(napp.username, napp.name)
921
            except FileNotFoundError as exception:
922
                self.log.error("Could not load NApp %s: %s",
923
                               napp.id, exception)
924
925 1
    def unload_napp(self, username, napp_name):
926
        """Unload a specific NApp.
927
928
        Args:
929
            username (str): NApp username.
930
            napp_name (str): Name of the NApp to be unloaded.
931
        """
932 1
        napp = self.napps.pop((username, napp_name), None)
933
934 1
        if napp is None:
935
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
936
        else:
937 1
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
938 1
            napp_id = NApp(username, napp_name).id
939 1
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
940 1
            napp_shutdown_fn = self.events_listeners[event.name][0]
941
            # Call listener before removing it from events_listeners
942 1
            napp_shutdown_fn(event)
943
944
            # Remove rest endpoints from that napp
945 1
            self.api_server.remove_napp_endpoints(napp)
946
947
            # Removing listeners from that napp
948
            # pylint: disable=protected-access
949 1
            for event_type, napp_listeners in napp._listeners.items():
950
                event_listeners = self.events_listeners[event_type]
951
                for listener in napp_listeners:
952
                    event_listeners.remove(listener)
953
                if not event_listeners:
954
                    del self.events_listeners[event_type]
955
            # pylint: enable=protected-access
956
957 1
    def unload_napps(self):
958
        """Unload all loaded NApps that are not core NApps
959
960
        NApps are unloaded in the reverse order that they are enabled to
961
        facilitate to shutdown gracefully.
962
        """
963 1
        for napp in reversed(self.napps_manager.get_enabled_napps()):
964 1
            self.unload_napp(napp.username, napp.name)
965
966 1
    def reload_napp_module(self, username, napp_name, napp_file):
967
        """Reload a NApp Module."""
968 1
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
969 1
        try:
970 1
            napp_module = import_module(mod_name)
971 1
        except ModuleNotFoundError:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
972 1
            self.log.error("Module '%s' not found", mod_name)
973 1
            raise
974 1
        try:
975 1
            napp_module = reload_module(napp_module)
976 1
        except ImportError as err:
977 1
            self.log.error("Error reloading NApp '%s/%s': %s",
978
                           username, napp_name, err)
979 1
            raise
980
981 1
    def reload_napp(self, username, napp_name):
982
        """Reload a NApp."""
983 1
        self.unload_napp(username, napp_name)
984 1
        try:
985 1
            self.reload_napp_module(username, napp_name, 'settings')
986 1
            self.reload_napp_module(username, napp_name, 'main')
987 1
        except (ModuleNotFoundError, ImportError):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
988 1
            return 400
989 1
        self.log.info("NApp '%s/%s' successfully reloaded",
990
                      username, napp_name)
991 1
        self.load_napp(username, napp_name)
992 1
        return 200
993
994 1
    def rest_reload_napp(self, username, napp_name):
995
        """Request reload a NApp."""
996 1
        res = self.reload_napp(username, napp_name)
997 1
        return 'reloaded', res
998
999 1
    def rest_reload_all_napps(self):
1000
        """Request reload all NApps."""
1001 1
        for napp in self.napps:
1002 1
            self.reload_napp(*napp)
1003
        return 'reloaded', 200
1004