Passed
Pull Request — master (#583)
by
unknown
06:16
created

kytos.core.controller   F

Complexity

Total Complexity 163

Size/Duplication

Total Lines 1137
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 609
dl 0
loc 1137
rs 1.991
c 0
b 0
f 0
wmc 163

How to fix   Complexity   

Complexity

Complex classes like kytos.core.controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16
import asyncio
17
import atexit
18
import importlib
19
import logging
20
import os
21
import re
22
import sys
23
import threading
24
import traceback
25
from asyncio import AbstractEventLoop
26
from collections import Counter, defaultdict
27
from importlib import import_module
28
from importlib import reload as reload_module
29
from importlib.util import module_from_spec, spec_from_file_location
30
from pathlib import Path
31
from typing import Optional
32
33
from pyof.foundation.exceptions import PackException
34
35
from kytos.core.api_server import APIServer, JSONResponse, Request
36
from kytos.core.apm import init_apm
37
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
38
from kytos.core.auth import Auth
39
from kytos.core.buffers import KytosBuffers
40
from kytos.core.common import EntityStatus
41
from kytos.core.config import KytosConfig
42
from kytos.core.connection import Connection, ConnectionState
43
from kytos.core.db import db_conn_wait
44
from kytos.core.dead_letter import DeadLetter
45
from kytos.core.events import KytosEvent
46
from kytos.core.exceptions import (KytosAPMInitException, KytosDBInitException,
47
                                   KytosNAppSetupException)
48
from kytos.core.helpers import executors, now
49
from kytos.core.interface import Interface
50
from kytos.core.link import Link
51
from kytos.core.logs import LogManager
52
from kytos.core.napps.base import NApp
53
from kytos.core.napps.manager import NAppsManager
54
from kytos.core.napps.napp_dir_listener import NAppDirListener
55
from kytos.core.pacing import Pacer
56
from kytos.core.queue_monitor import QueueMonitorWindow
57
from kytos.core.switch import Switch
58
59
# Only the Controller class is exported as this module's public API.
__all__ = ('Controller',)
60
61
62
def exc_handler(_, exc, __):
    """Log uncaught exceptions, including their traceback.

    Installed as ``sys.excepthook`` by :class:`Controller`, so it is called
    positionally with ``(exc_type, exc, traceback)``.

    Args:
        _ (type): exception type (unused; derived from ``exc``).
        exc (BaseException): exception instance.
        __ (types.TracebackType): traceback (unused; ``exc`` carries it in
            ``__traceback__``).
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # sys.exc_info() is not populated while sys.excepthook runs, so the
    # default exc_info=True of logging.exception would log "NoneType: None"
    # instead of the real traceback. Hand the exception over explicitly.
    logging.exception('Uncaught Exception: %s', exc, exc_info=exc)
74
75
76
class Controller:
77
    """Main class of Kytos.
78
79
    The main responsibilities of this class are:
80
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
81
        - manage KytosNApps (install, load and unload);
82
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
83
        - manage which event should be sent to NApps methods;
84
        - manage the buffers handlers, considering one thread per handler.
85
    """
86
87
    # Created issue #568 for the disabled checks.
88
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
89
    # pylint: disable=consider-using-with,unnecessary-dunder-call
90
    # pylint: disable=too-many-lines
91
    def __init__(self, options=None,
                 loop: Optional[AbstractEventLoop] = None):
        """Init method of Controller class takes the parameters below.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
                Defaults to ``KytosConfig().options['daemon']``.
            loop (asyncio.AbstractEventLoop): event loop on which the
                controller schedules its asyncio tasks.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self.loop = loop

        # asyncio tasks
        self._tasks: list[asyncio.Task] = []

        #: KytosBuffers: KytosBuffer object with Controller buffers. Created
        #: lazily (see the ``buffers`` property and ``start_controller``).
        self._buffers: Optional[KytosBuffers] = None
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
        #:
        #: This dict stores all connections between the controller and the
        #: switches. The key for this dict is a tuple (ip, port). The content
        #: is a Connection
        self.connections: dict[tuple, Connection] = {}
        #: dict: mapping of events and event listeners.
        #:
        #: The key of the dict is a KytosEvent (or a string that represent a
        #: regex to match against KytosEvents) and the value is a list of
        #: methods that will receive the referenced event
        self.events_listeners = {'kytos/core.connection.new':
                                 [self.new_connection]}

        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
        #:
        #: The key is the napp name (string), while the value is the napp
        #: instance itself.
        self.napps = {}
        #: Object generated by ParseArgs on config.py file
        self.options = options
        #: KytosServer: Instance of KytosServer that will be listening to TCP
        #: connections.
        self.server = None
        #: dict: Current existing switches.
        #:
        #: The key is the switch dpid, while the value is a Switch object.
        self.switches: dict[str, Switch] = {}
        #: Lock used (e.g. by ``get_switch_or_create``) around switch
        #: lookup/creation.
        self.switches_lock = threading.Lock()

        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: Logger instance used by Kytos.
        self.log = None

        #: Observer that handle NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(self.options.listen,
                                    self.options.api_port,
                                    self.napps_manager,
                                    self.options.napps,
                                    self.options.api_traceback_on_500)

        self.auth = None
        self.dead_letter = DeadLetter(self)
        #: Strong references to the tasks of coroutine listeners scheduled by
        #: ``notify_listeners``; each task discards itself once done.
        self._alisten_tasks: set[asyncio.Task] = set()
        self.qmonitors: list[QueueMonitorWindow] = []

        #: APM client in memory to be closed when necessary
        self.apm = None

        self._register_endpoints()
        #: Adding the napps 'enabled' directory into the PATH
        #: Now you can access the enabled napps with:
        #: from napps.<username>.<napp_name> import ?....
        napps_parent_dir = os.path.join(self.options.napps, os.pardir)
        # ``restart`` re-runs ``__init__``; don't pile up duplicate entries.
        if napps_parent_dir not in sys.path:
            sys.path.append(napps_parent_dir)
        sys.excepthook = exc_handler

        #: Pacer for controlling the rate which actions can be executed
        self.pacer = Pacer("memory://")

        self.links: dict[str, Link] = {}
        Link.register_status_reason_func("controller_mismatched_reason",
                                         self.detect_mismatched_link)
        Link.register_status_func("controller_mismatched_status",
                                  self.link_status_mismatched)
