Passed
Pull Request — master (#429)
by Vinicius
04:47
created

kytos.core.controller   F

Complexity

Total Complexity 120

Size/Duplication

Total Lines 962
Duplicated Lines 0 %

Importance

Changes 0
Metric   Value
eloc 492
dl 0
loc 962
rs 2
c 0
b 0
f 0
wmc 120

45 Methods

Rating   Name   Duplication   Size   Complexity  
A Controller.loggers() 0 10 1
A Controller.toggle_debug() 0 37 5
A Controller._patch_core_loggers() 0 9 2
A Controller._resolve() 0 6 1
A Controller.__init__() 0 76 2
A Controller.start_auth() 0 4 1
A Controller.rest_reload_napp() 0 8 2
A Controller.publish_connection_error() 0 11 1
B Controller.notify_listeners() 0 28 7
A Controller.restart() 0 11 2
A Controller.create_or_update_connection() 0 8 1
A Controller.get_switch_by_dpid() 0 11 1
A Controller.add_new_switch() 0 7 1
A Controller.load_napps() 0 11 3
A Controller.event_handler() 0 11 3
A Controller._import_napp() 0 16 1
A Controller.get_connection_by_id() 0 12 1
A Controller.unload_napps() 0 8 2
B Controller.get_switch_or_create() 0 38 6
A Controller.configuration_endpoint() 0 8 1
A Controller.get_interface_by_id() 0 22 3
A Controller.uptime() 0 12 2
A Controller.register_rest_endpoint() 0 3 1
A Controller.rest_reload_all_napps() 0 5 2
A Controller.new_connection() 0 25 2
A Controller.metadata_endpoint() 0 13 2
A Controller.remove_switch() 0 12 2
B Controller.msg_out_event_handler() 0 35 7
A Controller.stop_controller() 0 47 4
A Controller.reload_napp_module() 0 14 3
A Controller.remove_connection() 0 16 3
A Controller.stop() 0 10 2
A Controller.pre_install_napps() 0 13 2
A Controller.status() 0 13 2
A Controller._register_endpoints() 0 19 1
A Controller.reload_napp() 0 12 2
B Controller.load_napp() 0 38 6
A Controller.unload_napp() 0 30 5
B Controller.start() 0 21 6
B Controller.create_pidfile() 0 49 5
A Controller.buffers() 0 7 2
A Controller.start_controller() 0 48 2
A Controller.enable_logs() 0 16 3
A Controller._try_to_fmt_traceback_msg() 0 8 2
A Controller._full_queue_counter() 0 10 4

1 Function

Rating   Name   Duplication   Size   Complexity  
A exc_handler() 0 12 1

How to fix

Complexity

Complex classes like kytos.core.controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields and methods that share the same prefix or suffix.
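As a rough illustration of the prefix/suffix heuristic, the sketch below groups a subset of the method names from the ratings table by the underscore-separated tokens they share. The `group_by_token` helper, the plural normalisation and the threshold of three methods are assumptions made for this example, not part of Kytos or of the analysis tool.

```python
from collections import defaultdict

# A subset of the method names listed in the ratings table above.
METHODS = [
    "load_napp", "unload_napp", "reload_napp", "load_napps", "unload_napps",
    "pre_install_napps", "get_switch_by_dpid", "add_new_switch",
    "remove_switch", "get_connection_by_id", "remove_connection",
    "new_connection", "create_or_update_connection",
]


def group_by_token(names, min_size=3):
    """Group method names by the underscore-separated tokens they contain.

    Hypothetical helper: only tokens shared by at least ``min_size``
    methods are kept as candidate components.
    """
    groups = defaultdict(list)
    for name in names:
        for token in set(name.strip("_").split("_")):
            # Fold a trailing plural "s" so "napp" and "napps" land together.
            if token.endswith("s") and len(token) > 3:
                token = token[:-1]
            groups[token].append(name)
    return {key: sorted(found) for key, found in groups.items()
            if len(found) >= min_size}


candidates = group_by_token(METHODS)
for token, found in sorted(candidates.items()):
    print(token, found)
```

With this input the shared tokens are `connection`, `napp` and `switch`, which suggests three groups of methods that could be examined as candidate components.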

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
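A minimal sketch of Extract Class, assuming the switch bookkeeping is the component being pulled out. `SwitchRegistry` and `SlimController` are hypothetical names invented for this illustration; the real Controller keeps this state in `self.switches` and `self._switches_lock`, and the locking shown here is a design choice of the sketch rather than a description of the existing code.

```python
import threading
from types import SimpleNamespace


class SwitchRegistry:
    """Hypothetical extracted class that owns the dpid -> switch mapping."""

    def __init__(self):
        self._switches = {}
        self._lock = threading.Lock()

    def get(self, dpid):
        """Return the switch stored under dpid, or None."""
        return self._switches.get(dpid)

    def add(self, switch):
        """Store the switch under its dpid."""
        with self._lock:
            self._switches[switch.dpid] = switch

    def remove(self, switch):
        """Remove the switch; return True if it was present."""
        with self._lock:
            return self._switches.pop(switch.dpid, None) is not None


class SlimController:
    """Stand-in for the controller; it only delegates to the registry."""

    def __init__(self):
        self.switch_registry = SwitchRegistry()

    def get_switch_by_dpid(self, dpid):
        return self.switch_registry.get(dpid)

    def add_new_switch(self, switch):
        self.switch_registry.add(switch)

    def remove_switch(self, switch):
        return self.switch_registry.remove(switch)


# Usage with a stand-in object that only carries a dpid attribute.
controller = SlimController()
switch = SimpleNamespace(dpid="00:00:00:00:00:00:00:01")
controller.add_new_switch(switch)
```

The controller keeps its public method names, so callers are unaffected, while the state and its lock move into one small class that can be tested on its own.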

"""Kytos SDN Platform main class.

This module contains the main class of Kytos, which is
:class:`~.core.Controller`.

Basic usage:

.. code-block:: python3

    from kytos.core.config import KytosConfig
    from kytos.core import Controller
    config = KytosConfig()
    controller = Controller(config.options)
    controller.start()
"""
import asyncio
import atexit
import importlib
import logging
import os
import re
import sys
import threading
import traceback
from asyncio import AbstractEventLoop
from collections import Counter, defaultdict
from importlib import import_module
from importlib import reload as reload_module
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path
from socket import error as SocketError

from pyof.foundation.exceptions import PackException

from kytos.core.api_server import APIServer, JSONResponse, Request
from kytos.core.apm import init_apm
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
from kytos.core.auth import Auth
from kytos.core.buffers import KytosBuffers
from kytos.core.config import KytosConfig
from kytos.core.connection import ConnectionState
from kytos.core.db import db_conn_wait
from kytos.core.dead_letter import DeadLetter
from kytos.core.events import KytosEvent
from kytos.core.exceptions import (KytosAPMInitException, KytosDBInitException,
                                   KytosNAppSetupException)
from kytos.core.helpers import executors, now
from kytos.core.logs import LogManager
from kytos.core.napps.base import NApp
from kytos.core.napps.manager import NAppsManager
from kytos.core.napps.napp_dir_listener import NAppDirListener
from kytos.core.switch import Switch

__all__ = ('Controller',)


def exc_handler(_, exc, __):
    """Log uncaught exceptions.

    Args:
        exc_type (ignored): exception type
        exc: exception instance
        tb (ignored): traceback
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    logging.exception('Uncaught Exception: %s', exc)


class Controller:
    """Main class of Kytos.

    The main responsibilities of this class are:
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
        - manage KytosNApps (install, load and unload);
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
        - manage which event should be sent to NApps methods;
        - manage the buffers handlers, considering one thread per handler.
    """

    # Created issue #568 for the disabled checks.
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
    # pylint: disable=consider-using-with,unnecessary-dunder-call
    # pylint: disable=too-many-lines
    def __init__(self, options=None, loop: AbstractEventLoop = None):
        """Init method of Controller class takes the parameters below.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
            loop (asyncio.AbstractEventLoop): event loop used by the
                controller to create its tasks.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self.loop = loop

        # asyncio tasks
        self._tasks: list[asyncio.Task] = []

        #: KytosBuffers: KytosBuffer object with Controller buffers
        self._buffers: KytosBuffers = None
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
        #:
        #: This dict stores all connections between the controller and the
        #: switches. The key for this dict is a tuple (ip, port). The content
        #: is a Connection
        self.connections = {}
        #: dict: mapping of events and event listeners.
        #:
        #: The key of the dict is a KytosEvent (or a string that represents a
        #: regex to match against KytosEvents) and the value is a list of
        #: methods that will receive the referenced event
        self.events_listeners = {'kytos/core.connection.new':
                                 [self.new_connection]}

        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
        #:
        #: The key is the napp name (string), while the value is the napp
        #: instance itself.
        self.napps = {}
        #: Object generated by ParseArgs on config.py file
        self.options = options
        #: KytosServer: Instance of KytosServer that will be listening to TCP
        #: connections.
        self.server = None
        #: dict: Current existing switches.
        #:
        #: The key is the switch dpid, while the value is a Switch object.
        self.switches = {}  # dpid: Switch()
        self._switches_lock = threading.Lock()

        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: Logger instance used by Kytos.
        self.log = None

        #: Observer that handles NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(self.options.listen,
                                    self.options.api_port,
                                    self.napps_manager,
                                    self.options.napps,
                                    self.options.api_traceback_on_500)

        self.auth = None
        self.dead_letter = DeadLetter(self)
        self._alisten_tasks = set()

        self._register_endpoints()
        #: Adding the napps 'enabled' directory into the PATH
        #: Now you can access the enabled napps with:
        #: from napps.<username>.<napp_name> import ?....
        sys.path.append(os.path.join(self.options.napps, os.pardir))
        sys.excepthook = exc_handler

    def start_auth(self):
        """Initialize Auth() and its services"""
        self.auth = Auth(self)
        self.auth.register_core_auth_services()

    def enable_logs(self):
        """Register kytos log and enable the logs."""
        decorators = self.options.logger_decorators
        try:
            decorators = [self._resolve(deco) for deco in decorators]
        except ModuleNotFoundError as err:
            # Reported issue (Comprehensibility, Best Practice): "The variable
            # ModuleNotFoundError does not seem to be defined."
            sys.exit(f'Failed to resolve decorator module: {err.name}')
        except AttributeError:
            sys.exit(f'Failed to resolve decorator name: {decorators}')
        LogManager.decorate_logger_class(*decorators)
        LogManager.load_config_file(self.options.logging, self.options.debug)
        # pylint: disable=fixme
        # TODO issue 371
        # LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)
        self._patch_core_loggers()

    @staticmethod
    def _resolve(name):
        """Resolve a dotted name to a global object."""
        mod, _, attr = name.rpartition('.')
        mod = importlib.import_module(mod)
        return getattr(mod, attr)

    @staticmethod
    def _patch_core_loggers():
        """Patch in updated loggers to 'kytos.core.*' modules"""
        match_str = 'kytos.core.'
        str_len = len(match_str)
        reloadable_mods = [module for mod_name, module in sys.modules.items()
                           if mod_name[:str_len] == match_str]
        for module in reloadable_mods:
            module.LOG = logging.getLogger(module.__name__)

    @staticmethod
    def loggers():
        """List all logging Loggers.

        Return a list of Logger objects, with name and logging level.
        """
        # pylint: disable=no-member
        return [logging.getLogger(name)
                for name in logging.root.manager.loggerDict
                if "kytos" in name]

    def toggle_debug(self, name=None):
        """Enable/disable logging debug messages to a given logger name.

        If the name parameter is not specified, debug will be
        enabled/disabled following the initial config file. The current
        level of the 'kytos' logger decides whether debug is enabled or
        disabled.
        Note: To disable debug, the logger level is set to NOTSET.

        Args:
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
        """
        # pylint: disable=no-member
        if name and name not in logging.root.manager.loggerDict:
            # A Logger name that is not declared in logging will raise an error
            # otherwise logging would create a new Logger.
            raise ValueError(f"Invalid logger name: {name}")

        if not name:
            # Logger name not specified.
            level = logging.getLogger('kytos').getEffectiveLevel()
            enable_debug = level != logging.DEBUG

            # Enable/disable default Loggers
            LogManager.load_config_file(self.options.logging, enable_debug)
            return

        # Get effective logger level for the name
        level = logging.getLogger(name).getEffectiveLevel()
        logger = logging.getLogger(name)

        if level == logging.DEBUG:
            # disable debug
            logger.setLevel(logging.NOTSET)
        else:
            # enable debug
            logger.setLevel(logging.DEBUG)

    def start(self, restart=False):
        """Create pidfile and call start_controller method."""
        self.enable_logs()
        # pylint: disable=broad-except
        try:
            if self.options.database:
                db_conn_wait(db_backend=self.options.database)
                self.start_auth()
            if self.options.apm:
                init_apm(self.options.apm, app=self.api_server.app)
            if not restart:
                self.create_pidfile()
            self.start_controller()
        except (KytosDBInitException, KytosAPMInitException) as exc:
            message = f"Kytos couldn't start because of {str(exc)}"
            sys.exit(message)
        except Exception as exc:
            exc_fmt = traceback.format_exc(chain=True)
            message = f"Kytos couldn't start because of {str(exc)} {exc_fmt}"
            counter = self._full_queue_counter()
            sys.exit(self._try_to_fmt_traceback_msg(message, counter))

    def create_pidfile(self):
        """Create a pidfile."""
        pid = os.getpid()

        # Creates directory if it doesn't exist
        # System can erase /var/run's content
        pid_folder = Path(self.options.pidfile).parent
        self.log.info(pid_folder)

        # Pylint incorrectly infers Path objects
        # https://github.com/PyCQA/pylint/issues/224
        # pylint: disable=no-member
        if not pid_folder.exists():
            pid_folder.mkdir()
            pid_folder.chmod(0o1777)
        # pylint: enable=no-member

        # Make sure the file is deleted when controller stops
        atexit.register(Path(self.options.pidfile).unlink)

        # Checks if a pidfile exists. Creates a new file.
        try:
            pidfile = open(self.options.pidfile, mode='x', encoding="utf8")
        except OSError:
            # This happens if there is a pidfile already.
            # We shall check if the process that created the pidfile is still
            # running.
            try:
                existing_file = open(self.options.pidfile, mode='r',
                                     encoding="utf8")
                old_pid = int(existing_file.read())
                os.kill(old_pid, 0)
                # If kill() doesn't return an error, there's a process running
                # with the same PID. We assume it is Kytos and quit.
                # Otherwise, overwrite the file and proceed.
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
                             "running. Aborting.")
                sys.exit(error_msg.format(self.options.pidfile))
            except OSError:
                try:
                    pidfile = open(self.options.pidfile, mode='w',
                                   encoding="utf8")
                except OSError as exception:
                    error_msg = "Failed to create pidfile {}: {}."
                    sys.exit(error_msg.format(self.options.pidfile, exception))

        # Identifies the process that created the pidfile.
        pidfile.write(str(pid))
        pidfile.close()

    @property
    def buffers(self) -> KytosBuffers:
        """KytosBuffers exposed through this property to allow lazy
        async initialization with an event loop running."""
        if not self._buffers:
            self._buffers = KytosBuffers()
        return self._buffers

    def start_controller(self):
        """Start the controller.

        Starts the KytosServer (TCP Server) coroutine.
        Starts a thread for each buffer handler.
        Load the installed apps.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        if not self._buffers:
            self._buffers = KytosBuffers()
        self.server = KytosServer((self.options.listen,
                                   int(self.options.port)),
                                  KytosServerProtocol,
                                  self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        task = self.loop.create_task(self.api_server.serve())
        self._tasks.append(task)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.
        # sys.exit(error_msg.format(thread, exception))

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()
        self.api_server.start_web_ui()

        # Start task handlers consumers after NApps to potentially
        # avoid discarding an event if a consumer isn't ready yet
        task = self.loop.create_task(self.event_handler("conn"))
        self._tasks.append(task)
        task = self.loop.create_task(self.event_handler("raw"))
        self._tasks.append(task)
        task = self.loop.create_task(self.event_handler("msg_in"))
        self._tasks.append(task)
        task = self.loop.create_task(self.msg_out_event_handler())
        self._tasks.append(task)
        task = self.loop.create_task(self.event_handler("app"))
        self._tasks.append(task)
        task = self.loop.create_task(self.event_handler("meta"))
        self._tasks.append(task)

        self.started_at = now()

    def _register_endpoints(self):
        """Register all rest endpoints served by kytos.

        -   Register APIServer endpoints
        -   Register WebUI endpoints
        -   Register ``/api/kytos/core/config`` endpoint
        """
        self.api_server.start_api()
        # Register controller endpoints as /api/kytos/core/...
        self.api_server.register_core_endpoint('config/',
                                               self.configuration_endpoint)
        self.api_server.register_core_endpoint('metadata/',
                                               Controller.metadata_endpoint)
        self.api_server.register_core_endpoint(
            'reload/{username}/{napp_name}/',
            self.rest_reload_napp)
        self.api_server.register_core_endpoint('reload/all',
                                               self.rest_reload_all_napps)
        self.dead_letter.register_endpoints()

    def register_rest_endpoint(self, url, function, methods):
        """Deprecated in favor of the @rest decorator."""
        self.api_server.register_rest_endpoint(url, function, methods)

    @classmethod
    def metadata_endpoint(cls, _request: Request) -> JSONResponse:
        """Return the Kytos metadata.

        Returns:
            string: Json with current kytos metadata.

        """
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
        with open(meta_path, encoding="utf8") as file:
            meta_file = file.read()
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
        return JSONResponse(metadata)

    def configuration_endpoint(self, _request: Request) -> JSONResponse:
        """Return the configuration options used by Kytos.

        Returns:
            string: Json with current configurations used by kytos.

        """
        return JSONResponse(KytosConfig.options_exposed(self.options.__dict__))

    def restart(self, graceful=True):
        """Restart Kytos SDN Controller.

        Args:
            graceful(bool): Represents the way that Kytos will restart.
        """
        if self.started_at is not None:
            self.stop(graceful)
            self.__init__(self.options)

        self.start(restart=True)

    def stop(self, graceful=True):
        """Shutdown all services used by kytos.

        This method should:
            - stop all Websockets
            - stop the API Server
            - stop the Controller
        """
        if self.started_at:
            self.stop_controller(graceful)

    def stop_controller(self, graceful=True):
        """Stop the controller.

        This method should:
            - announce on the network that the controller will shutdown;
            - stop receiving incoming packets;
            - call the 'shutdown' method of each KytosNApp that is running;
            - finish reading the events on all buffers;
            - stop each running handler;
            - stop all running threads;
            - stop the KytosServer;
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.napp_dir_listener.stop()

        for pool_name in executors:
            self.log.info("Stopping threadpool: %s", pool_name)

        threads = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(threads), threads)

        for executor_pool in executors.values():
            executor_pool.shutdown(wait=graceful, cancel_futures=True)

        # self.server.socket.shutdown()
        # self.server.socket.close()

        self.started_at = None
        self.unload_napps()

        # Cancel all async tasks (event handlers and servers)
        for task in self._tasks:
            task.cancel()

        # ASYNC TODO: close connections
        # self.server.server_close()

        self.log.info("Stopping API Server...")
        self.api_server.stop()
        self.log.info("Stopped API Server")
        self.log.info("Stopping TCP Server...")
        self.server.shutdown()
        self.log.info("Stopped TCP Server")
        self.loop.stop()

    def status(self):
        """Return status of Kytos Server.

        If the Kytos controller is running, this method returns
        "Running since 'Started_At'", otherwise "Stopped".

        Returns:
            string: String with kytos status.

        """
        if self.started_at:
            return f"Running since {self.started_at}"
        return "Stopped"

    def uptime(self):
        """Return the uptime of kytos server.

        This method should return:
            - 0 if Kytos Server is stopped.
            - (now - started_at) if Kytos Server is running.

        Returns:
            datetime.timedelta: The uptime interval.

        """
        return now() - self.started_at if self.started_at else 0

    def notify_listeners(self, event):
        """Send the event to the specified listeners.

        Loops over self.events_listeners matching (by regexp) the attribute
        name of the event with the keys of events_listeners. If a match occurs,
        then send the event to each registered listener.

        Args:
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
        """

        self.log.debug("looking for listeners for %s", event)
        for event_regex, listeners in dict(self.events_listeners).items():
            # self.log.debug("listeners found for %s: %r => %s", event,
            #                event_regex, [l.__qualname__ for l in listeners])
            # Do not match if the event has more characters
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
                event_regex += '$'
            if re.match(event_regex, event.name):
                self.log.debug('Calling listeners for %s', event)
                for listener in listeners:
                    if asyncio.iscoroutinefunction(listener):
                        task = asyncio.create_task(listener(event))
                        self._alisten_tasks.add(task)
                        task.add_done_callback(self._alisten_tasks.discard)
                    else:
                        listener(event)

    async def event_handler(self, buffer_name: str):
        """Default event handler that gets from an event buffer."""
        event_buffer = getattr(self.buffers, buffer_name)
        self.log.info(f"Event handler {buffer_name} started")
        while True:
            event = await event_buffer.aget()
            self.notify_listeners(event)

            if event.name == "kytos/core.shutdown":
                self.log.debug(f"Event handler {buffer_name} stopped")
                break

    async def publish_connection_error(self, event):
        """Publish connection error event.

        Args:
            event (KytosEvent): Event that triggered the error propagation.
        """
        event.name = \
            f"kytos/core.{event.destination.protocol.name}.connection.error"
        error_msg = f"Connection state: {event.destination.state}"
        event.content["exception"] = error_msg
        await self.buffers.conn.aput(event)

    async def msg_out_event_handler(self):
        """Handle msg_out events.

        Listen to the msg_out buffer and send all its events to the
        corresponding listeners.
        """
        self.log.info("Event handler msg_out started")
        while True:
            triggered_event = await self.buffers.msg_out.aget()

            if triggered_event.name == "kytos/core.shutdown":
                self.log.debug("Message Out Event handler stopped")
                break

            message = triggered_event.content['message']
            destination = triggered_event.destination
            try:
                if (destination and
                        not destination.state == ConnectionState.FINISHED):
                    packet = message.pack()
                    destination.send(packet)
                    self.log.debug('Connection %s: OUT OFP, '
                                   'version: %s, type: %s, xid: %s - %s',
                                   destination.id,
                                   message.header.version,
                                   message.header.message_type,
                                   message.header.xid,
                                   packet.hex())
                    self.notify_listeners(triggered_event)
            except (OSError, SocketError):
                await self.publish_connection_error(triggered_event)
                self.log.info("connection closed. Cannot send message")
            except PackException as err:
                self.log.error(
                    f"Discarding message: {message}, event: {triggered_event} "
                    f"because of PackException {err}"
                )

    def get_interface_by_id(self, interface_id):
        """Find an Interface with interface_id.

        Args:
            interface_id(str): Interface Identifier.

        Returns:
            Interface: Instance of Interface with the id given.

        """
        if interface_id is None:
            return None

        switch_id = ":".join(interface_id.split(":")[:-1])
        interface_number = int(interface_id.split(":")[-1])

        switch = self.switches.get(switch_id)

        if not switch:
            return None

        return switch.interfaces.get(interface_number, None)

    def get_switch_by_dpid(self, dpid):
        """Return a specific switch by dpid.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.

        Returns:
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.

        """
        return self.switches.get(dpid)

    def get_switch_or_create(self, dpid, connection=None):
        """Return switch or create it if necessary.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by the switch. If the switch already has a
                connection, it will be updated.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existing switch.

        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            event_name = 'kytos/core.switch.'

            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                event_name += 'new'
            else:
                event_name += 'reconnected'
            event = KytosEvent(name=event_name, content={'switch': switch})

            if connection:
                old_connection = switch.connection
                switch.update_connection(connection)

                if old_connection is not connection:
                    self.remove_connection(old_connection)

            self.buffers.conn.put(event)

            return switch

690
    def create_or_update_connection(self, connection):
691
        """Update a connection.
692
693
        Args:
694
            connection (:class:`~kytos.core.connection.Connection`):
695
                Instance of connection that will be updated.
696
        """
697
        self.connections[connection.id] = connection
698
699
    def get_connection_by_id(self, conn_id):
700
        """Return a existent connection by id.
701
702
        Args:
703
            id (int): id from a connection.
704
705
        Returns:
706
            :class:`~kytos.core.connection.Connection`: Instance of connection
707
                or None Type.
708
709
        """
710
        return self.connections.get(conn_id)
711
    def remove_connection(self, connection):
        """Close an existing connection and remove it.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be removed.

        Returns:
            bool: True if the connection was closed and removed; False if
                ``connection`` is None or is not in ``self.connections``.
        """
        if connection is None:
            return False

        try:
            connection.close()
            del self.connections[connection.id]
        except KeyError:
            return False
        return True

    def remove_switch(self, switch):
        """Remove an existing switch.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Instance of switch that will be removed.

        Returns:
            bool: True if the switch was removed; False if it was not found.
        """
        try:
            del self.switches[switch.dpid]
        except KeyError:
            return False
        return True

    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method reads the new connection event and stores the connection
        (socket) in the connections attribute of the controller.

        It also clears all references to a previous connection with the same
        id, since this is a new connection on the same ip:port.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() expects a connection object, not its id.
        old_connection = self.get_connection_by_id(connection.id)
        if old_connection and old_connection is not connection:
            self.remove_connection(old_connection)

        # Update connections with the new connection
        self.create_or_update_connection(connection)

    def add_new_switch(self, switch):
        """Add a new switch on the controller.

        Args:
            switch (Switch): A Switch object
        """
        self.switches[switch.dpid] = switch

    def _import_napp(self, username, napp_name):
        """Import a NApp module.

        Raises:
            FileNotFoundError: if NApp's main.py is not found.
            ModuleNotFoundError: if any NApp requirement is not installed.

        """
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
        path = os.path.join(self.options.napps, username, napp_name,
                            'main.py')
        napp_spec = spec_from_file_location(mod_name, path)
        napp_module = module_from_spec(napp_spec)
        sys.modules[napp_spec.name] = napp_module
        napp_spec.loader.exec_module(napp_module)
        return napp_module

    def load_napp(self, username, napp_name):
        """Load a single NApp.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        if (username, napp_name) in self.napps:
            message = 'NApp %s/%s was already loaded'
            self.log.warning(message, username, napp_name)
            return

        try:
            napp_module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as err:
            # Analyzer note (Comprehensibility, Best Practice): "The variable
            # ModuleNotFoundError does not seem to be defined."
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return
        except FileNotFoundError as err:
            msg = "NApp module not found, assuming it's a meta-napp: %s"
            self.log.warning(msg, err.filename)
            return

        try:
            napp = napp_module.Main(controller=self)
        except Exception as exc:  # noqa pylint: disable=bare-except
            msg = f"NApp {username}/{napp_name} exception {str(exc)} "
            raise KytosNAppSetupException(msg) from exc

        self.napps[(username, napp_name)] = napp

        napp.start()
        self.api_server.authenticate_endpoints(napp)
        self.api_server.register_napp_endpoints(napp)

        # pylint: disable=protected-access
        for event, listeners in napp._listeners.items():
            self.events_listeners.setdefault(event, []).extend(listeners)
        # pylint: enable=protected-access

    def pre_install_napps(self, napps, enable=True):
        """Pre-install and enable NApps.

        NApps that are already installed are skipped.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): Whether to enable each NApp after installing it.
        """
        all_napps = self.napps_manager.get_installed_napps()
        installed = [str(napp) for napp in all_napps]
        napps_diff = [napp for napp in napps if napp not in installed]
        for napp in napps_diff:
            self.napps_manager.install(napp, enable=enable)

    def load_napps(self):
        """Load all NApps enabled on the NApps dir."""
        for napp in self.napps_manager.get_enabled_napps():
            try:
                self.log.info("Loading NApp %s", napp.id)
                self.load_napp(napp.username, napp.name)
            except FileNotFoundError as exception:
                self.log.error("Could not load NApp %s: %s",
                               napp.id, exception)
                msg = f"NApp {napp.id} exception {str(exception)}"
                raise KytosNAppSetupException(msg) from exception

    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)

        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
        else:
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
            napp_id = NApp(username, napp_name).id
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
            napp_shutdown_fn = self.events_listeners[event.name][0]
            # Call listener before removing it from events_listeners
            napp_shutdown_fn(event)

            # Remove rest endpoints from that napp
            self.api_server.remove_napp_endpoints(napp)

            # Removing listeners from that napp
            # pylint: disable=protected-access
            for event_type, napp_listeners in napp._listeners.items():
                event_listeners = self.events_listeners[event_type]
                for listener in napp_listeners:
                    event_listeners.remove(listener)
                if not event_listeners:
                    del self.events_listeners[event_type]
            # pylint: enable=protected-access

    def unload_napps(self):
        """Unload all loaded NApps that are not core NApps.

        NApps are unloaded in the reverse of the order in which they were
        enabled, to facilitate a graceful shutdown.
        """
        for napp in reversed(self.napps_manager.get_enabled_napps()):
            self.unload_napp(napp.username, napp.name)

    def reload_napp_module(self, username, napp_name, napp_file):
        """Reload a NApp Module."""
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
        try:
            napp_module = import_module(mod_name)
        except ModuleNotFoundError:
            # Analyzer note (Comprehensibility, Best Practice): "The variable
            # ModuleNotFoundError does not seem to be defined."
            self.log.error("Module '%s' not found", mod_name)
            raise
        try:
            napp_module = reload_module(napp_module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise

    def reload_napp(self, username, napp_name):
        """Reload a NApp.

        Returns:
            int: 200 if the NApp was reloaded, 400 if one of its modules
                could not be found or imported.
        """
        self.unload_napp(username, napp_name)
        try:
            self.reload_napp_module(username, napp_name, 'settings')
            self.reload_napp_module(username, napp_name, 'main')
        except (ModuleNotFoundError, ImportError):
            # Analyzer note (Comprehensibility, Best Practice): "The variable
            # ModuleNotFoundError does not seem to be defined."
            return 400
        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200

    def rest_reload_napp(self, request: Request) -> JSONResponse:
        """Request reload a NApp."""
        username = request.path_params["username"]
        napp_name = request.path_params["napp_name"]
        res = self.reload_napp(username, napp_name)
        if res == 200:
            return JSONResponse('reloaded')
        return JSONResponse('fail to reload', status_code=res)

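    def rest_reload_all_napps(self, _request: Request) -> JSONResponse:
        """Request reload all NApps."""
        # Iterate over a snapshot of the keys: reload_napp() pops and
        # re-inserts entries in self.napps while this loop is running.
        for napp in list(self.napps):
            self.reload_napp(*napp)
        return JSONResponse('reloaded')
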
    def _full_queue_counter(self) -> Counter:
        """Generate full queue stats counter.

        Counts queued events per buffer name and event name, for full
        buffers only. Each full buffer is drained while it is counted.
        """
        buffer_counter = defaultdict(Counter)
        for buffer in self.buffers.get_all_buffers():
            if not buffer.full():
                continue
            while not buffer.empty():
                event = buffer.get()
                buffer_counter[buffer.name][event.name] += 1
        return buffer_counter

    def _try_to_fmt_traceback_msg(self, message: str, counter: Counter) -> str:
        """Try to fmt traceback message."""
        if counter:
            counter = dict(counter)
            message = (
                f"{message}\nFull KytosEventBuffers counters: {counter}"
            )
        return message
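
The path-based import that `_import_napp` performs can be tried outside the controller. The following is a minimal standalone sketch, not part of `kytos.core.controller`: the helper name `import_from_path`, the module names, and the temporary `main.py` are all hypothetical and exist only for illustration. It shows that a file can be loaded under a dotted name with `spec_from_file_location`, and that a missing file surfaces as `FileNotFoundError` when the module is executed, which is the case `load_napp` treats as a possible meta-napp.

```python
import sys
import tempfile
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path


def import_from_path(mod_name, path):
    """Import the file at ``path`` and register it as ``mod_name``.

    Hypothetical helper mirroring the steps used by ``_import_napp``.
    """
    spec = spec_from_file_location(mod_name, str(path))
    module = module_from_spec(spec)
    # Register before executing, as the controller method does.
    sys.modules[spec.name] = module
    spec.loader.exec_module(module)
    return module


missing_error = None
with tempfile.TemporaryDirectory() as napps_dir:
    # A stand-in for <napps dir>/<username>/<napp_name>/main.py.
    main_py = Path(napps_dir) / "main.py"
    main_py.write_text("GREETING = 'hello from a NApp'\n")
    demo = import_from_path("napps.demo_user.demo_napp.main", main_py)

    # A path that does not exist only fails once the module is executed.
    try:
        import_from_path("napps.demo_user.absent.main",
                         Path(napps_dir) / "absent.py")
    except FileNotFoundError as err:
        missing_error = err

print(demo.GREETING)
print(type(missing_error).__name__)
```

Because the module is registered in `sys.modules` before it is executed, a failed import leaves a partially initialized entry behind; callers that care may want to remove it in their error handling.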