Controller.get_interface_by_id()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 22
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 9
nop 2
dl 0
loc 22
rs 9.95
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16
import asyncio
17
import atexit
18
import importlib
19
import logging
20
import os
21
import re
22
import sys
23
import threading
24
import traceback
25
from asyncio import AbstractEventLoop
26
from collections import Counter, defaultdict
27
from importlib import import_module
28
from importlib import reload as reload_module
29
from importlib.util import module_from_spec, spec_from_file_location
30
from pathlib import Path
31
from typing import Iterable, Optional
32
33
from pyof.foundation.exceptions import PackException
34
35
from kytos.core.api_server import APIServer, JSONResponse, Request
36
from kytos.core.apm import init_apm
37
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
38
from kytos.core.auth import Auth
39
from kytos.core.buffers import KytosBuffers
40
from kytos.core.common import EntityStatus
41
from kytos.core.config import KytosConfig
42
from kytos.core.connection import Connection, ConnectionState
43
from kytos.core.db import db_conn_wait
44
from kytos.core.dead_letter import DeadLetter
45
from kytos.core.events import KytosEvent
46
from kytos.core.exceptions import (KytosAPMInitException, KytosDBInitException,
47
                                   KytosNAppSetupException)
48
from kytos.core.helpers import executors, now
49
from kytos.core.interface import Interface
50
from kytos.core.link import Link
51
from kytos.core.logs import LogManager
52
from kytos.core.napps.base import NApp
53
from kytos.core.napps.manager import NAppsManager
54
from kytos.core.napps.napp_dir_listener import NAppDirListener
55
from kytos.core.pacing import Pacer
56
from kytos.core.queue_monitor import QueueMonitorWindow
57
from kytos.core.switch import Switch
58
59
__all__ = ('Controller',)
60
61
62
def exc_handler(_, exc, __):
    """Log uncaught exceptions to ``errlog.log``.

    The signature matches ``sys.excepthook`` (type, value, traceback).

    Args:
        _ (ignored): exception type
        exc: exception instance
        __ (ignored): traceback; the one attached to ``exc`` is logged instead
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # ``logging.exception`` reads ``sys.exc_info()``, which is only populated
    # inside an ``except`` block. This function is not called from one, so
    # the exception instance is handed over explicitly to make logging record
    # its traceback.
    logging.error('Uncaught Exception: %s', exc, exc_info=exc)
74
75
76
class Controller:
77
    """Main class of Kytos.
78
79
    The main responsibilities of this class are:
80
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
81
        - manage KytosNApps (install, load and unload);
82
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
83
        - manage which event should be sent to NApps methods;
84
        - manage the buffers handlers, considering one thread per handler.
85
    """
86
87
    # Created issue #568 for the disabled checks.
88
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
89
    # pylint: disable=consider-using-with,unnecessary-dunder-call
90
    # pylint: disable=too-many-lines
91
    def __init__(self, options=None,
                 loop: Optional[AbstractEventLoop] = None):
        """Set up the controller state and register the core REST endpoints.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
                When omitted, the ``'daemon'`` options of a fresh
                ``KytosConfig`` are used.
            loop (asyncio.AbstractEventLoop): event loop stored in
                ``self.loop`` and used to create the controller tasks.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self.loop = loop

        # asyncio tasks
        self._tasks: list[asyncio.Task] = []

        #: KytosBuffers: KytosBuffer object with Controller buffers
        #: (created lazily, see the ``buffers`` property)
        self._buffers: KytosBuffers = None
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
        #:
        #: This dict stores all connections between the controller and the
        #: switches. The key for this dict is a tuple (ip, port). The content
        #: is a Connection
        self.connections: dict[tuple, Connection] = {}
        #: dict: mapping of events and event listeners.
        #:
        #: The key of the dict is a KytosEvent (or a string that represent a
        #: regex to match against KytosEvents) and the value is a list of
        #: methods that will receive the referenced event
        self.events_listeners = {'kytos/core.connection.new':
                                 [self.new_connection]}

        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
        #:
        #: The key is the napp name (string), while the value is the napp
        #: instance itself.
        self.napps = {}
        #: Object generated by ParseArgs on config.py file
        self.options = options
        #: KytosServer: Instance of KytosServer that will be listening to TCP
        #: connections.
        self.server = None
        #: dict: Current existing switches.
        #:
        #: The key is the switch dpid, while the value is a Switch object.
        self.switches: dict[str, Switch] = {}
        self._switches_lock = threading.Lock()

        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: Logger instance used by Kytos.
        #: It stays None until ``enable_logs`` runs.
        self.log = None

        #: Observer that handle NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(self.options.listen,
                                    self.options.api_port,
                                    self.napps_manager,
                                    self.options.napps,
                                    self.options.api_traceback_on_500)

        self.auth = None
        self.dead_letter = DeadLetter(self)
        # Running tasks of coroutine listeners (see ``notify_listeners``)
        self._alisten_tasks = set()
        self.qmonitors: list[QueueMonitorWindow] = []

        #: APM client in memory to be closed when necessary
        self.apm = None

        # Needs ``api_server`` and ``dead_letter``, both created above.
        self._register_endpoints()
        #: Adding the napps 'enabled' directory into the PATH
        #: Now you can access the enabled napps with:
        #: from napps.<username>.<napp_name> import ?....
        sys.path.append(os.path.join(self.options.napps, os.pardir))
        # Route uncaught exceptions to ``exc_handler``.
        sys.excepthook = exc_handler

        #: Pacer for controlling the rate which actions can be executed
        self.pacer = Pacer("memory://")

        self.links: dict[str, Link] = {}
        self.links_lock = threading.Lock()
        Link.register_status_reason_func("controller_mismatched_reason",
                                         self.detect_mismatched_link)
        Link.register_status_func("controller_mismatched_status",
                                  self.link_status_mismatched)
