Passed
Pull Request — master (#336)
by Vinicius
05:06
created

kytos.core.controller.exc_handler()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.2963

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 3
dl 0
loc 12
rs 10
c 0
b 0
f 0
ccs 1
cts 3
cp 0.3333
crap 1.2963
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import importlib
19 1
import json
20 1
import logging
21 1
import os
22 1
import re
23 1
import sys
24 1
import threading
25 1
from concurrent.futures import ThreadPoolExecutor
26 1
from importlib import import_module
27 1
from importlib import reload as reload_module
28 1
from importlib.util import module_from_spec, spec_from_file_location
29 1
from pathlib import Path
30 1
from socket import error as SocketError
31
32 1
from pyof.foundation.exceptions import PackException
33
34 1
from kytos.core.api_server import APIServer
35 1
from kytos.core.apm import init_apm
36 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
37 1
from kytos.core.auth import Auth
38 1
from kytos.core.buffers import KytosBuffers
39 1
from kytos.core.config import KytosConfig
40 1
from kytos.core.connection import ConnectionState
41 1
from kytos.core.db import db_conn_wait
42 1
from kytos.core.dead_letter import DeadLetter
43 1
from kytos.core.events import KytosEvent
44 1
from kytos.core.exceptions import KytosAPMInitException, KytosDBInitException
45 1
from kytos.core.helpers import executors, now
46 1
from kytos.core.interface import Interface
47 1
from kytos.core.logs import LogManager
48 1
from kytos.core.napps.base import NApp
49 1
from kytos.core.napps.manager import NAppsManager
50 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
51 1
from kytos.core.switch import Switch
52
53 1
__all__ = ('Controller',)
54
55
56 1
def exc_handler(_, exc, __):
    """Log an uncaught exception together with its traceback.

    The signature follows the ``sys.excepthook`` protocol, i.e. it receives
    the ``(type, value, traceback)`` triple positionally.

    Args:
        _: exception type.
        exc: exception instance.
        __: traceback object that belongs to ``exc`` (may be ``None``).
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # ``logging.exception`` takes the traceback from ``sys.exc_info()``,
    # which holds nothing when this runs outside an ``except`` block (as an
    # excepthook does). Hand the triple over explicitly so the traceback
    # ends up in the log instead of "NoneType: None".
    logging.error('Uncaught Exception: %s', exc, exc_info=(_, exc, __))
68
69
70 1
class Controller:
71
    """Main class of Kytos.
72
73
    The main responsibilities of this class are:
74
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
75
        - manage KytosNApps (install, load and unload);
76
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
77
        - manage which event should be sent to NApps methods;
78
        - manage the buffers handlers, considering one thread per handler.
79
    """
80
81
    # Created issue #568 for the disabled checks.
82
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
83
    # pylint: disable=consider-using-with,unnecessary-dunder-call
84
    # pylint: disable=too-many-lines
85 1
    def __init__(self, options=None):
        """Initialise the controller attributes and register core endpoints.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
                When ``None``, the ``'daemon'`` options of a newly built
                ``KytosConfig`` are used.
        """
        daemon_options = options
        if daemon_options is None:
            daemon_options = KytosConfig().options['daemon']

        # Single-worker pool; start_controller runs the API server in it.
        self._pool = ThreadPoolExecutor(max_workers=1)

        #: list: asyncio tasks created by the controller.
        self._tasks = []

        #: dict: main threads of the controller (buffers and handler).
        self._threads = {}

        #: KytosBuffers: controller buffers, created by start_controller.
        self.buffers: KytosBuffers = None

        #: dict: socket connections between the controller and the
        #: switches. Keys are ``(ip, port)`` tuples, values are Connection
        #: objects.
        self.connections = {}

        #: dict: event listeners by event.
        #:
        #: Each key is a KytosEvent (or a string holding a regex that is
        #: matched against KytosEvents); each value is the list of methods
        #: that receive the matching events.
        new_connection_event = 'kytos/core.connection.new'
        self.events_listeners = {new_connection_event: [self.new_connection]}

        #: dict: loaded NApps, ``'napp_name'`` (str) -> NApp instance.
        self.napps = {}

        #: Object generated by ParseArgs on config.py file
        self.options = daemon_options

        #: KytosServer: server listening to TCP connections (set on start).
        self.server = None

        #: dict: existing switches, dpid -> Switch object.
        self.switches = {}
        self._switches_lock = threading.Lock()

        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: Logger instance used by Kytos.
        self.log = None

        #: Observer that handle NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(
            __name__,
            self.options.listen,
            self.options.api_port,
            self.napps_manager,
            self.options.napps,
        )

        self.auth = None
        self.dead_letter = DeadLetter(self)
        self._alisten_tasks = set()

        self._register_endpoints()

        # Put the parent of the napps 'enabled' directory on the import
        # path, so enabled napps can be imported as:
        #     from napps.<username>.<napp_name> import ...
        napps_parent = os.path.join(self.options.napps, os.pardir)
        sys.path.append(napps_parent)
        sys.excepthook = exc_handler
160
161 1
    def start_auth(self):
        """Create the ``Auth`` helper and register its core services."""
        auth = Auth(self)
        self.auth = auth
        auth.register_core_auth_services()
165
166 1
    def enable_logs(self):
167
        """Register kytos log and enable the logs."""
168 1
        decorators = self.options.logger_decorators
169 1
        try:
170 1
            decorators = [self._resolve(deco) for deco in decorators]
171
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
172
            print(f'Failed to resolve decorator module: {err.name}')
173
            sys.exit(1)
174
        except AttributeError:
175
            print(f'Failed to resolve decorator name: {decorators}')
176
            sys.exit(1)
177 1
        LogManager.decorate_logger_class(*decorators)
178 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
179 1
        LogManager.enable_websocket(self.api_server.server)
180 1
        self.log = logging.getLogger(__name__)
181 1
        self._patch_core_loggers()
182
183 1
    @staticmethod
184 1
    def _resolve(name):
185
        """Resolve a dotted name to a global object."""
186
        mod, _, attr = name.rpartition('.')
187
        mod = importlib.import_module(mod)
188
        return getattr(mod, attr)
189
190 1
    @staticmethod
191 1
    def _patch_core_loggers():
192
        """Patch in updated loggers to 'kytos.core.*' modules"""
193 1
        match_str = 'kytos.core.'
194 1
        str_len = len(match_str)
195 1
        reloadable_mods = [module for mod_name, module in sys.modules.items()
196
                           if mod_name[:str_len] == match_str]
197 1
        for module in reloadable_mods:
198 1
            module.LOG = logging.getLogger(module.__name__)
199
200 1
    @staticmethod
201 1
    def loggers():
202
        """List all logging Loggers.
203
204
        Return a list of Logger objects, with name and logging level.
205
        """
206
        # pylint: disable=no-member
207 1
        return [logging.getLogger(name)
208
                for name in logging.root.manager.loggerDict
209
                if "kytos" in name]
210
211 1
    def toggle_debug(self, name=None):
212
        """Enable/disable logging debug messages to a given logger name.
213
214
        If the name parameter is not specified the debug will be
215
        enabled/disabled following the initial config file. It will decide
216
        to enable/disable using the 'kytos' name to find the current
217
        logger level.
218
        Obs: To disable the debug the logging will be set to NOTSET
219
220
        Args:
221
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
222
        """
223
        # pylint: disable=no-member
224 1
        if name and name not in logging.root.manager.loggerDict:
225
            # A Logger name that is not declared in logging will raise an error
226
            # otherwise logging would create a new Logger.
227 1
            raise ValueError(f"Invalid logger name: {name}")
228
229 1
        if not name:
230
            # Logger name not specified.
231 1
            level = logging.getLogger('kytos').getEffectiveLevel()
232 1
            enable_debug = level != logging.DEBUG
233
234
            # Enable/disable default Loggers
235 1
            LogManager.load_config_file(self.options.logging, enable_debug)
236 1
            return
237
238
        # Get effective logger level for the name
239 1
        level = logging.getLogger(name).getEffectiveLevel()
240 1
        logger = logging.getLogger(name)
241
242 1
        if level == logging.DEBUG:
243
            # disable debug
244 1
            logger.setLevel(logging.NOTSET)
245
        else:
246
            # enable debug
247 1
            logger.setLevel(logging.DEBUG)
248
249 1
    def start(self, restart=False):
250
        """Create pidfile and call start_controller method."""
251 1
        self.enable_logs()
252 1
        if self.options.database:
253 1
            self.db_conn_or_core_shutdown()
254 1
            self.start_auth()
255 1
        if self.options.apm:
256 1
            self.init_apm_or_core_shutdown()
257 1
        if not restart:
258 1
            self.create_pidfile()
259 1
        self.start_controller()
260
261 1
    def create_pidfile(self):
262
        """Create a pidfile."""
263 1
        pid = os.getpid()
264
265
        # Creates directory if it doesn't exist
266
        # System can erase /var/run's content
267 1
        pid_folder = Path(self.options.pidfile).parent
268 1
        self.log.info(pid_folder)
269
270
        # Pylint incorrectly infers Path objects
271
        # https://github.com/PyCQA/pylint/issues/224
272
        # pylint: disable=no-member
273 1
        if not pid_folder.exists():
274
            pid_folder.mkdir()
275
            pid_folder.chmod(0o1777)
276
        # pylint: enable=no-member
277
278
        # Make sure the file is deleted when controller stops
279 1
        atexit.register(Path(self.options.pidfile).unlink)
280
281
        # Checks if a pidfile exists. Creates a new file.
282 1
        try:
283 1
            pidfile = open(self.options.pidfile, mode='x', encoding="utf8")
284 1
        except OSError:
285
            # This happens if there is a pidfile already.
286
            # We shall check if the process that created the pidfile is still
287
            # running.
288 1
            try:
289 1
                existing_file = open(self.options.pidfile, mode='r',
290
                                     encoding="utf8")
291 1
                old_pid = int(existing_file.read())
292 1
                os.kill(old_pid, 0)
293
                # If kill() doesn't return an error, there's a process running
294
                # with the same PID. We assume it is Kytos and quit.
295
                # Otherwise, overwrite the file and proceed.
296
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
297
                             "running. Aborting.")
298
                sys.exit(error_msg.format(self.options.pidfile))
299 1
            except OSError:
300 1
                try:
301 1
                    pidfile = open(self.options.pidfile, mode='w',
302
                                   encoding="utf8")
303
                except OSError as exception:
304
                    error_msg = "Failed to create pidfile {}: {}."
305
                    sys.exit(error_msg.format(self.options.pidfile, exception))
306
307
        # Identifies the process that created the pidfile.
308 1
        pidfile.write(str(pid))
309 1
        pidfile.close()
310
311 1
    def start_controller(self):
        """Start the controller services.

        Creates the buffers and the KytosServer (TCP server), schedules the
        API server on the controller thread pool, loads the NApps and then
        creates one asyncio task per buffer handler. Must run while an
        asyncio loop is running, since the running loop is looked up here.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        self.buffers = KytosBuffers()
        listen_address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(listen_address, KytosServerProtocol, self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: stops the running loop.
            running_loop = asyncio.get_running_loop()
            alive = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(alive), alive)
            running_loop.stop()

        async def _run_api_server_thread(executor):
            # Runs the blocking API server in ``executor`` and waits for it.
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            api_future = asyncio.get_running_loop().run_in_executor(
                executor, self.api_server.run)
            completed, pending = await asyncio.wait([api_future])
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        loop = asyncio.get_running_loop()
        api_task = loop.create_task(_run_api_server_thread(self._pool))
        api_task.add_done_callback(_stop_loop)
        self._tasks.append(api_task)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.
        # sys.exit(error_msg.format(thread, exception))

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        # Start task handlers consumers after NApps to potentially
        # avoid discarding an event if a consumer isn't ready yet
        for buffer_name in ("conn", "raw", "msg_in", "msg_out", "app"):
            if buffer_name == "msg_out":
                # msg_out is served by its own dedicated handler.
                handler = self.msg_out_event_handler()
            else:
                handler = self.event_handler(buffer_name)
            self._tasks.append(loop.create_task(handler))

        self.started_at = now()
382
383 1
    def db_conn_or_core_shutdown(self):
        """Wait for the database connection; exit the process on failure.

        Raises:
            SystemExit: if ``db_conn_wait`` raises ``KytosDBInitException``.
        """
        backend = self.options.database
        try:
            db_conn_wait(db_backend=backend)
        except KytosDBInitException as exc:
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
389
390 1
    def init_apm_or_core_shutdown(self, **kwargs):
391
        """Init APM instrumentation or core shutdown."""
392
        if not self.options.apm:
393
            return
394
        try:
395
            init_apm(self.options.apm, app=self.api_server.app,
396
                     **kwargs)
397
        except KytosAPMInitException as exc:
398
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
399
400 1
    def _register_endpoints(self):
401
        """Register all rest endpoint served by kytos.
402
403
        -   Register APIServer endpoints
404
        -   Register WebUI endpoints
405
        -   Register ``/api/kytos/core/config`` endpoint
406
        """
407 1
        self.api_server.start_api()
408
        # Register controller endpoints as /api/kytos/core/...
409 1
        self.api_server.register_core_endpoint('config/',
410
                                               self.configuration_endpoint)
411 1
        self.api_server.register_core_endpoint('metadata/',
412
                                               Controller.metadata_endpoint)
413 1
        self.api_server.register_core_endpoint(
414
            'reload/<username>/<napp_name>/',
415
            self.rest_reload_napp)
416 1
        self.api_server.register_core_endpoint('reload/all',
417
                                               self.rest_reload_all_napps)
418 1
        self.dead_letter.register_endpoints()
419
420 1
    def register_rest_endpoint(self, url, function, methods):
421
        """Deprecate in favor of @rest decorator."""
422 1
        self.api_server.register_rest_endpoint(url, function, methods)
423
424 1
    @classmethod
425 1
    def metadata_endpoint(cls):
426
        """Return the Kytos metadata.
427
428
        Returns:
429
            string: Json with current kytos metadata.
430
431
        """
432 1
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
433 1
        with open(meta_path, encoding="utf8") as file:
434 1
            meta_file = file.read()
435 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
436 1
        return json.dumps(metadata)
437
438 1
    def configuration_endpoint(self):
439
        """Return the configuration options used by Kytos.
440
441
        Returns:
442
            string: Json with current configurations used by kytos.
443
444
        """
445 1
        return json.dumps(self.options.__dict__)
446
447 1
    def restart(self, graceful=True):
448
        """Restart Kytos SDN Controller.
449
450
        Args:
451
            graceful(bool): Represents the way that Kytos will restart.
452
        """
453 1
        if self.started_at is not None:
454 1
            self.stop(graceful)
455 1
            self.__init__(self.options)
456
457 1
        self.start(restart=True)
458
459 1
    def stop(self, graceful=True):
460
        """Shutdown all services used by kytos.
461
462
        This method should:
463
            - stop all Websockets
464
            - stop the API Server
465
            - stop the Controller
466
        """
467 1
        if self.started_at:
468 1
            self.stop_controller(graceful)
469
470 1
    def stop_controller(self, graceful=True):
        """Stop the controller services.

        In order: send the stop signal through the buffers, stop the NApp
        directory listener, shut down every pool in ``executors``, unload
        the NApps, replace the buffers with fresh ones, cancel the asyncio
        tasks, stop the API server and its thread pool and, last, shut the
        TCP server down.

        Args:
            graceful (bool): passed as ``wait`` to each thread pool
                ``shutdown`` call.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.napp_dir_listener.stop()

        for pool_name in executors:
            self.log.info("Stopping threadpool: %s", pool_name)

        live_threads = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(live_threads), live_threads)

        for pool in executors.values():
            pool.shutdown(wait=graceful, cancel_futures=True)

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # Cancel all async tasks (event handlers and servers)
        for pending_task in self._tasks:
            pending_task.cancel()

        # ASYNC TODO: close connections
        # self.server.server_close()

        self.log.info("Stopping API Server: %s", self._pool)
        self.api_server.stop_api_server()
        self.log.info("Stopping API Server threadpool: %s", self._pool)
        self._pool.shutdown(wait=graceful, cancel_futures=True)
        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()
526
527 1
    def status(self):
528
        """Return status of Kytos Server.
529
530
        If the controller kytos is running this method will be returned
531
        "Running since 'Started_At'", otherwise "Stopped".
532
533
        Returns:
534
            string: String with kytos status.
535
536
        """
537 1
        if self.started_at:
538 1
            return f"Running since {self.started_at}"
539 1
        return "Stopped"
540
541 1
    def uptime(self):
542
        """Return the uptime of kytos server.
543
544
        This method should return:
545
            - 0 if Kytos Server is stopped.
546
            - (kytos.start_at - datetime.now) if Kytos Server is running.
547
548
        Returns:
549
           datetime.timedelta: The uptime interval.
550
551
        """
552 1
        return now() - self.started_at if self.started_at else 0
553
554 1
    def notify_listeners(self, event):
555
        """Send the event to the specified listeners.
556
557
        Loops over self.events_listeners matching (by regexp) the attribute
558
        name of the event with the keys of events_listeners. If a match occurs,
559
        then send the event to each registered listener.
560
561
        Args:
562
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
563
        """
564
565 1
        self.log.debug("looking for listeners for %s", event)
566 1
        for event_regex, listeners in dict(self.events_listeners).items():
567
            # self.log.debug("listeners found for %s: %r => %s", event,
568
            #                event_regex, [l.__qualname__ for l in listeners])
569
            # Do not match if the event has more characters
570
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
571 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
572 1
                event_regex += '$'
573 1
            if re.match(event_regex, event.name):
574 1
                self.log.debug('Calling listeners for %s', event)
575 1
                for listener in listeners:
576 1
                    if asyncio.iscoroutinefunction(listener):
577
                        task = asyncio.create_task(listener(event))
578
                        self._alisten_tasks.add(task)
579
                        task.add_done_callback(self._alisten_tasks.discard)
580
                    else:
581 1
                        listener(event)
582
583 1
    async def event_handler(self, buffer_name: str):
584
        """Default event handler that gets from an event buffer."""
585 1
        event_buffer = getattr(self.buffers, buffer_name)
586 1
        self.log.info(f"Event handler {buffer_name} started")
587
        while True:
588 1
            event = await event_buffer.aget()
589 1
            self.notify_listeners(event)
590
591 1
            if event.name == "kytos/core.shutdown":
592 1
                self.log.debug(f"Event handler {buffer_name} stopped")
593 1
                break
594
595 1
    async def publish_connection_error(self, event):
596
        """Publish connection error event.
597
598
        Args:
599
            event (KytosEvent): Event that triggered for error propagation.
600
        """
601
        event.name = \
602
            f"kytos/core.{event.destination.protocol.name}.connection.error"
603
        error_msg = f"Connection state: {event.destination.state}"
604
        event.content["exception"] = error_msg
605
        await self.buffers.app.aput(event)
606
607 1
    async def msg_out_event_handler(self):
608
        """Handle msg_out events.
609
610
        Listen to the msg_out buffer and send all its events to the
611
        corresponding listeners.
612
        """
613 1
        self.log.info("Event handler msg_out started")
614
        while True:
615 1
            triggered_event = await self.buffers.msg_out.aget()
616
617 1
            if triggered_event.name == "kytos/core.shutdown":
618 1
                self.log.debug("Message Out Event handler stopped")
619 1
                break
620
621 1
            message = triggered_event.content['message']
622 1
            destination = triggered_event.destination
623 1
            try:
624 1
                if (destination and
625
                        not destination.state == ConnectionState.FINISHED):
626 1
                    packet = message.pack()
627 1
                    destination.send(packet)
628 1
                    self.log.debug('Connection %s: OUT OFP, '
629
                                   'version: %s, type: %s, xid: %s - %s',
630
                                   destination.id,
631
                                   message.header.version,
632
                                   message.header.message_type,
633
                                   message.header.xid,
634
                                   packet.hex())
635 1
                    self.notify_listeners(triggered_event)
636 1
            except (OSError, SocketError):
637
                await self.publish_connection_error(triggered_event)
638
                self.log.info("connection closed. Cannot send message")
639 1
            except PackException as err:
640 1
                self.log.error(
641
                    f"Discarding message: {message}, event: {triggered_event} "
642
                    f"because of PackException {err}"
643
                )
644
645 1
    def get_interface_by_id(self, interface_id):
646
        """Find a Interface  with interface_id.
647
648
        Args:
649
            interface_id(str): Interface Identifier.
650
651
        Returns:
652
            Interface: Instance of Interface with the id given.
653
654
        """
655 1
        if interface_id is None:
656 1
            return None
657
658 1
        switch_id = ":".join(interface_id.split(":")[:-1])
659 1
        interface_number = int(interface_id.split(":")[-1])
660
661 1
        switch = self.switches.get(switch_id)
662
663 1
        if not switch:
664 1
            return None
665
666 1
        return switch.interfaces.get(interface_number, None)
667
668 1
    def get_switch_by_dpid(self, dpid):
669
        """Return a specific switch by dpid.
670
671
        Args:
672
            dpid (|DPID|): dpid object used to identify a switch.
673
674
        Returns:
675
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
676
677
        """
678 1
        return self.switches.get(dpid)
679
680 1
    def get_switch_or_create(self, dpid, connection=None):
        """Return the switch for ``dpid``, creating it when unknown.

        Runs under the switches lock. A ``kytos/core.switch.new`` or
        ``kytos/core.switch.reconnected`` event is put on the ``app``
        buffer, depending on whether the switch had to be created.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. When given, it is stored, set on
                the switch, and a different previous connection of the
                switch is removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.
        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                event_suffix = 'new'
            else:
                event_suffix = 'reconnected'

            self.set_switch_options(dpid=dpid)
            event = KytosEvent(name='kytos/core.switch.' + event_suffix,
                               content={'switch': switch})

            if connection:
                previous = switch.connection
                switch.update_connection(connection)
                if previous is not connection:
                    self.remove_connection(previous)

            self.buffers.app.put(event)
            return switch
720
721 1
    def set_switch_options(self, dpid):
722
        """Update the switch settings based on kytos.conf options.
723
724
        Args:
725
            dpid (str): dpid used to identify a switch.
726
727
        """
728 1
        switch = self.switches.get(dpid)
729 1
        if not switch:
730
            return
731
732 1
        vlan_pool = {}
733 1
        vlan_pool = self.options.vlan_pool
734 1
        if not vlan_pool:
735 1
            return
736
737 1
        if vlan_pool.get(dpid):
738 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
739 1
            for intf_num, port_list in vlan_pool[dpid].items():
740 1
                if not switch.interfaces.get((intf_num)):
741 1
                    vlan_ids = set()
742 1
                    for vlan_range in port_list:
743 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
744 1
                        for vlan_id in range(vlan_begin, vlan_end):
745 1
                            vlan_ids.add(vlan_id)
746 1
                    intf_num = int(intf_num)
747 1
                    intf = Interface(name=intf_num, port_number=intf_num,
748
                                     switch=switch)
749 1
                    intf.set_available_tags(vlan_ids)
750 1
                    switch.update_interface(intf)
751
752 1
    def create_or_update_connection(self, connection):
        """Store a connection, replacing any entry kept under the same id.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be stored under its ``id``.
        """
        conn_id = connection.id
        self.connections[conn_id] = connection
760
761 1
    def get_connection_by_id(self, conn_id):
        """Return an existing connection by id.

        Args:
            conn_id: id of the connection to look up in ``self.connections``.

        Returns:
            :class:`~kytos.core.connection.Connection`: Instance of connection
                or None when no connection is stored under ``conn_id``.

        """
        return self.connections.get(conn_id, None)
773
774 1
    def remove_connection(self, connection):
        """Close an existing connection and remove it.

        The connection is closed first, so it ends up closed even when it
        was not registered in ``self.connections``.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be removed.

        Returns:
            bool: True if the connection was registered and has been
                removed; False if ``connection`` is None or a KeyError was
                raised while closing/removing it.
        """
        if connection is None:
            return False

        try:
            connection.close()
            self.connections.pop(connection.id)
        except KeyError:
            return False
        else:
            return True
790
791 1
    def remove_switch(self, switch):
        """Remove an existing switch.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Instance of switch that will be removed.

        Returns:
            bool: True if an entry for ``switch.dpid`` was removed from
                ``self.switches``; False if there was none.
        """
        try:
            self.switches.pop(switch.dpid)
        except KeyError:
            return False
        else:
            return True
803
804 1
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method will read new connection event and store the connection
        (socket) into the connections attribute on the controller.

        It also clears the previously stored connection with the same id,
        if any, since this is a new connection on the same ip:port.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", connection)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() expects the connection object itself (it calls
        # close() on it), not the connection id. The connection being
        # registered is never closed, even if it is already stored.
        old_connection = self.get_connection_by_id(connection.id)
        if old_connection is not None and old_connection is not connection:
            self.remove_connection(old_connection)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
829
830 1
    def add_new_switch(self, switch):
        """Register a switch on the controller under its dpid.

        Args:
            switch (Switch): A Switch object
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
837
838 1
    def _import_napp(self, username, napp_name):
        """Import a NApp module.

        The module is loaded from ``<options.napps>/<username>/<napp_name>/
        main.py`` and registered in ``sys.modules`` as
        ``napps.<username>.<napp_name>.main`` before its code is executed.

        Returns:
            The executed NApp main module.

        Raises:
            FileNotFoundError: if NApp's main.py is not found.
            ModuleNotFoundError: if any NApp requirement is not installed.

        """
        module_name = '.'.join(('napps', username, napp_name, 'main'))
        main_path = os.path.join(
            self.options.napps, username, napp_name, 'main.py'
        )
        spec = spec_from_file_location(module_name, main_path)
        module = module_from_spec(spec)
        # Registered before execution, so the entry is already present in
        # sys.modules while the NApp code runs.
        sys.modules[spec.name] = module
        spec.loader.exec_module(module)
        return module
854
855 1
    def load_napp(self, username, napp_name):
        """Load a single NApp.

        Imports the NApp main module, instantiates its ``Main`` class with
        this controller, stores and starts the instance, passes it to the
        API server (``authenticate_endpoints`` and
        ``register_napp_endpoints``) and appends its listeners to
        ``self.events_listeners``. Import and initialization failures are
        logged and the method returns without loading the NApp.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as exc:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, exc)
            return
        except FileNotFoundError as exc:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                exc.filename)
            return

        try:
            instance = module.Main(controller=self)
        except:  # noqa pylint: disable=bare-except
            self.log.critical("NApp initialization failed: %s/%s",
                              username, napp_name, exc_info=True)
            return

        # The instance is stored before it is started.
        self.napps[napp_key] = instance

        instance.start()
        self.api_server.authenticate_endpoints(instance)
        self.api_server.register_napp_endpoints(instance)

        # pylint: disable=protected-access
        for event_name, handlers in instance._listeners.items():
            registered = self.events_listeners.setdefault(event_name, [])
            registered.extend(handlers)
        # pylint: enable=protected-access