180
181
    def start_auth(self):
        """Create the Auth component and register the core auth services."""
        auth = Auth(self)
        self.auth = auth
        auth.register_core_auth_services()
185
186
    def enable_logs(self):
        """Set up Kytos logging and bind ``self.log``.

        Resolves every dotted name in ``options.logger_decorators``,
        decorates the logger class with the results, loads the logging
        config file (honouring ``options.debug``), binds ``self.log`` to this
        module's logger and refreshes the ``LOG`` attribute of the loaded
        ``kytos.core.*`` modules.

        Exits the process via ``sys.exit`` when a decorator's module or
        attribute cannot be resolved.
        """
        decorator_names = self.options.logger_decorators
        try:
            resolved = [self._resolve(dotted) for dotted in decorator_names]
        except ModuleNotFoundError as err:
            sys.exit(f'Failed to resolve decorator module: {err.name}')
        except AttributeError:
            sys.exit(f'Failed to resolve decorator name: {decorator_names}')

        LogManager.decorate_logger_class(*resolved)
        LogManager.load_config_file(self.options.logging, self.options.debug)
        # pylint: disable=fixme
        # TODO issue 371: enable the log websocket on the API server.
        self.log = logging.getLogger(__name__)
        self._patch_core_loggers()
202
203
    @staticmethod
    def _resolve(name):
        """Import and return the object referenced by a dotted path.

        ``'pkg.module.attr'`` imports ``pkg.module`` and returns its
        ``attr`` attribute.
        """
        module_path, _, attr_name = name.rpartition('.')
        owner = importlib.import_module(module_path)
        return getattr(owner, attr_name)
209
210
    @staticmethod
    def _patch_core_loggers():
        """Rebind ``LOG`` in every loaded ``kytos.core.*`` module.

        Each module's ``LOG`` becomes ``logging.getLogger(module.__name__)``.
        When the module already had a ``LOG``, its handlers and filters are
        added to the new logger first.
        """
        prefix = 'kytos.core.'
        # Snapshot the matching modules before touching any of them.
        core_modules = [mod for mod_name, mod in sys.modules.items()
                        if mod_name.startswith(prefix)]
        for mod in core_modules:
            replacement = logging.getLogger(mod.__name__)
            if hasattr(mod, "LOG"):
                previous = mod.LOG
                for log_handler in previous.handlers:
                    replacement.addHandler(log_handler)
                for flt in previous.filters:
                    replacement.addFilter(flt)
            mod.LOG = replacement
226
227
    @staticmethod
    def loggers():
        """Return every registered Logger whose name contains ``"kytos"``.

        The returned objects are ``logging.Logger`` instances, which carry
        their name and level.
        """
        # pylint: disable=no-member
        registered_names = logging.root.manager.loggerDict
        kytos_names = [name for name in registered_names if "kytos" in name]
        return list(map(logging.getLogger, kytos_names))
237
238
    def toggle_debug(self, name=None):
        """Switch DEBUG logging on or off.

        Without ``name``, the logging config file is reloaded with debug
        enabled unless the ``'kytos'`` logger is currently at DEBUG (in
        which case it is reloaded with debug disabled).

        With ``name``, that logger's level is set to DEBUG, or back to
        NOTSET when its effective level already is DEBUG.

        Args:
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"

        Raises:
            ValueError: ``name`` is not a registered logger. Unknown names are
                rejected because ``logging.getLogger`` would create them.
        """
        if not name:
            kytos_level = logging.getLogger('kytos').getEffectiveLevel()
            LogManager.load_config_file(self.options.logging,
                                        kytos_level != logging.DEBUG)
            return

        # pylint: disable=no-member
        if name not in logging.root.manager.loggerDict:
            raise ValueError(f"Invalid logger name: {name}")

        target = logging.getLogger(name)
        if target.getEffectiveLevel() == logging.DEBUG:
            new_level = logging.NOTSET
        else:
            new_level = logging.DEBUG
        target.setLevel(new_level)
275
276
    async def start(self, restart=False):
        """Prepare external services and start the controller.

        Enables logging, then (inside one guarded section):

        - waits for the database connection when ``options.database`` is set;
        - initializes the APM client when ``options.apm`` is set;
        - creates the pidfile unless ``restart`` is true;
        - awaits :meth:`start_controller`.

        Any failure terminates the process through ``sys.exit``. Other
        ``Exception`` types are reported with their formatted traceback.
        """
        self.enable_logs()
        opts = self.options
        # pylint: disable=broad-except
        try:
            if opts.database:
                db_conn_wait(db_backend=opts.database)
            if opts.apm:
                self.apm = init_apm(opts.apm, app=self.api_server.app)
            if not restart:
                self.create_pidfile()
            await self.start_controller()
        except (KytosDBInitException, KytosAPMInitException) as exc:
            sys.exit(f"Kytos couldn't start because of {exc}")
        except Exception as exc:
            formatted_tb = traceback.format_exc(chain=True)
            message = f"Kytos couldn't start because of {exc} {formatted_tb}"
            sys.exit(self._try_to_fmt_traceback_msg(
                message, self._full_queue_counter()))