181
182
    def start_auth(self):
        """Create the Auth instance and register its core auth services."""
        self.auth = Auth(self)
        auth = self.auth
        auth.register_core_auth_services()
186
187
    def enable_logs(self):
        """Configure logging and bind the controller logger.

        Resolves the dotted names in ``options.logger_decorators``, applies
        them to the logger class, loads the logging config file, sets
        ``self.log`` and patches the core loggers. The process exits when a
        decorator cannot be resolved.
        """
        options = self.options
        decorators = options.logger_decorators
        try:
            resolved = list(map(self._resolve, decorators))
        except ModuleNotFoundError as err:
            sys.exit(f'Failed to resolve decorator module: {err.name}')
        except AttributeError:
            sys.exit(f'Failed to resolve decorator name: {decorators}')
        LogManager.decorate_logger_class(*resolved)
        LogManager.load_config_file(options.logging, options.debug)
        # pylint: disable=fixme
        # TODO issue 371
        # LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)
        self._patch_core_loggers()
203
204
    @staticmethod
205
    def _resolve(name):
206
        """Resolve a dotted name to a global object."""
207
        mod, _, attr = name.rpartition('.')
208
        mod = importlib.import_module(mod)
209
        return getattr(mod, attr)
210
211
    @staticmethod
212
    def _patch_core_loggers():
213
        """Patch in updated loggers to 'kytos.core.*' modules"""
214
        match_str = 'kytos.core.'
215
        str_len = len(match_str)
216
        reloadable_mods = [module for mod_name, module in sys.modules.items()
217
                           if mod_name[:str_len] == match_str]
218
        for module in reloadable_mods:
219
            new_logger = logging.getLogger(module.__name__)
220
            if hasattr(module, "LOG"):
221
                old_logger = module.LOG
222
                for handler in old_logger.handlers:
223
                    new_logger.addHandler(handler)
224
                for log_filter in old_logger.filters:
225
                    new_logger.addFilter(log_filter)
226
            module.LOG = new_logger
227
228
    @staticmethod
229
    def loggers():
230
        """List all logging Loggers.
231
232
        Return a list of Logger objects, with name and logging level.
233
        """
234
        # pylint: disable=no-member
235
        return [logging.getLogger(name)
236
                for name in logging.root.manager.loggerDict
237
                if "kytos" in name]
238
239
    def toggle_debug(self, name=None):
240
        """Enable/disable logging debug messages to a given logger name.
241
242
        If the name parameter is not specified the debug will be
243
        enabled/disabled following the initial config file. It will decide
244
        to enable/disable using the 'kytos' name to find the current
245
        logger level.
246
        Obs: To disable the debug the logging will be set to NOTSET
247
248
        Args:
249
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
250
        """
251
        # pylint: disable=no-member
252
        if name and name not in logging.root.manager.loggerDict:
253
            # A Logger name that is not declared in logging will raise an error
254
            # otherwise logging would create a new Logger.
255
            raise ValueError(f"Invalid logger name: {name}")
256
257
        if not name:
258
            # Logger name not specified.
259
            level = logging.getLogger('kytos').getEffectiveLevel()
260
            enable_debug = level != logging.DEBUG
261
262
            # Enable/disable default Loggers
263
            LogManager.load_config_file(self.options.logging, enable_debug)
264
            return
265
266
        # Get effective logger level for the name
267
        level = logging.getLogger(name).getEffectiveLevel()
268
        logger = logging.getLogger(name)
269
270
        if level == logging.DEBUG:
271
            # disable debug
272
            logger.setLevel(logging.NOTSET)
273
        else:
274
            # enable debug
275
            logger.setLevel(logging.DEBUG)
276
277
    async def start(self, restart=False):
        """Enable logs, run the pre-start steps and start the controller.

        Waits for the database and initializes APM when those options are
        set. The pidfile is created unless ``restart`` is true. Any failure
        ends the process through ``sys.exit`` with an explanatory message.

        Args:
            restart (bool): when true, the pidfile is not created again.
        """
        self.enable_logs()
        options = self.options
        # pylint: disable=broad-except
        try:
            if options.database:
                db_conn_wait(db_backend=options.database)
            if options.apm:
                self.apm = init_apm(options.apm, app=self.api_server.app)
            if not restart:
                self.create_pidfile()
            await self.start_controller()
        except (KytosDBInitException, KytosAPMInitException) as exc:
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
        except Exception as exc:
            trace = traceback.format_exc(chain=True)
            message = f"Kytos couldn't start because of {str(exc)} {trace}"
            sys.exit(self._try_to_fmt_traceback_msg(
                message, self._full_queue_counter()))