895
896 1
    def pre_install_napps(self, napps, enable=True):
        """Pre install and enable NApps.

        NApps whose name already appears among the installed ones (compared
        by their ``str()`` form) are skipped.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): Forwarded to ``napps_manager.install`` for every
                NApp that gets installed.
        """
        manager = self.napps_manager
        already_installed = list(map(str, manager.get_installed_napps()))

        missing = []
        for napp in napps:
            if napp not in already_installed:
                missing.append(napp)

        for napp in missing:
            manager.install(napp, enable=enable)
909
910 1
    def load_napps(self):
        """Load every NApp reported as enabled by the NApps manager.

        A FileNotFoundError raised while loading one NApp is logged and the
        remaining NApps are still processed.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for enabled in enabled_napps:
            try:
                self.log.info("Loading NApp %s", enabled.id)
                self.load_napp(enabled.username, enabled.name)
            except FileNotFoundError as err:
                self.log.error("Could not load NApp %s: %s", enabled.id, err)
919
920 1
    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        The NApp is removed from ``self.napps``, the first listener
        registered for its ``kytos/core.shutdown.<napp id>`` event is
        called, its REST endpoints are removed and its listeners are taken
        out of ``self.events_listeners``. Only a warning is logged when the
        NApp is not loaded.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)
        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        shutdown_name = 'kytos/core.shutdown.' + NApp(username, napp_name).id
        shutdown_event = KytosEvent(name=shutdown_name)
        # Call listener before removing it from events_listeners
        shutdown_listener = self.events_listeners[shutdown_event.name][0]
        shutdown_listener(shutdown_event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Removing listeners from that napp
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            registered = self.events_listeners[event_type]
            for listener in napp_listeners:
                registered.remove(listener)
            # Drop the event entry once nobody listens to it any more.
            if not registered:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access
951
952 1
    def unload_napps(self):
        """Unload every NApp reported as enabled by the NApps manager.

        NApps are unloaded in the reverse order that they are enabled to
        facilitate to shutdown gracefully.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for enabled in reversed(enabled_napps):
            self.unload_napp(enabled.username, enabled.name)