296
297
    def start_queue_monitors(self) -> None:
        """Create the configured QueueMonitorWindows and start each one.

        Windows come from ``options.event_buffer_monitors`` (buffer
        monitors) and ``options.thread_pool_queue_monitors`` (thread pool
        monitors), and are kept in ``self.qmonitors``.
        """
        opts = self.options
        for buffer_cfg in opts.event_buffer_monitors:
            self.qmonitors.extend(
                QueueMonitorWindow.from_buffer_config(self, **buffer_cfg))
        for pool_cfg in opts.thread_pool_queue_monitors:
            self.qmonitors.extend(
                QueueMonitorWindow.from_threadpool_config(**pool_cfg))
        for monitor in self.qmonitors:
            self.log.info(f"Starting {monitor}...")
            monitor.start()
308
309
    def stop_queue_monitors(self) -> None:
        """Stop every QueueMonitorWindow held in ``self.qmonitors``."""
        for monitor in self.qmonitors:
            self.log.info(f"Stopping {monitor}...")
            monitor.stop()
314
315
    def create_pidfile(self):
        """Create the pidfile holding this process' PID.

        The parent directory of ``options.pidfile`` is created (mode
        ``0o1777``) when missing. If a pidfile already exists and the PID it
        stores belongs to a live process, Kytos aborts via ``sys.exit``.
        Otherwise the stale file is overwritten.

        Removal of the pidfile at interpreter exit is registered only after
        this process has written it. That way an aborted start never deletes
        the pidfile of another running instance.
        """
        pid = os.getpid()
        pidfile_path = Path(self.options.pidfile)

        # Creates directory if it doesn't exist
        # System can erase /var/run's content
        pid_folder = pidfile_path.parent
        self.log.info(pid_folder)

        # Pylint incorrectly infers Path objects
        # https://github.com/PyCQA/pylint/issues/224
        # pylint: disable=no-member
        if not pid_folder.exists():
            pid_folder.mkdir(parents=True, exist_ok=True)
            pid_folder.chmod(0o1777)
        # pylint: enable=no-member

        try:
            # Mode 'x' fails when the file already exists.
            pidfile = open(pidfile_path, mode='x', encoding="utf8")
        except OSError:
            # A pidfile is already there: abort if its process is alive,
            # otherwise take the stale file over.
            if self._pidfile_process_running(pidfile_path):
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
                             "running. Aborting.")
                sys.exit(error_msg.format(self.options.pidfile))
            try:
                pidfile = open(pidfile_path, mode='w', encoding="utf8")
            except OSError as exception:
                error_msg = "Failed to create pidfile {}: {}."
                sys.exit(error_msg.format(self.options.pidfile, exception))

        # Identifies the process that created the pidfile.
        with pidfile:
            pidfile.write(str(pid))

        # Make sure the file is deleted when controller stops; tolerate it
        # having been removed already.
        atexit.register(pidfile_path.unlink, missing_ok=True)

    @staticmethod
    def _pidfile_process_running(pidfile_path):
        """Return True if the PID stored in ``pidfile_path`` is alive.

        Unreadable files, non-integer or non-positive contents and PIDs that
        ``os.kill(pid, 0)`` rejects all count as "not running". A
        non-positive PID is rejected first because ``os.kill`` would target
        process groups instead of a single process.
        """
        try:
            with open(pidfile_path, mode='r', encoding="utf8") as existing:
                old_pid = int(existing.read())
            if old_pid <= 0:
                return False
            # Signal 0 performs the existence check without signalling.
            os.kill(old_pid, 0)
        except (OSError, ValueError, OverflowError):
            return False
        return True
364
365
    @property
    def buffers(self) -> KytosBuffers:
        """Return the controller's KytosBuffers, creating them on first use.

        Creation is deferred until first access so it can happen while an
        event loop is running (lazy async initialization).
        """
        if self._buffers:
            return self._buffers
        self._buffers = KytosBuffers()
        return self._buffers
372
373
    async def _wait_api_server_started(self):
        """Poll every 0.1s until the Uvicorn API server reports it started."""
        while True:
            if self.api_server.server.started:
                return
            await asyncio.sleep(0.1)