297
298
    def start_queue_monitors(self) -> None:
299
        """Start QueueMonitorWindows."""
300
        for data in self.options.event_buffer_monitors:
301
            for qmon in QueueMonitorWindow.from_buffer_config(self, **data):
302
                self.qmonitors.append(qmon)
303
        for data in self.options.thread_pool_queue_monitors:
304
            for qmon in QueueMonitorWindow.from_threadpool_config(**data):
305
                self.qmonitors.append(qmon)
306
        for qmonitor in self.qmonitors:
307
            self.log.info(f"Starting {qmonitor}...")
308
            qmonitor.start()
309
310
    def stop_queue_monitors(self) -> None:
311
        """Stop QueueMonitorWindows."""
312
        for qmonitor in self.qmonitors:
313
            self.log.info(f"Stopping {qmonitor}...")
314
            qmonitor.stop()
315
316
    def create_pidfile(self):
317
        """Create a pidfile."""
318
        pid = os.getpid()
319
320
        # Creates directory if it doesn't exist
321
        # System can erase /var/run's content
322
        pid_folder = Path(self.options.pidfile).parent
323
        self.log.info(pid_folder)
324
325
        # Pylint incorrectly infers Path objects
326
        # https://github.com/PyCQA/pylint/issues/224
327
        # pylint: disable=no-member
328
        if not pid_folder.exists():
329
            pid_folder.mkdir()
330
            pid_folder.chmod(0o1777)
331
        # pylint: enable=no-member
332
333
        # Make sure the file is deleted when controller stops
334
        atexit.register(Path(self.options.pidfile).unlink)
335
336
        # Checks if a pidfile exists. Creates a new file.
337
        try:
338
            pidfile = open(self.options.pidfile, mode='x', encoding="utf8")
339
        except OSError:
340
            # This happens if there is a pidfile already.
341
            # We shall check if the process that created the pidfile is still
342
            # running.
343
            try:
344
                existing_file = open(self.options.pidfile, mode='r',
345
                                     encoding="utf8")
346
                old_pid = int(existing_file.read())
347
                os.kill(old_pid, 0)
348
                # If kill() doesn't return an error, there's a process running
349
                # with the same PID. We assume it is Kytos and quit.
350
                # Otherwise, overwrite the file and proceed.
351
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
352
                             "running. Aborting.")
353
                sys.exit(error_msg.format(self.options.pidfile))
354
            except OSError:
355
                try:
356
                    pidfile = open(self.options.pidfile, mode='w',
357
                                   encoding="utf8")
358
                except OSError as exception:
359
                    error_msg = "Failed to create pidfile {}: {}."
360
                    sys.exit(error_msg.format(self.options.pidfile, exception))
361
362
        # Identifies the process that created the pidfile.
363
        pidfile.write(str(pid))
364
        pidfile.close()
365
366
    @property
367
    def buffers(self) -> KytosBuffers:
368
        """KytosBuffers exposed through this property to allow lazy
369
        async initiliation with an event loop running."""
370
        if not self._buffers:
371
            self._buffers = KytosBuffers()
372
        return self._buffers
373
374
    async def _wait_api_server_started(self):
375
        """Use this method to wait for Uvicorn server to start."""
376
        while not self.api_server.server.started:
377
            await asyncio.sleep(0.1)
378
379
    async def start_controller(self):
        """Start the controller services.

        In order: creates the buffers if needed, builds the KytosServer,
        starts the API server task and waits for it, starts the TCP server,
        starts auth, loads the NApps and the web UI, creates one task per
        event buffer handler, starts the queue monitors and records
        ``started_at``.
        """
        def spawn(coroutine):
            """Schedule a coroutine on the loop and keep track of its task."""
            task = self.loop.create_task(coroutine)
            self._tasks.append(task)

        self.log.info("Starting Kytos - Kytos Controller")
        if not self._buffers:
            self._buffers = KytosBuffers()
        address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(address,
                                  KytosServerProtocol,
                                  self,
                                  self.options.protocol_name)

        self.log.info("Starting API server")
        spawn(self.api_server.serve())
        await self._wait_api_server_started()

        self.log.info(f"Starting TCP server: {self.server}")
        self.server.serve_forever()

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.
        # sys.exit(error_msg.format(thread, exception))

        self.log.info("Starting authorization.")
        self.start_auth()
        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()
        self.api_server.start_web_ui()

        # Start task handlers consumers after NApps to potentially
        # avoid discarding an event if a consumer isn't ready yet
        for buffer_name in ("conn", "raw", "msg_in"):
            spawn(self.event_handler(buffer_name))
        spawn(self.msg_out_event_handler())
        for buffer_name in ("app", "meta"):
            spawn(self.event_handler(buffer_name))

        self.start_queue_monitors()
        self.started_at = now()
432
433
    def _register_endpoints(self):