960
961 1
    def reload_napp_module(self, username, napp_name, napp_file):
        """Reload a NApp Module.

        Imports ``napps.<username>.<napp_name>.<napp_file>`` and then
        reloads it.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp.
            napp_file (str): Name of the NApp module to reload.

        Raises:
            ModuleNotFoundError: if the module cannot be found; logged and
                re-raised.
            ImportError: if reloading the module fails; logged and
                re-raised.
        """
        mod_name = '.'.join(('napps', username, napp_name, napp_file))

        try:
            module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise

        try:
            reload_module(module)
        except ImportError as exc:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, exc)
            raise
975
976 1
    def reload_napp(self, username, napp_name):
        """Reload a NApp.

        The NApp is unloaded, its ``settings`` and ``main`` modules are
        reloaded (in that order) and the NApp is loaded again.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be reloaded.

        Returns:
            int: 200 on success; 400 if reloading a module raised an import
                error, in which case the NApp is not loaded again.
        """
        self.unload_napp(username, napp_name)

        try:
            for napp_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, napp_file)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
988
989 1
    def rest_reload_napp(self, username, napp_name):
        """Request the reload of a NApp.

        Returns:
            tuple: ``'reloaded'`` and the status code returned by
                ``reload_napp``.
        """
        return 'reloaded', self.reload_napp(username, napp_name)
993
994 1
    def rest_reload_all_napps(self):
        """Request reload all NApps.

        Every NApp currently in ``self.napps`` is reloaded once.

        Returns:
            tuple: ``'reloaded'`` and 200, regardless of the status returned
                by the individual reloads.
        """
        # reload_napp() removes entries from self.napps and inserts them
        # again, so iterate over a snapshot of the keys instead of the live
        # dict (mutating a dict while iterating it is unsafe).
        for napp in list(self.napps):
            self.reload_napp(*napp)
        return 'reloaded', 200
999