377
378
    async def start_controller(self):
        """Start the controller.

        In order, this:

        - makes sure the buffers exist;
        - builds the KytosServer (TCP server), schedules the API server as
          an asyncio task and waits until it reports it started;
        - starts serving the TCP server;
        - starts authorization, the NApp directory listener, pre-installs
          and loads the NApps and starts the web UI;
        - schedules one asyncio task per event buffer handler;
        - starts the queue monitors and records ``started_at``.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        if not self._buffers:
            self._buffers = KytosBuffers()
        opts = self.options
        self.server = KytosServer((opts.listen, int(opts.port)),
                                  KytosServerProtocol,
                                  self,
                                  opts.protocol_name)

        self.log.info("Starting API server")
        self._tasks.append(self.loop.create_task(self.api_server.serve()))
        await self._wait_api_server_started()

        self.log.info(f"Starting TCP server: {self.server}")
        self.server.serve_forever()

        # ASYNC TODO: ensure all threads started correctly; if any of them
        # failed starting we should exit.

        self.log.info("Starting authorization.")
        self.start_auth()
        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()
        self.api_server.start_web_ui()

        # Event handler consumers start after the NApps so that an event is
        # less likely to be discarded before its consumer is ready. msg_out
        # has a dedicated handler; the other buffers share event_handler.
        handler_order = ("conn", "raw", "msg_in", "msg_out", "app", "meta")
        for buffer_name in handler_order:
            if buffer_name == "msg_out":
                handler = self.msg_out_event_handler()
            else:
                handler = self.event_handler(buffer_name)
            self._tasks.append(self.loop.create_task(handler))

        self.start_queue_monitors()
        self.started_at = now()
431
432
    def _register_endpoints(self):
        """Register the REST endpoints served by the Kytos core.

        Starts the API server's own API, then registers the controller
        endpoints under ``/api/kytos/core/`` (``config/``, ``metadata/``,
        ``reload/{username}/{napp_name}/`` and ``reload/all``), and finally
        the dead letter endpoints.
        """
        api = self.api_server
        api.start_api()
        core_endpoints = (
            ('config/', self.configuration_endpoint),
            ('metadata/', Controller.metadata_endpoint),
            ('reload/{username}/{napp_name}/', self.rest_reload_napp),
            ('reload/all', self.rest_reload_all_napps),
        )
        for route, handler in core_endpoints:
            api.register_core_endpoint(route, handler)
        self.dead_letter.register_endpoints()
451
452
    def register_rest_endpoint(self, url, function, methods):
        """Expose ``function`` at ``url`` for ``methods`` (deprecated).

        Prefer the ``@rest`` decorator. This is a thin passthrough to the
        API server's ``register_rest_endpoint``.
        """
        server = self.api_server
        server.register_rest_endpoint(url, function, methods)
455
456
    @classmethod
    def metadata_endpoint(cls, _request: Request) -> JSONResponse:
        """Return the Kytos metadata as JSON.

        Reads ``metadata.py`` next to this module and returns each
        ``__lowercase__ = '...'`` (single-quoted) assignment found there as a
        ``{name: value}`` mapping.
        """
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
        with open(meta_path, encoding="utf8") as meta_handle:
            source_text = meta_handle.read()
        dunder_assignment = r"(__[a-z]+__)\s*=\s*'([^']+)'"
        metadata = {key: value
                    for key, value in re.findall(dunder_assignment,
                                                 source_text)}
        return JSONResponse(metadata)
469
470
    def configuration_endpoint(self, _request: Request) -> JSONResponse:
        """Return, as JSON, the configuration options Kytos exposes.

        The raw option namespace is filtered by
        ``KytosConfig.options_exposed`` before being returned.
        """
        raw_options = self.options.__dict__
        exposed = KytosConfig.options_exposed(raw_options)
        return JSONResponse(exposed)
478
479
    def restart(self, graceful=True):
        """Restart Kytos SDN Controller.

        When the controller is running, it is stopped and re-initialized
        with the same options and event loop. ``start`` is a coroutine, so it
        is scheduled as a task on ``self.loop``. Merely calling it would
        create a coroutine that never runs.

        Args:
            graceful(bool): Represents the way that Kytos will restart.

        Returns:
            asyncio.Task: the task running ``start(restart=True)``. It starts
            executing once ``self.loop`` is running.
        """
        if self.started_at is not None:
            self.stop(graceful)
            # Keep the loop: start_controller schedules its tasks on
            # self.loop, which would otherwise be reset to None.
            self.__init__(self.options, loop=self.loop)

        return self.loop.create_task(self.start(restart=True))
490
491
    def stop(self, graceful=True):
        """Shut down the services used by Kytos if the controller started.

        Delegates to :meth:`stop_controller`. That method stops the buffers,
        thread pools, NApps, asyncio tasks, queue monitors, the APM client,
        the API and TCP servers and the event loop. Does nothing while
        ``started_at`` is unset.

        Args:
            graceful (bool): forwarded to :meth:`stop_controller`.
        """
        if not self.started_at:
            return
        self.stop_controller(graceful)
501
502
    def stop_controller(self, graceful=True):
        """Stop the controller.

        Steps, in order:

        - send the stop signal to the buffers and stop the NApp directory
          listener;
        - shut down every executor pool (``wait=graceful``,
          ``cancel_futures=True``);
        - clear ``started_at``, unload the NApps and block until each one
          confirms its shutdown;
        - cancel the controller's asyncio tasks;
        - stop the queue monitors, the APM client (if any), the API server
          and the TCP server, then stop the event loop.

        Args:
            graceful (bool): passed as ``wait`` to each pool's ``shutdown``.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.napp_dir_listener.stop()

        for pool_name in executors:
            self.log.info("Stopping threadpool: %s", pool_name)

        live_threads = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(live_threads), live_threads)

        for pool in executors.values():
            pool.shutdown(wait=graceful, cancel_futures=True)

        self.started_at = None
        self._wait_napps_shutdown(self.unload_napps())

        # Cancel all async tasks (event handlers and servers)
        for pending in self._tasks:
            pending.cancel()

        # ASYNC TODO: close connections

        self.stop_queue_monitors()
        if self.apm:
            self.log.info("Stopping APM server...")
            self.apm.close()
            self.log.info("Stopped APM Server")
        self.log.info("Stopping API Server...")
        self.api_server.stop()
        self.log.info("Stopped API Server")
        self.log.info("Stopping TCP Server...")
        self.server.shutdown()
        self.log.info("Stopped TCP Server")
        self.loop.stop()

    def _wait_napps_shutdown(self, napps):
        """Block until every NApp in ``napps`` confirms it shut down."""
        self.log.info(
            f"Waiting for {len(napps)} NApps shutdown confirmation..."
        )
        for napp in napps:
            self.log.info(f"Waiting {napp} shutdown confirmation...")
            # Name-mangled private ``KytosNApp.__event``; wait() blocks until
            # it is set.
            napp.__dict__['_KytosNApp__event'].wait()
            self.log.info(f"{napp} was shutdown")