434
        """Register all rest endpoint served by kytos.
435
436
        -   Register APIServer endpoints
437
        -   Register WebUI endpoints
438
        -   Register ``/api/kytos/core/config`` endpoint
439
        """
440
        self.api_server.start_api()
441
        # Register controller endpoints as /api/kytos/core/...
442
        self.api_server.register_core_endpoint('config/',
443
                                               self.configuration_endpoint)
444
        self.api_server.register_core_endpoint('metadata/',
445
                                               Controller.metadata_endpoint)
446
        self.api_server.register_core_endpoint(
447
            'reload/{username}/{napp_name}/',
448
            self.rest_reload_napp)
449
        self.api_server.register_core_endpoint('reload/all',
450
                                               self.rest_reload_all_napps)
451
        self.dead_letter.register_endpoints()
452
453
    def register_rest_endpoint(self, url, function, methods):
454
        """Deprecate in favor of @rest decorator."""
455
        self.api_server.register_rest_endpoint(url, function, methods)
456
457
    @classmethod
    def metadata_endpoint(cls, _request: Request) -> JSONResponse:
        """Return the Kytos metadata.

        Reads ``metadata.py`` from this module's directory and collects
        every ``__name__ = 'value'`` assignment found in it.

        Returns:
            JSONResponse: Json with current kytos metadata.

        """
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
        with open(meta_path, encoding="utf8") as file:
            pairs = re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", file.read())
        return JSONResponse(dict(pairs))
470
471
    def configuration_endpoint(self, _request: Request) -> JSONResponse:
        """Return the configuration options used by Kytos.

        Returns:
            JSONResponse: Json with the options that
                ``KytosConfig.options_exposed`` selects.

        """
        exposed = KytosConfig.options_exposed(self.options.__dict__)
        return JSONResponse(exposed)
479
480
    def restart(self, graceful=True):
481
        """Restart Kytos SDN Controller.
482
483
        Args:
484
            graceful(bool): Represents the way that Kytos will restart.
485
        """
486
        if self.started_at is not None:
487
            self.stop(graceful)
488
            self.__init__(self.options)
489
490
        self.start(restart=True)
491
492
    def stop(self, graceful=True):
493
        """Shutdown all services used by kytos.
494
495
        This method should:
496
            - stop all Websockets
497
            - stop the API Server
498
            - stop the Controller
499
        """
500
        if self.started_at:
501
            self.stop_controller(graceful)
502
503
    def stop_controller(self, graceful=True):
        """Stop the controller.

        This method should:
            - announce on the network that the controller will shutdown;
            - stop receiving incoming packages;
            - call the 'shutdown' method of each KytosNApp that is running;
            - finish reading the events on all buffers;
            - stop each running handler;
            - stop all running threads;
            - stop the KytosServer;

        Args:
            graceful(bool): passed as ``wait`` to each executor shutdown.
        """
        self.log.info("Stopping Kytos")

        # Tell the buffers to stop and stop watching the NApps directory
        # before anything else is torn down.
        self.buffers.send_stop_signal()
        self.napp_dir_listener.stop()

        for pool_name in executors:
            self.log.info("Stopping threadpool: %s", pool_name)

        threads = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(threads), threads)

        # Pending futures are cancelled; ``graceful`` decides whether to
        # wait for the running ones.
        for executor_pool in executors.values():
            executor_pool.shutdown(wait=graceful, cancel_futures=True)

        # self.server.socket.shutdown()
        # self.server.socket.close()

        # From here on the controller reports itself as stopped.
        self.started_at = None
        self.unload_napps()

        # Cancel all async tasks (event handlers and servers)
        for task in self._tasks:
            task.cancel()

        # ASYNC TODO: close connections
        # self.server.server_close()

        self.stop_queue_monitors()
        if self.apm:
            self.log.info("Stopping APM server...")
            self.apm.close()
            self.log.info("Stopped APM Server")
        self.log.info("Stopping API Server...")
        self.api_server.stop()
        self.log.info("Stopped API Server")
        self.log.info("Stopping TCP Server...")
        self.server.shutdown()
        self.log.info("Stopped TCP Server")
        # Stop the event loop last, after every service has been shut down.
        self.loop.stop()
555
556
    def status(self):
557
        """Return status of Kytos Server.
558
559
        If the controller kytos is running this method will be returned
560
        "Running since 'Started_At'", otherwise "Stopped".
561
562
        Returns:
563
            string: String with kytos status.
564
565
        """
566
        if self.started_at:
567
            return f"Running since {self.started_at}"
568
        return "Stopped"
569
570
    def uptime(self):
571
        """Return the uptime of kytos server.
572
573
        This method should return:
574
            - 0 if Kytos Server is stopped.
575
            - (kytos.start_at - datetime.now) if Kytos Server is running.
576
577
        Returns:
578
           datetime.timedelta: The uptime interval.
579
580
        """
581
        return now() - self.started_at if self.started_at else 0
582
583
    def notify_listeners(self, event):
584
        """Send the event to the specified listeners.
585
586
        Loops over self.events_listeners matching (by regexp) the attribute
587
        name of the event with the keys of events_listeners. If a match occurs,
588
        then send the event to each registered listener.
589
590
        Args:
591
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
592
        """
593
594
        self.log.debug("looking for listeners for %s", event)
595
        for event_regex, listeners in dict(self.events_listeners).items():
596
            # self.log.debug("listeners found for %s: %r => %s", event,
597
            #                event_regex, [l.__qualname__ for l in listeners])
598
            # Do not match if the event has more characters
599
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
600
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
601
                event_regex += '$'
602
            if re.match(event_regex, event.name):
603
                self.log.debug('Calling listeners for %s', event)
604
                for listener in listeners:
605
                    if asyncio.iscoroutinefunction(listener):
606
                        task = asyncio.create_task(listener(event))
607
                        self._alisten_tasks.add(task)
608
                        task.add_done_callback(self._alisten_tasks.discard)
609
                    else:
610
                        listener(event)
611
612
    # pylint: disable=broad-exception-caught
613
    async def event_handler(self, buffer_name: str):
614
        """Default event handler that gets from an event buffer."""
615
        event_buffer = getattr(self.buffers, buffer_name)
616
        self.log.info(f"Event handler {buffer_name} started")
617
        while True:
618
            try:
619
                # After hitting issues with a large amount of flows being
620
                # installed (16k which sends 32k events), this task was
621
                # hogging resources in the MainThread causing disconnections
622
                # and socket exceptions. With the following sleep, this task
623
                # yields to other loops mitigating the disconnection issues.
624
                # Now the cap for flow installation at the same time is 50k.
625
                await asyncio.sleep(0)
626
                event = await event_buffer.aget()
627
                self.notify_listeners(event)
628
629
                if event.name == "kytos/core.shutdown":
630
                    self.log.debug(f"Event handler {buffer_name} stopped")
631
                    break
632
            except Exception as exc:
633
                self.log.exception(f"Unhandled exception on {buffer_name}",
634
                                   exc_info=exc)
635
636
    async def publish_connection_error(self, event):
637
        """Publish connection error event.
638
639
        Args:
640
            event (KytosEvent): Event that triggered for error propagation.
641
        """
642
        event.name = \
643
            f"kytos/core.{event.destination.protocol.name}.connection.error"
644
        error_msg = f"Connection state: {event.destination.state}"
645
        event.content["exception"] = error_msg
646
        await self.buffers.conn.aput(event)
647
648
    # pylint: disable=broad-exception-caught
649
    async def msg_out_event_handler(self):
        """Handle msg_out events.

        Listen to the msg_out buffer, pack and send each message to its
        destination connection, then notify the corresponding listeners.
        The loop ends when the "kytos/core.shutdown" event is read.
        """
        self.log.info("Event handler msg_out started")
        while True:
            try:
                triggered_event = await self.buffers.msg_out.aget()

                if triggered_event.name == "kytos/core.shutdown":
                    self.log.debug("Message Out Event handler stopped")
                    break

                message = triggered_event.content['message']
                destination = triggered_event.destination
                # Events without a destination, or whose connection is
                # already finished, are dropped without notifying listeners.
                if (destination and
                        not destination.state == ConnectionState.FINISHED):
                    packet = message.pack()
                    destination.send(packet)
                    self.log.debug('Connection %s: OUT OFP, '
                                   'version: %s, type: %s, xid: %s - %s',
                                   destination.id,
                                   message.header.version,
                                   message.header.message_type,
                                   message.header.xid,
                                   packet.hex())
                    self.notify_listeners(triggered_event)
            except OSError:
                # NOTE(review): if ``aget`` itself raised OSError on the
                # first iteration, ``triggered_event`` would be unbound
                # here; on later iterations it would still hold the
                # previous event. Confirm whether ``aget`` can raise.
                await self.publish_connection_error(triggered_event)
                self.log.info("connection closed. Cannot send message")
            except PackException as err:
                self.log.error(
                    f"Discarding message: {message}, event: {triggered_event} "
                    f"because of PackException {err}"
                )
            except Exception as exc:
                # Log and keep consuming.
                self.log.exception("Unhandled exception on msg_out",
                                   exc_info=exc)
689
690
    def get_interface_by_id(self, interface_id):
691
        """Find a Interface  with interface_id.
692
693
        Args:
694
            interface_id(str): Interface Identifier.
695
696
        Returns:
697
            Interface: Instance of Interface with the id given.
698
699
        """
700
        if interface_id is None:
701
            return None
702
703
        switch_id = ":".join(interface_id.split(":")[:-1])
704
        interface_number = int(interface_id.split(":")[-1])
705
706
        switch = self.switches.get(switch_id)
707
708
        if not switch:
709
            return None
710
711
        return switch.interfaces.get(interface_number, None)
712
713
    def get_switch_by_dpid(self, dpid):
714
        """Return a specific switch by dpid.
715
716
        Args:
717
            dpid (|DPID|): dpid object used to identify a switch.
718
719
        Returns:
720
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
721
722
        """
723
        return self.switches.get(dpid)