561
562
    def status(self):
        """Describe whether the Kytos server is running.

        Returns:
            str: ``"Running since <started_at>"`` while running, otherwise
            ``"Stopped"``.
        """
        started = self.started_at
        return f"Running since {started}" if started else "Stopped"
575
576
    def uptime(self):
        """Return how long the Kytos server has been running.

        Returns:
            datetime.timedelta: ``now() - started_at`` while running, or the
            integer ``0`` when the server is stopped.
        """
        if not self.started_at:
            return 0
        return now() - self.started_at
588
589
    def notify_listeners(self, event):
        """Send the event to every listener registered for a matching key.

        Each key of ``self.events_listeners`` is a regular expression matched
        with ``re.match`` against ``event.name``. A trailing ``$`` is
        appended when the key does not already end with an unescaped one,
        so the key must match the whole name (e.g. ``"shutdown"`` won't match
        ``"shutdown.kytos/of_core"``).

        Coroutine-function listeners are scheduled with
        ``asyncio.create_task``, and their tasks are kept in
        ``_alisten_tasks`` until done. Other listeners are called
        synchronously.

        Args:
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
        """
        self.log.debug("looking for listeners for %s", event)
        # Iterate over a snapshot so changes to events_listeners made while
        # dispatching don't break the iteration.
        for event_regex, listeners in dict(self.events_listeners).items():
            if not re.match(self._anchor_event_regex(event_regex), event.name):
                continue
            self.log.debug('Calling listeners for %s', event)
            for listener in listeners:
                if asyncio.iscoroutinefunction(listener):
                    task = asyncio.create_task(listener(event))
                    self._alisten_tasks.add(task)
                    task.add_done_callback(self._alisten_tasks.discard)
                else:
                    listener(event)

    @staticmethod
    def _anchor_event_regex(event_regex):
        """Return ``event_regex`` ending in an (unescaped) ``$`` anchor.

        An escaped trailing dollar (backslash + ``$``) is a literal
        character, so an anchor is still appended in that case. Using
        ``endswith`` also handles empty and one-character patterns such as
        ``"$"``, which previously raised IndexError.
        """
        if event_regex.endswith('$') and not event_regex.endswith('\\$'):
            return event_regex
        return event_regex + '$'
617
618
    # pylint: disable=broad-exception-caught
    async def event_handler(self, buffer_name: str):
        """Consume events from the buffer named ``buffer_name``.

        Every event taken from ``self.buffers.<buffer_name>`` is dispatched
        with :meth:`notify_listeners`. The coroutine returns right after
        dispatching a ``kytos/core.shutdown`` event. Any other ``Exception``
        is logged and consumption continues.
        """
        source = getattr(self.buffers, buffer_name)
        self.log.info(f"Event handler {buffer_name} started")
        while True:
            try:
                # Yield to the loop before each get. Under large bursts
                # (16k flows -> 32k events) this task hogged the MainThread,
                # causing disconnections and socket exceptions. With this
                # sleep(0) the cap for simultaneous flow installation rose
                # to 50k.
                await asyncio.sleep(0)
                event = await source.aget()
                self.notify_listeners(event)
                if event.name == "kytos/core.shutdown":
                    self.log.debug(f"Event handler {buffer_name} stopped")
                    return
            except Exception as exc:
                self.log.exception(f"Unhandled exception on {buffer_name}",
                                   exc_info=exc)
641
642
    async def publish_connection_error(self, event):
        """Turn ``event`` into a connection error event and enqueue it.

        The event is mutated in place. It is renamed to
        ``kytos/core.<protocol name>.connection.error``, and its content gets
        an ``"exception"`` entry with the destination's connection state.
        It is then put into the ``conn`` buffer.

        Args:
            event (KytosEvent): Event that triggered for error propagation.
        """
        destination = event.destination
        protocol_name = destination.protocol.name
        event.name = f"kytos/core.{protocol_name}.connection.error"
        event.content["exception"] = f"Connection state: {destination.state}"
        await self.buffers.conn.aput(event)
653
654
    # pylint: disable=broad-exception-caught
    async def msg_out_event_handler(self):
        """Handle msg_out events.

        Listen to the msg_out buffer. For each event, pack and send its
        message to the destination connection, then notify the event's
        listeners. The loop ends on a ``kytos/core.shutdown`` event. Other
        unexpected exceptions are logged and the loop continues.
        """
        self.log.info("Event handler msg_out started")
        while True:
            try:
                triggered_event = await self.buffers.msg_out.aget()

                if triggered_event.name == "kytos/core.shutdown":
                    self.log.debug("Message Out Event handler stopped")
                    break

                await self._send_msg_out_event(triggered_event)
            except Exception as exc:
                self.log.exception("Unhandled exception on msg_out",
                                   exc_info=exc)

    async def _send_msg_out_event(self, triggered_event):
        """Pack and send one msg_out event, then notify its listeners.

        Events without a destination, or whose destination connection is
        FINISHED, are skipped. A PackException discards the message. An
        OSError while sending publishes a connection error event.

        Each ``try`` wraps only the call whose error it handles. An OSError
        or PackException raised by a listener therefore propagates to the
        caller's generic handler instead of being reported as a failed send.
        """
        message = triggered_event.content['message']
        destination = triggered_event.destination
        if not destination or destination.state == ConnectionState.FINISHED:
            return

        try:
            packet = message.pack()
        except PackException as err:
            self.log.error(
                f"Discarding message: {message}, event: {triggered_event} "
                f"because of PackException {err}"
            )
            return

        try:
            destination.send(packet)
        except OSError:
            await self.publish_connection_error(triggered_event)
            self.log.info("connection closed. Cannot send message")
            return

        self.log.debug('Connection %s: OUT OFP, '
                       'version: %s, type: %s, xid: %s - %s',
                       destination.id,
                       message.header.version,
                       message.header.message_type,
                       message.header.xid,
                       packet.hex())
        self.notify_listeners(triggered_event)