724
725
    def get_switch_or_create(self, dpid, connection=None):
        """Return the switch for a dpid, creating it when unknown.

        Runs under the switches lock. A 'kytos/core.switch.new' or
        'kytos/core.switch.reconnected' event is put on the ``conn`` buffer.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. When given, it is stored and set
                on the switch; a different previous connection is removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.

        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)

            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                suffix = 'new'
            else:
                suffix = 'reconnected'
            event = KytosEvent(name='kytos/core.switch.' + suffix,
                               content={'switch': switch})

            if connection:
                previous = switch.connection
                switch.update_connection(connection)

                if previous is not connection:
                    self.remove_connection(previous)

            self.buffers.conn.put(event)

            return switch
763
764
    def create_or_update_connection(self, connection):
        """Store a connection on the controller, keyed by its id.

        An entry already stored under the same id is replaced.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be stored.
        """
        conn_id = connection.id
        self.connections[conn_id] = connection
772
773
    def get_connection_by_id(self, conn_id):
        """Look up a stored connection by its id.

        Args:
            conn_id: id of the connection, as used as key in
                ``self.connections``.

        Returns:
            :class:`~kytos.core.connection.Connection`: The connection
                stored under ``conn_id``, or None if there is none.

        """
        stored_connections = self.connections
        return stored_connections.get(conn_id)
785
786
    def remove_connection(self, connection):
        """Close a connection and drop it from the controller.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be removed. May be None.

        Returns:
            bool: True if the connection was closed and removed from
                ``self.connections``; False if ``connection`` is None or
                a KeyError was raised (e.g. its id was not stored).
        """
        if connection is None:
            return False

        removed = True
        try:
            # The connection is closed even if it turns out not to be stored.
            connection.close()
            del self.connections[connection.id]
        except KeyError:
            removed = False
        return removed
802
803
    def remove_switch(self, switch):
        """Drop a switch from the controller.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Instance of switch that will be removed.

        Returns:
            bool: True if an entry for ``switch.dpid`` was deleted from
                ``self.switches``, False if there was no such entry.
        """
        try:
            del self.switches[switch.dpid]
        except KeyError:
            return False
        else:
            return True
815
816
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method reads the new connection from the event source and
        stores it in the connections attribute of the controller.

        If a different connection object is already stored under the same
        id, it is closed and removed first, since the new connection
        replaces it on the same ip:port.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", connection)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() expects the connection object itself (it calls
        # close() on it), not its id. The identity check avoids closing the
        # very connection that is about to be registered.
        old_connection = self.get_connection_by_id(connection.id)
        if old_connection is not None and old_connection is not connection:
            self.remove_connection(old_connection)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
841
842
    def add_new_switch(self, switch):
        """Register a switch on the controller, keyed by its dpid.

        Args:
            switch (Switch): A Switch object
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
849
850
    def _import_napp(self, username, napp_name):
        """Import a NApp module.

        The module is loaded from ``<options.napps>/<username>/<napp_name>/
        main.py`` and registered in ``sys.modules`` as
        ``napps.<username>.<napp_name>.main``.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be imported.

        Returns:
            module: The executed NApp ``main`` module.

        Raises:
            FileNotFoundError: if NApp's main.py is not found.
            ModuleNotFoundError: if any NApp requirement is not installed.

        """
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
        path = os.path.join(self.options.napps, username, napp_name,
                            'main.py')
        napp_spec = spec_from_file_location(mod_name, path)
        napp_module = module_from_spec(napp_spec)
        # Registered before execution so the module can be found by name
        # while its own code is running.
        sys.modules[napp_spec.name] = napp_module
        try:
            napp_spec.loader.exec_module(napp_module)
        except BaseException:
            # Do not leave a half-initialised module behind: a later
            # import by name would silently return it.
            sys.modules.pop(napp_spec.name, None)
            raise
        return napp_module
866
867
    def load_napp(self, username, napp_name):
        """Load a single NApp.

        Nothing is done (besides logging) when the NApp is already in
        ``self.napps`` or when importing its module raises
        ModuleNotFoundError or FileNotFoundError. Otherwise the module's
        ``Main`` class is instantiated, the NApp is stored and started, its
        endpoints are authenticated and registered on the API server and
        its listeners are appended to ``self.events_listeners``.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.

        Raises:
            KytosNAppSetupException: if instantiating the NApp's ``Main``
                class raises any exception.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            main_module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return
        except FileNotFoundError as err:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                err.filename,
            )
            return

        try:
            napp = main_module.Main(controller=self)
        except Exception as exc:  # noqa pylint: disable=bare-except
            raise KytosNAppSetupException(
                f"NApp {username}/{napp_name} exception {str(exc)} "
            ) from exc

        self.napps[napp_key] = napp

        napp.start()
        api_server = self.api_server
        api_server.authenticate_endpoints(napp)
        api_server.register_napp_endpoints(napp)

        # pylint: disable=protected-access
        for event_name, napp_listeners in napp._listeners.items():
            registered = self.events_listeners.setdefault(event_name, [])
            registered.extend(napp_listeners)
        # pylint: enable=protected-access
906
907
    def pre_install_napps(self, napps, enable=True):
        """Install the given NApps that are not installed yet.

        Each requested NApp is compared against the string form of the
        NApps reported as installed by the NApps manager; only the missing
        ones are installed.

        Args:
            napps ([str]): List of NApps to be pre-installed.
            enable (bool): Forwarded to the NApps manager ``install`` call.
        """
        installed = [
            str(napp) for napp in self.napps_manager.get_installed_napps()
        ]
        for napp in napps:
            if napp in installed:
                continue
            self.napps_manager.install(napp, enable=enable)
920
921
    def load_napps(self):
        """Load every NApp reported as enabled by the NApps manager.

        Raises:
            KytosNAppSetupException: if loading a NApp raises
                FileNotFoundError.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for napp in enabled_napps:
            try:
                self.log.info("Loading NApp %s", napp.id)
                self.load_napp(napp.username, napp.name)
            except FileNotFoundError as err:
                self.log.error("Could not load NApp %s: %s", napp.id, err)
                raise KytosNAppSetupException(
                    f"NApp {napp.id} exception {str(err)}"
                ) from err
932
933
    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        The NApp is popped from ``self.napps``; when it was not loaded only
        a warning is logged. Otherwise its shutdown listener is called, its
        REST endpoints are removed and its listeners are dropped from
        ``self.events_listeners``.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)
        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        napp_id = NApp(username, napp_name).id
        shutdown_event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
        # The first listener registered for the shutdown event is called
        # before the NApp's listeners are removed from events_listeners.
        shutdown_listener = self.events_listeners[shutdown_event.name][0]
        shutdown_listener(shutdown_event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Removing listeners from that napp
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            remaining = self.events_listeners[event_type]
            for listener in napp_listeners:
                remaining.remove(listener)
            # Drop event types that no longer have any listener.
            if not remaining:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access
964
965
    def unload_napps(self):
        """Unload every NApp reported as enabled by the NApps manager.

        NApps are unloaded in the reverse of the order in which the NApps
        manager returns them, to facilitate a graceful shutdown.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for napp in reversed(enabled_napps):
            self.unload_napp(napp.username, napp.name)
973
974
    def reload_napp_module(self, username, napp_name, napp_file):
        """Reload the ``napps.<username>.<napp_name>.<napp_file>`` module.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp.
            napp_file (str): Name of the NApp module to reload.

        Raises:
            ModuleNotFoundError: if the module cannot be imported.
            ImportError: if reloading the module fails.
        """
        mod_name = '.'.join(('napps', username, napp_name, napp_file))
        try:
            current_module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise

        try:
            reload_module(current_module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise
988
989
    def reload_napp(self, username, napp_name):
        """Reload a NApp.

        The NApp is unloaded, its ``settings`` and ``main`` modules are
        reloaded (in that order) and the NApp is loaded again.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be reloaded.

        Returns:
            int: 200 on success, 400 if reloading a module raised
                ImportError (ModuleNotFoundError included); in that case
                the NApp is not loaded again.
        """
        self.unload_napp(username, napp_name)
        try:
            for napp_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, napp_file)
        except ImportError:  # also covers ModuleNotFoundError
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
1001
1002
    def rest_reload_napp(self, request: Request) -> JSONResponse:
        """Reload the NApp named by the request path parameters.

        The ``username`` and ``napp_name`` path parameters select the NApp.

        Returns:
            JSONResponse: ``'reloaded'`` when ``reload_napp`` returns 200,
                otherwise ``'fail to reload'`` with the returned value as
                status code.
        """
        path_params = request.path_params
        username = path_params["username"]
        napp_name = path_params["napp_name"]
        status = self.reload_napp(username, napp_name)
        if status != 200:
            return JSONResponse('fail to reload', status_code=status)
        return JSONResponse('reloaded')
1010
1011
    def rest_reload_all_napps(self, _request: Request) -> JSONResponse:
        """Request reload all NApps.

        Every NApp currently in ``self.napps`` is reloaded; the status
        returned by each ``reload_napp`` call is not inspected.

        Returns:
            JSONResponse: always ``'reloaded'``.
        """
        # Iterate over a snapshot of the keys: reload_napp() unloads and
        # loads the NApp again, which removes and re-adds entries of
        # self.napps and would break a live iteration over the dict.
        for username, napp_name in list(self.napps):
            self.reload_napp(username, napp_name)
        return JSONResponse('reloaded')
1016
1017
    def _full_queue_counter(self) -> Counter:
        """Generate full queue stats counter.

        Only buffers that report being full are inspected. Their events are
        taken out with ``get()`` while being counted, so those buffers are
        left empty.

        Returns:
            A ``defaultdict`` mapping each full buffer's name to a Counter
            of the names of the events it held.
        """
        per_buffer = defaultdict(Counter)
        for queue in self.buffers.get_all_buffers():
            if queue.full():
                while not queue.empty():
                    event_name = queue.get().name
                    per_buffer[queue.name][event_name] += 1
        return per_buffer
1027
1028
    def _try_to_fmt_traceback_msg(self, message: str, counter: Counter) -> str:
        """Append the full-buffer counters to a traceback message.

        Args:
            message (str): Message to be extended.
            counter: Counters to append, rendered as a plain dict.

        Returns:
            str: ``message`` unchanged when ``counter`` is empty (falsy),
                otherwise ``message`` followed by a line with the counters.
        """
        if not counter:
            return message
        counters = dict(counter)
        return f"{message}\nFull KytosEventBuffers counters: {counters}"
1036
1037
    def get_link_or_create(
        self,
        endpoint_a: Interface,
        endpoint_b: Interface,
        link_dict: Optional[dict] = None,
    ) -> tuple[Link, bool]:
        """Get an existing link or create a new one.

        A Link is built from both endpoints while holding ``links_lock``.
        If its id is already in ``self.links`` and no mismatch is detected
        for it, the stored link is returned. Otherwise the new link is
        stored (unless its id is already present), set on both endpoints,
        both endpoints are flagged as NNI and ``link_dict`` is applied.

        Args:
            endpoint_a (Interface): One endpoint of the link.
            endpoint_b (Interface): The other endpoint of the link.
            link_dict (dict): Optional link attributes. When not empty it
                must have an ``enabled`` key (the link is enabled or
                disabled accordingly) and may have a ``metadata`` key used
                to extend the link metadata.

        Returns:
            Tuple(Link, bool): Link and a boolean whether it has been created.
        """
        with self.links_lock:
            new_link = Link(endpoint_a, endpoint_b)

            # A link that is already known is only reused when it is not
            # mismatched; a mismatched one is treated as a new link.
            already_known = new_link.id in self.links
            if already_known and not self.detect_mismatched_link(new_link):
                return (self.links[new_link.id], False)

            with new_link.link_lock:
                # An endpoint still pointing to a different link means that
                # link is a leftover; the leftover interface is that old
                # link's other endpoint. It is only reported here.
                if endpoint_a.link and endpoint_a.link != new_link:
                    old_link = endpoint_a.link
                    if old_link.endpoint_a != endpoint_a:
                        leftover_interface = old_link.endpoint_a
                    else:
                        leftover_interface = old_link.endpoint_b
                    self.log.warning(f"Leftover mismatched link"
                                     f" {endpoint_a.link} in interface"
                                     f" {leftover_interface}")

                if endpoint_b.link and endpoint_b.link != new_link:
                    old_link = endpoint_b.link
                    if old_link.endpoint_b != endpoint_b:
                        leftover_interface = old_link.endpoint_b
                    else:
                        leftover_interface = old_link.endpoint_a
                    self.log.warning(f"Leftover mismatched link "
                                     f" {endpoint_b.link} in interface"
                                     f" {leftover_interface}")

                if new_link.id not in self.links:
                    self.links[new_link.id] = new_link

                endpoint_a.update_link(new_link)
                endpoint_b.update_link(new_link)
                new_link.endpoint_a = endpoint_a
                new_link.endpoint_b = endpoint_b
                endpoint_a.nni = True
                endpoint_b.nni = True

                if link_dict:
                    if link_dict['enabled']:
                        new_link.enable()
                    else:
                        new_link.disable()

                    metadata = link_dict.get("metadata")
                    if metadata:
                        new_link.extend_metadata(metadata)

        return (self.links[new_link.id], True)
1099
1100
    def get_link(self, link_id: str) -> Optional[Link]:
        """Look up a link by its ID.

        Args:
            link_id (str): ID of the link, as used as key in ``self.links``.

        Returns:
            Optional[Link]: Link if found, None otherwise.
        """
        known_links = self.links
        return known_links.get(link_id)
1107
1108
    def get_links_from_interfaces(
        self,
        interfaces: Iterable[Interface]
    ) -> dict[str, Link]:
        """Get the links that have any of the given interfaces as endpoint.

        Args:
            interfaces (Iterable[Interface]): Interfaces to be matched, by
                id, against both endpoints of every link. The iterable is
                consumed only once, so one-shot iterables such as
                generators are supported.

        Returns:
            dict[str, Link]: Matching links keyed by link id.
        """
        # Collect the ids up front: the iterable is walked a single time
        # (a generator would otherwise be exhausted after the first link)
        # and each endpoint check becomes a set lookup.
        interface_ids = {interface.id for interface in interfaces}
        with self.links_lock:
            return {
                link.id: link
                for link in self.links.copy().values()
                if link.endpoint_a.id in interface_ids
                or link.endpoint_b.id in interface_ids
            }
1123
1124
    @staticmethod
1125
    def detect_mismatched_link(link: Link) -> frozenset[str]:
1126
        """Check if a link is mismatched."""
1127
        if (link.endpoint_a.link and link.endpoint_b
1128
                and link.endpoint_a.link == link.endpoint_b.link):
1129
            return frozenset()
1130
        return frozenset(["mismatched_link"])
1131
1132
    def link_status_mismatched(self, link: Link) -> Optional[EntityStatus]:
        """Map the mismatch check of a link to an entity status.

        Args:
            link (Link): Link to be checked with ``detect_mismatched_link``.

        Returns:
            Optional[EntityStatus]: ``EntityStatus.DOWN`` when a mismatch
                is detected, None otherwise.
        """
        mismatch_tags = self.detect_mismatched_link(link)
        return EntityStatus.DOWN if mismatch_tags else None
1137