695
696
    def get_interface_by_id(self, interface_id):
        """Return the Interface identified by ``interface_id``.

        ``interface_id`` has the form ``"<switch dpid>:<port number>"``.
        Everything before the last ``:`` is the switch id, and the part after
        it is converted with ``int``, so a non-numeric port raises
        ValueError.

        Args:
            interface_id(str): Interface Identifier.

        Returns:
            Interface: the matching Interface, or None when ``interface_id``
            is None, the switch is unknown or it has no such port.
        """
        if interface_id is None:
            return None

        switch_id, _, port = interface_id.rpartition(":")
        port_number = int(port)
        switch = self.switches.get(switch_id)
        return switch.interfaces.get(port_number, None) if switch else None
718
719
    def get_switch_by_dpid(self, dpid):
        """Look up a switch by its dpid.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.

        Returns:
            :class:`~kytos.core.switch.Switch`: the switch, or None when no
            switch is registered under ``dpid``.
        """
        known_switches = self.switches
        return known_switches.get(dpid)
730
731
    def get_switch_or_create(self, dpid, connection=None):
        """Return the switch for ``dpid``, creating it if necessary.

        Runs under ``switches_lock``. When ``connection`` is given, it is
        stored in ``self.connections`` and attached to the switch, and the
        switch's previous connection (if a different object) is removed.
        A ``kytos/core.switch.new`` or ``kytos/core.switch.reconnected``
        event carrying the switch is put into the ``conn`` buffer.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. If a switch has a connection that
                will be updated.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.
        """
        with self.switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            is_new = switch is None
            if is_new:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
            suffix = 'new' if is_new else 'reconnected'
            event = KytosEvent(name=f'kytos/core.switch.{suffix}',
                               content={'switch': switch})

            if connection:
                previous = switch.connection
                switch.update_connection(connection)
                if previous is not connection:
                    self.remove_connection(previous)

            self.buffers.conn.put(event)
            return switch
769
770
    def create_or_update_connection(self, connection):
        """Store ``connection`` in ``self.connections`` under its id.

        Any entry already registered with the same id is overwritten.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Connection to register.
        """
        conn_id = connection.id
        self.connections[conn_id] = connection
778
779
    def get_connection_by_id(self, conn_id):
        """Look up a registered connection.

        Args:
            conn_id: id of the connection, i.e. its key in
                ``self.connections``.

        Returns:
            :class:`~kytos.core.connection.Connection`: the connection, or
            None when nothing is registered under ``conn_id``.
        """
        registry = self.connections
        return registry.get(conn_id)
791
792
    def remove_connection(self, connection):
        """Close ``connection`` and drop it from ``self.connections``.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Connection to close and remove; may be None.

        Returns:
            bool: True if it was removed from ``self.connections``. False
            when ``connection`` is None or was not registered (``close()``
            is still called in the latter case).
        """
        if connection is None:
            return False

        try:
            connection.close()
            del self.connections[connection.id]
        except KeyError:
            removed = False
        else:
            removed = True
        return removed
808
809
    def remove_switch(self, switch):
        """Unregister ``switch`` from ``self.switches``.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Switch to remove; looked up by its ``dpid``.

        Returns:
            bool: True if the switch was registered and got removed,
            False otherwise.
        """
        try:
            del self.switches[switch.dpid]
        except KeyError:
            was_present = False
        else:
            was_present = True
        return was_present
821
822
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        Stores the connection carried by the event (its ``source``) in
        ``self.connections``. A different connection previously registered
        under the same id (same ip:port) is closed and removed first, so no
        stale references to it remain.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``); its
                ``source`` is the new connection.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() needs the connection object (it calls
        # .close() and reads .id), not the id itself.
        old_connection = self.get_connection_by_id(connection.id)
        if old_connection and old_connection is not connection:
            self.remove_connection(old_connection)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
847
848
    def add_new_switch(self, switch):
        """Register ``switch`` in ``self.switches`` under its dpid.

        A switch already registered with the same dpid is replaced.

        Args:
            switch (Switch): A Switch object
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
855
856
    def _import_napp(self, username, napp_name):
857
        """Import a NApp module.
858
859
        Raises:
860
            FileNotFoundError: if NApp's main.py is not found.
861
            ModuleNotFoundError: if any NApp requirement is not installed.
862
863
        """
864
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
865
        path = os.path.join(self.options.napps, username, napp_name,
866
                            'main.py')
867
        napp_spec = spec_from_file_location(mod_name, path)
868
        napp_module = module_from_spec(napp_spec)
869
        sys.modules[napp_spec.name] = napp_module
870
        napp_spec.loader.exec_module(napp_module)
871
        return napp_module
872
873
    def load_napp(self, username, napp_name):
        """Import, instantiate, start and register a single NApp.

        Nothing happens (beyond a warning) when the NApp is already in
        ``self.napps``. Import failures are logged and swallowed: a missing
        requirement logs an error, a missing ``main.py`` is treated as a
        meta-NApp and logs a warning.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.

        Raises:
            KytosNAppSetupException: if the NApp's ``Main`` constructor
                raises any exception.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            napp_module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return
        except FileNotFoundError as err:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                err.filename,
            )
            return

        try:
            napp = napp_module.Main(controller=self)
        except Exception as exc:  # noqa pylint: disable=broad-except
            raise KytosNAppSetupException(
                f"NApp {username}/{napp_name} exception {str(exc)} "
            ) from exc

        self.napps[napp_key] = napp

        napp.start()
        self.api_server.authenticate_endpoints(napp)
        self.api_server.register_napp_endpoints(napp)

        # Merge the NApp's listeners into the controller-wide registry.
        # pylint: disable=protected-access
        napp_listeners = napp._listeners
        # pylint: enable=protected-access
        for event_name, callbacks in napp_listeners.items():
            self.events_listeners.setdefault(event_name, []).extend(callbacks)
912
913
    def pre_install_napps(self, napps, enable=True):
        """Install (and by default enable) the given NApps not yet installed.

        Installed NApps are compared by their string form against the
        entries of ``napps``; only the missing ones are installed, in the
        order given.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): forwarded to the NApps manager's ``install``.
        """
        manager = self.napps_manager
        already_installed = {
            str(napp) for napp in manager.get_installed_napps()
        }
        for napp in napps:
            if napp not in already_installed:
                manager.install(napp, enable=enable)
926
927
    def load_napps(self):
        """Load every NApp that the NApps manager reports as enabled.

        Raises:
            KytosNAppSetupException: if loading a NApp raises
                FileNotFoundError (the error is logged first).
        """
        for enabled_napp in self.napps_manager.get_enabled_napps():
            napp_id = enabled_napp.id
            try:
                self.log.info("Loading NApp %s", napp_id)
                self.load_napp(enabled_napp.username, enabled_napp.name)
            except FileNotFoundError as exception:
                self.log.error("Could not load NApp %s: %s",
                               napp_id, exception)
                raise KytosNAppSetupException(
                    f"NApp {napp_id} exception {str(exception)}"
                ) from exception
938
939
    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        Removes the NApp from ``self.napps``, invokes the first listener
        registered for its ``kytos/core.shutdown.<napp_id>`` event, removes
        its REST endpoints and unregisters its event listeners.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.

        Returns:
            The unloaded NApp instance, or None if it was not loaded (a
            warning is logged in that case).
        """
        napp = self.napps.pop((username, napp_name), None)

        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
        else:
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
            napp_id = NApp(username, napp_name).id
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
            # NOTE(review): raises KeyError/IndexError if no listener is
            # registered for this shutdown event; presumably every loaded
            # NApp registers one — confirm.
            napp_shutdown_fn = self.events_listeners[event.name][0]
            # Call listener before removing it from events_listeners
            napp_shutdown_fn(event)

            # Remove rest endpoints from that napp
            self.api_server.remove_napp_endpoints(napp)

            # Removing listeners from that napp
            # pylint: disable=protected-access
            for event_type, napp_listeners in napp._listeners.items():
                event_listeners = self.events_listeners[event_type]
                for listener in napp_listeners:
                    # list.remove drops the first equal entry and raises
                    # ValueError if the listener is not present.
                    event_listeners.remove(listener)
                # Drop event types that no longer have any listener.
                if not event_listeners:
                    del self.events_listeners[event_type]
            # pylint: enable=protected-access

        return napp
972
973
    def unload_napps(self):
        """Unload all loaded NApps that are not core NApps.

        NApps are unloaded in the reverse of their enabled order so they
        can shut down gracefully.

        Returns:
            list: the NApp instances that were actually unloaded, in
            unload order.
        """
        return [
            unloaded
            for enabled in reversed(self.napps_manager.get_enabled_napps())
            if (unloaded := self.unload_napp(enabled.username, enabled.name))
        ]
984
985
    def reload_napp_module(self, username, napp_name, napp_file):
        """Re-import ``napps.<username>.<napp_name>.<napp_file>``.

        The module is imported (if needed) and then reloaded with
        :func:`importlib.reload`. Failures are logged and re-raised.

        Raises:
            ModuleNotFoundError: if the module cannot be found.
            ImportError: if reloading the module fails.
        """
        module_path = '.'.join(['napps', username, napp_name, napp_file])

        try:
            target = import_module(module_path)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", module_path)
            raise

        try:
            reload_module(target)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise
999
1000
    def reload_napp(self, username, napp_name):
        """Unload a NApp, re-import its modules and load it again.

        Returns:
            int: 200 on success; 400 if its ``settings`` or ``main`` module
            could not be re-imported (the NApp then stays unloaded).
        """
        self.unload_napp(username, napp_name)
        for napp_file in ('settings', 'main'):
            try:
                self.reload_napp_module(username, napp_name, napp_file)
            except (ModuleNotFoundError, ImportError):
                return 400
        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
1012
1013
    def rest_reload_napp(self, request: Request) -> JSONResponse:
        """Reload the NApp named by the request's path parameters.

        Responds ``'reloaded'`` on success; otherwise ``'fail to reload'``
        with the status code returned by :meth:`reload_napp`.
        """
        params = request.path_params
        status = self.reload_napp(params["username"], params["napp_name"])
        if status != 200:
            return JSONResponse('fail to reload', status_code=status)
        return JSONResponse('reloaded')
1021
1022
    def rest_reload_all_napps(self, _request: Request) -> JSONResponse:
        """Request reload all NApps.

        ``reload_napp`` pops each NApp from ``self.napps`` and re-inserts
        it (or leaves it out when the reload fails). The keys are therefore
        snapshotted first: mutating a dict while iterating it can raise
        ``RuntimeError`` or revisit entries.
        """
        for napp in list(self.napps):
            self.reload_napp(*napp)
        return JSONResponse('reloaded')
1027
1028
    def _full_queue_counter(self) -> Counter:
1029
        """Generate full queue stats counter."""
1030
        buffer_counter = defaultdict(Counter)
1031
        for buffer in self.buffers.get_all_buffers():
1032
            if not buffer.full():
1033
                continue
1034
            while not buffer.empty():
1035
                event = buffer.get()
1036
                buffer_counter[buffer.name][event.name] += 1
1037
        return buffer_counter
1038
1039
    def _try_to_fmt_traceback_msg(self, message: str, counter: Counter) -> str:
1040
        """Try to fmt traceback message."""
1041
        if counter:
1042
            counter = dict(counter)
1043
            message = (
1044
                f"{message}\nFull KytosEventBuffers counters: {counter}"
1045
            )
1046
        return message
1047
1048
    def get_link_or_create(
        self,
        endpoint_a: Interface,
        endpoint_b: Interface,
        link_dict: Optional[dict] = None,
    ) -> tuple[Link, bool]:
        """Get an existing link or create a new one.

        If a link with the same id is already in ``self.links`` and both
        interfaces currently reference the same link (see
        :meth:`detect_mismatched_link`), that stored link is returned
        unchanged. Otherwise both interfaces are attached to a newly built
        link, marked as NNI, and ``link_dict`` (when given) sets the link's
        enabled state and extends its metadata.

        Args:
            endpoint_a: first endpoint of the link.
            endpoint_b: second endpoint of the link.
            link_dict: optional dict; ``link_dict['enabled']`` is required
                when given, ``link_dict['metadata']`` is optional.

        Returns:
            Tuple(Link, bool): Link and a boolean whether it has been created.
        """
        new_link = Link(endpoint_a, endpoint_b)

        # If new_link is an old link but mismatched,
        # then treat it as a new link
        if (new_link.id in self.links
                and not self.detect_mismatched_link(new_link)):
            return (self.links[new_link.id], False)

        with new_link.lock:
            # Check if any interface already has a link
            # This old_link is a leftover link that needs to be removed
            # The other endpoint of the link is the leftover interface
            if endpoint_a.link and endpoint_a.link != new_link:
                old_link = endpoint_a.link
                # The endpoint of old_link that is not endpoint_a.
                leftover_interface = (
                    old_link.endpoint_a
                    if old_link.endpoint_a != endpoint_a
                    else old_link.endpoint_b
                )
                self.log.warning(
                    f"Leftover mismatched link"
                    f" {endpoint_a.link} in interface"
                    f" {leftover_interface}"
                )

            if endpoint_b.link and endpoint_b.link != new_link:
                old_link = endpoint_b.link
                # The endpoint of old_link that is not endpoint_b.
                leftover_interface = (
                    old_link.endpoint_b
                    if old_link.endpoint_b != endpoint_b
                    else old_link.endpoint_a
                )
                # NOTE(review): the trailing space in the first fragment
                # plus the leading space in the second yields a double
                # space ("link  ..."), unlike the endpoint_a warning above.
                self.log.warning(
                    f"Leftover mismatched link "
                    f" {endpoint_b.link} in interface"
                    f" {leftover_interface}"
                )

            # NOTE(review): when the id already exists but is mismatched,
            # self.links keeps the previously stored object while the
            # interfaces below are attached to new_link, and the stored
            # object is what gets returned — confirm this is intended.
            if new_link.id not in self.links:
                self.links[new_link.id] = new_link

            endpoint_a.update_link(new_link)
            endpoint_b.update_link(new_link)
            new_link.endpoint_a = endpoint_a
            new_link.endpoint_b = endpoint_b
            endpoint_a.nni = True
            endpoint_b.nni = True

            if link_dict:
                if link_dict['enabled']:
                    new_link.enable()
                else:
                    new_link.disable()

                if link_dict.get("metadata"):
                    new_link.extend_metadata(link_dict["metadata"])

        return (self.links[new_link.id], True)
1117
1118
    def get_link(self, link_id: str) -> Optional[Link]:
        """Look up a registered link by its id.

        Returns:
            Optional[Link]: the link stored under ``link_id`` in
            ``self.links``, or None when there is none.
        """
        registry = self.links
        return registry.get(link_id)
1125
1126
    @staticmethod
    def detect_mismatched_link(link: Link) -> frozenset[str]:
        """Report whether a link's two interfaces disagree on their link.

        Returns:
            frozenset[str]: empty when ``link.endpoint_a.link`` is set,
            ``link.endpoint_b`` is set, and ``link.endpoint_a.link`` equals
            ``link.endpoint_b.link``; otherwise ``{"mismatched_link"}``.
        """
        link_on_a = link.endpoint_a.link
        consistent = bool(
            link_on_a
            and link.endpoint_b
            and link_on_a == link.endpoint_b.link
        )
        if consistent:
            return frozenset()
        return frozenset(["mismatched_link"])
1133
1134
    def link_status_mismatched(self, link: Link) -> Optional[EntityStatus]:
        """Return ``EntityStatus.DOWN`` for a mismatched link, else None."""
        mismatches = self.detect_mismatched_link(link)
        return EntityStatus.DOWN if mismatches else None
1139