Passed
Pull Request — master (#600)
by Aldo
04:48
created

kytos.core.controller.Controller.create_pidfile()   B

Complexity

Conditions 5

Size

Total Lines 49
Code Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 27
nop 1
dl 0
loc 49
rs 8.7653
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16
import asyncio
17
import atexit
18
import importlib
19
import logging
20
import os
21
import re
22
import sys
23
import threading
24
import traceback
25
from asyncio import AbstractEventLoop
26
from collections import Counter, defaultdict
27
from importlib import import_module
28
from importlib import reload as reload_module
29
from importlib.util import module_from_spec, spec_from_file_location
30
from pathlib import Path
31
from typing import Iterable, Optional
32
33
from pyof.foundation.exceptions import PackException
34
35
from kytos.core.api_server import APIServer, JSONResponse, Request
36
from kytos.core.apm import init_apm
37
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
38
from kytos.core.auth import Auth
39
from kytos.core.buffers import KytosBuffers
40
from kytos.core.common import EntityStatus
41
from kytos.core.config import KytosConfig
42
from kytos.core.connection import Connection, ConnectionState
43
from kytos.core.db import db_conn_wait
44
from kytos.core.dead_letter import DeadLetter
45
from kytos.core.events import KytosEvent
46
from kytos.core.exceptions import (KytosAPMInitException, KytosDBInitException,
47
                                   KytosNAppSetupException)
48
from kytos.core.helpers import executors, now
49
from kytos.core.interface import Interface
50
from kytos.core.link import Link
51
from kytos.core.logs import LogManager
52
from kytos.core.napps.base import NApp
53
from kytos.core.napps.manager import NAppsManager
54
from kytos.core.napps.napp_dir_listener import NAppDirListener
55
from kytos.core.pacing import Pacer
56
from kytos.core.queue_monitor import QueueMonitorWindow
57
from kytos.core.switch import Switch
58
59
__all__ = ('Controller',)
60
61
62
def exc_handler(_, exc, __):
63
    """Log uncaught exceptions.
64
65
    Args:
66
        exc_type (ignored): exception type
67
        exc: exception instance
68
        tb (ignored): traceback
69
    """
70
    logging.basicConfig(filename='errlog.log',
71
                        format='%(asctime)s:%(pathname)'
72
                        's:%(levelname)s:%(message)s')
73
    logging.exception('Uncaught Exception: %s', exc)
74
75
76
class Controller:
77
    """Main class of Kytos.
78
79
    The main responsibilities of this class are:
80
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
81
        - manage KytosNApps (install, load and unload);
82
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
83
        - manage which event should be sent to NApps methods;
84
        - manage the buffers handlers, considering one thread per handler.
85
    """
86
87
    # Created issue #568 for the disabled checks.
88
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
89
    # pylint: disable=consider-using-with,unnecessary-dunder-call
90
    # pylint: disable=too-many-lines
91
    def __init__(self, options=None, loop: AbstractEventLoop = None):
92
        """Init method of Controller class takes the parameters below.
93
94
        Args:
95
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
96
                instance of :class:`~kytos.core.config.KytosConfig` class.
97
            loop asyncio.AbstractEventLoop
98
        """
99
        if options is None:
100
            options = KytosConfig().options['daemon']
101
102
        self.loop = loop
103
104
        # asyncio tasks
105
        self._tasks: list[asyncio.Task] = []
106
107
        #: KytosBuffers: KytosBuffer object with Controller buffers
108
        self._buffers: KytosBuffers = None
109
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
110
        #:
111
        #: This dict stores all connections between the controller and the
112
        #: switches. The key for this dict is a tuple (ip, port). The content
113
        #: is a Connection
114
        self.connections: dict[tuple, Connection] = {}
115
        #: dict: mapping of events and event listeners.
116
        #:
117
        #: The key of the dict is a KytosEvent (or a string that represent a
118
        #: regex to match against KytosEvents) and the value is a list of
119
        #: methods that will receive the referenced event
120
        self.events_listeners = {'kytos/core.connection.new':
121
                                 [self.new_connection]}
122
123
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
124
        #:
125
        #: The key is the napp name (string), while the value is the napp
126
        #: instance itself.
127
        self.napps = {}
128
        #: Object generated by ParseArgs on config.py file
129
        self.options = options
130
        #: KytosServer: Instance of KytosServer that will be listening to TCP
131
        #: connections.
132
        self.server = None
133
        #: dict: Current existing switches.
134
        #:
135
        #: The key is the switch dpid, while the value is a Switch object.
136
        self.switches: dict[str, Switch] = {}
137
        self._switches_lock = threading.Lock()
138
139
        #: datetime.datetime: Time when the controller finished starting.
140
        self.started_at = None
141
142
        #: logging.Logger: Logger instance used by Kytos.
143
        self.log = None
144
145
        #: Observer that handle NApps when they are enabled or disabled.
146
        self.napp_dir_listener = NAppDirListener(self)
147
148
        self.napps_manager = NAppsManager(self)
149
150
        #: API Server used to expose rest endpoints.
151
        self.api_server = APIServer(self.options.listen,
152
                                    self.options.api_port,
153
                                    self.napps_manager,
154
                                    self.options.napps,
155
                                    self.options.api_traceback_on_500)
156
157
        self.auth = None
158
        self.dead_letter = DeadLetter(self)
159
        self._alisten_tasks = set()
160
        self.qmonitors: list[QueueMonitorWindow] = []
161
162
        #: APM client in memory to be closed when necessary
163
        self.apm = None
164
165
        self._register_endpoints()
166
        #: Adding the napps 'enabled' directory into the PATH
167
        #: Now you can access the enabled napps with:
168
        #: from napps.<username>.<napp_name> import ?....
169
        sys.path.append(os.path.join(self.options.napps, os.pardir))
170
        sys.excepthook = exc_handler
171
172
        #: Pacer for controlling the rate which actions can be executed
173
        self.pacer = Pacer("memory://")
174
175
        self.links: dict[str, Link] = {}
176
        self.links_lock = threading.Lock()
177
        Link.register_status_reason_func("controller_mismatched_reason",
178
                                         self.detect_mismatched_link)
179
        Link.register_status_func("controller_mismatched_status",
180
                                  self.link_status_mismatched)
181
182
    def start_auth(self):
183
        """Initialize Auth() and its services"""
184
        self.auth = Auth(self)
185
        self.auth.register_core_auth_services()
186
187
    def enable_logs(self):
188
        """Register kytos log and enable the logs."""
189
        decorators = self.options.logger_decorators
190
        try:
191
            decorators = [self._resolve(deco) for deco in decorators]
192
        except ModuleNotFoundError as err:
193
            sys.exit(f'Failed to resolve decorator module: {err.name}')
194
        except AttributeError:
195
            sys.exit(f'Failed to resolve decorator name: {decorators}')
196
        LogManager.decorate_logger_class(*decorators)
197
        LogManager.load_config_file(self.options.logging, self.options.debug)
198
        # pylint: disable=fixme
199
        # TODO issue 371
200
        # LogManager.enable_websocket(self.api_server.server)
201
        self.log = logging.getLogger(__name__)
202
        self._patch_core_loggers()
203
204
    @staticmethod
205
    def _resolve(name):
206
        """Resolve a dotted name to a global object."""
207
        mod, _, attr = name.rpartition('.')
208
        mod = importlib.import_module(mod)
209
        return getattr(mod, attr)
210
211
    @staticmethod
212
    def _patch_core_loggers():
213
        """Patch in updated loggers to 'kytos.core.*' modules"""
214
        match_str = 'kytos.core.'
215
        str_len = len(match_str)
216
        reloadable_mods = [module for mod_name, module in sys.modules.items()
217
                           if mod_name[:str_len] == match_str]
218
        for module in reloadable_mods:
219
            new_logger = logging.getLogger(module.__name__)
220
            if hasattr(module, "LOG"):
221
                old_logger = module.LOG
222
                for handler in old_logger.handlers:
223
                    new_logger.addHandler(handler)
224
                for log_filter in old_logger.filters:
225
                    new_logger.addFilter(log_filter)
226
            module.LOG = new_logger
227
228
    @staticmethod
229
    def loggers():
230
        """List all logging Loggers.
231
232
        Return a list of Logger objects, with name and logging level.
233
        """
234
        # pylint: disable=no-member
235
        return [logging.getLogger(name)
236
                for name in logging.root.manager.loggerDict
237
                if "kytos" in name]
238
239
    def toggle_debug(self, name=None):
240
        """Enable/disable logging debug messages to a given logger name.
241
242
        If the name parameter is not specified the debug will be
243
        enabled/disabled following the initial config file. It will decide
244
        to enable/disable using the 'kytos' name to find the current
245
        logger level.
246
        Obs: To disable the debug the logging will be set to NOTSET
247
248
        Args:
249
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
250
        """
251
        # pylint: disable=no-member
252
        if name and name not in logging.root.manager.loggerDict:
253
            # A Logger name that is not declared in logging will raise an error
254
            # otherwise logging would create a new Logger.
255
            raise ValueError(f"Invalid logger name: {name}")
256
257
        if not name:
258
            # Logger name not specified.
259
            level = logging.getLogger('kytos').getEffectiveLevel()
260
            enable_debug = level != logging.DEBUG
261
262
            # Enable/disable default Loggers
263
            LogManager.load_config_file(self.options.logging, enable_debug)
264
            return
265
266
        # Get effective logger level for the name
267
        level = logging.getLogger(name).getEffectiveLevel()
268
        logger = logging.getLogger(name)
269
270
        if level == logging.DEBUG:
271
            # disable debug
272
            logger.setLevel(logging.NOTSET)
273
        else:
274
            # enable debug
275
            logger.setLevel(logging.DEBUG)
276
277
    async def start(self, restart=False):
278
        """Create pidfile and call start_controller method."""
279
        self.enable_logs()
280
        # pylint: disable=broad-except
281
        try:
282
            if self.options.database:
283
                db_conn_wait(db_backend=self.options.database)
284
            if self.options.apm:
285
                self.apm = init_apm(self.options.apm, app=self.api_server.app)
286
            if not restart:
287
                self.create_pidfile()
288
            await self.start_controller()
289
        except (KytosDBInitException, KytosAPMInitException) as exc:
290
            message = f"Kytos couldn't start because of {str(exc)}"
291
            sys.exit(message)
292
        except Exception as exc:
293
            exc_fmt = traceback.format_exc(chain=True)
294
            message = f"Kytos couldn't start because of {str(exc)} {exc_fmt}"
295
            counter = self._full_queue_counter()
296
            sys.exit(self._try_to_fmt_traceback_msg(message, counter))
297
298
    def start_queue_monitors(self) -> None:
299
        """Start QueueMonitorWindows."""
300
        for data in self.options.event_buffer_monitors:
301
            for qmon in QueueMonitorWindow.from_buffer_config(self, **data):
302
                self.qmonitors.append(qmon)
303
        for data in self.options.thread_pool_queue_monitors:
304
            for qmon in QueueMonitorWindow.from_threadpool_config(**data):
305
                self.qmonitors.append(qmon)
306
        for qmonitor in self.qmonitors:
307
            self.log.info(f"Starting {qmonitor}...")
308
            qmonitor.start()
309
310
    def stop_queue_monitors(self) -> None:
311
        """Stop QueueMonitorWindows."""
312
        for qmonitor in self.qmonitors:
313
            self.log.info(f"Stopping {qmonitor}...")
314
            qmonitor.stop()
315
316
    def create_pidfile(self):
317
        """Create a pidfile."""
318
        pid = os.getpid()
319
320
        # Creates directory if it doesn't exist
321
        # System can erase /var/run's content
322
        pid_folder = Path(self.options.pidfile).parent
323
        self.log.info(pid_folder)
324
325
        # Pylint incorrectly infers Path objects
326
        # https://github.com/PyCQA/pylint/issues/224
327
        # pylint: disable=no-member
328
        if not pid_folder.exists():
329
            pid_folder.mkdir()
330
            pid_folder.chmod(0o1777)
331
        # pylint: enable=no-member
332
333
        # Make sure the file is deleted when controller stops
334
        atexit.register(Path(self.options.pidfile).unlink)
335
336
        # Checks if a pidfile exists. Creates a new file.
337
        try:
338
            pidfile = open(self.options.pidfile, mode='x', encoding="utf8")
339
        except OSError:
340
            # This happens if there is a pidfile already.
341
            # We shall check if the process that created the pidfile is still
342
            # running.
343
            try:
344
                existing_file = open(self.options.pidfile, mode='r',
345
                                     encoding="utf8")
346
                old_pid = int(existing_file.read())
347
                os.kill(old_pid, 0)
348
                # If kill() doesn't return an error, there's a process running
349
                # with the same PID. We assume it is Kytos and quit.
350
                # Otherwise, overwrite the file and proceed.
351
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
352
                             "running. Aborting.")
353
                sys.exit(error_msg.format(self.options.pidfile))
354
            except OSError:
355
                try:
356
                    pidfile = open(self.options.pidfile, mode='w',
357
                                   encoding="utf8")
358
                except OSError as exception:
359
                    error_msg = "Failed to create pidfile {}: {}."
360
                    sys.exit(error_msg.format(self.options.pidfile, exception))
361
362
        # Identifies the process that created the pidfile.
363
        pidfile.write(str(pid))
364
        pidfile.close()
365
366
    @property
367
    def buffers(self) -> KytosBuffers:
368
        """KytosBuffers exposed through this property to allow lazy
369
        async initiliation with an event loop running."""
370
        if not self._buffers:
371
            self._buffers = KytosBuffers()
372
        return self._buffers
373
374
    async def _wait_api_server_started(self):
375
        """Use this method to wait for Uvicorn server to start."""
376
        while not self.api_server.server.started:
377
            await asyncio.sleep(0.1)
378
379
    async def start_controller(self):
380
        """Start the controller.
381
382
        Starts the KytosServer (TCP Server) coroutine.
383
        Starts a thread for each buffer handler.
384
        Load the installed apps.
385
        """
386
        self.log.info("Starting Kytos - Kytos Controller")
387
        if not self._buffers:
388
            self._buffers = KytosBuffers()
389
        self.server = KytosServer((self.options.listen,
390
                                   int(self.options.port)),
391
                                  KytosServerProtocol,
392
                                  self,
393
                                  self.options.protocol_name)
394
395
        self.log.info("Starting API server")
396
        task = self.loop.create_task(self.api_server.serve())
397
        self._tasks.append(task)
398
        await self._wait_api_server_started()
399
400
        self.log.info(f"Starting TCP server: {self.server}")
401
        self.server.serve_forever()
402
403
        # ASYNC TODO: ensure all threads started correctly
404
        # This is critical, if any of them failed starting we should exit.
405
        # sys.exit(error_msg.format(thread, exception))
406
407
        self.log.info("Starting authorization.")
408
        self.start_auth()
409
        self.log.info("Loading Kytos NApps...")
410
        self.napp_dir_listener.start()
411
        self.pre_install_napps(self.options.napps_pre_installed)
412
        self.load_napps()
413
        self.api_server.start_web_ui()
414
415
        # Start task handlers consumers after NApps to potentially
416
        # avoid discarding an event if a consumer isn't ready yet
417
        task = self.loop.create_task(self.event_handler("conn"))
418
        self._tasks.append(task)
419
        task = self.loop.create_task(self.event_handler("raw"))
420
        self._tasks.append(task)
421
        task = self.loop.create_task(self.event_handler("msg_in"))
422
        self._tasks.append(task)
423
        task = self.loop.create_task(self.msg_out_event_handler())
424
        self._tasks.append(task)
425
        task = self.loop.create_task(self.event_handler("app"))
426
        self._tasks.append(task)
427
        task = self.loop.create_task(self.event_handler("meta"))
428
        self._tasks.append(task)
429
430
        self.start_queue_monitors()
431
        self.started_at = now()
432
433
    def _register_endpoints(self):
434
        """Register all rest endpoint served by kytos.
435
436
        -   Register APIServer endpoints
437
        -   Register WebUI endpoints
438
        -   Register ``/api/kytos/core/config`` endpoint
439
        """
440
        self.api_server.start_api()
441
        # Register controller endpoints as /api/kytos/core/...
442
        self.api_server.register_core_endpoint('config/',
443
                                               self.configuration_endpoint)
444
        self.api_server.register_core_endpoint('metadata/',
445
                                               Controller.metadata_endpoint)
446
        self.api_server.register_core_endpoint(
447
            'reload/{username}/{napp_name}/',
448
            self.rest_reload_napp)
449
        self.api_server.register_core_endpoint('reload/all',
450
                                               self.rest_reload_all_napps)
451
        self.dead_letter.register_endpoints()
452
453
    def register_rest_endpoint(self, url, function, methods):
454
        """Deprecate in favor of @rest decorator."""
455
        self.api_server.register_rest_endpoint(url, function, methods)
456
457
    @classmethod
458
    def metadata_endpoint(cls, _request: Request) -> JSONResponse:
459
        """Return the Kytos metadata.
460
461
        Returns:
462
            string: Json with current kytos metadata.
463
464
        """
465
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
466
        with open(meta_path, encoding="utf8") as file:
467
            meta_file = file.read()
468
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
469
        return JSONResponse(metadata)
470
471
    def configuration_endpoint(self, _request: Request) -> JSONResponse:
472
        """Return the configuration options used by Kytos.
473
474
        Returns:
475
            string: Json with current configurations used by kytos.
476
477
        """
478
        return JSONResponse(KytosConfig.options_exposed(self.options.__dict__))
479
480
    def restart(self, graceful=True):
481
        """Restart Kytos SDN Controller.
482
483
        Args:
484
            graceful(bool): Represents the way that Kytos will restart.
485
        """
486
        if self.started_at is not None:
487
            self.stop(graceful)
488
            self.__init__(self.options)
489
490
        self.start(restart=True)
491
492
    def stop(self, graceful=True):
493
        """Shutdown all services used by kytos.
494
495
        This method should:
496
            - stop all Websockets
497
            - stop the API Server
498
            - stop the Controller
499
        """
500
        if self.started_at:
501
            self.stop_controller(graceful)
502
503
    def stop_controller(self, graceful=True):
504
        """Stop the controller.
505
506
        This method should:
507
            - announce on the network that the controller will shutdown;
508
            - stop receiving incoming packages;
509
            - call the 'shutdown' method of each KytosNApp that is running;
510
            - finish reading the events on all buffers;
511
            - stop each running handler;
512
            - stop all running threads;
513
            - stop the KytosServer;
514
        """
515
        self.log.info("Stopping Kytos")
516
517
        self.buffers.send_stop_signal()
518
        self.napp_dir_listener.stop()
519
520
        for pool_name in executors:
521
            self.log.info("Stopping threadpool: %s", pool_name)
522
523
        threads = threading.enumerate()
524
        self.log.debug("%s threads before threadpool shutdown: %s",
525
                       len(threads), threads)
526
527
        for executor_pool in executors.values():
528
            executor_pool.shutdown(wait=graceful, cancel_futures=True)
529
530
        # self.server.socket.shutdown()
531
        # self.server.socket.close()
532
533
        self.started_at = None
534
        napps = self.unload_napps()
535
        self.log.info(
536
            f"Waiting for {len(napps)} NApps shutdown confirmation..."
537
        )
538
        for napp in napps:
539
            self.log.info(f"Waiting {napp} shutdown confirmation...")
540
            napp.__dict__['_KytosNApp__event'].wait()
541
            self.log.info(f"{napp} was shutdown")
542
543
        # Cancel all async tasks (event handlers and servers)
544
        for task in self._tasks:
545
            task.cancel()
546
547
        # ASYNC TODO: close connections
548
        # self.server.server_close()
549
550
        self.stop_queue_monitors()
551
        if self.apm:
552
            self.log.info("Stopping APM server...")
553
            self.apm.close()
554
            self.log.info("Stopped APM Server")
555
        self.log.info("Stopping API Server...")
556
        self.api_server.stop()
557
        self.log.info("Stopped API Server")
558
        self.log.info("Stopping TCP Server...")
559
        self.server.shutdown()
560
        self.log.info("Stopped TCP Server")
561
        self.loop.stop()
562
563
    def status(self):
564
        """Return status of Kytos Server.
565
566
        If the controller kytos is running this method will be returned
567
        "Running since 'Started_At'", otherwise "Stopped".
568
569
        Returns:
570
            string: String with kytos status.
571
572
        """
573
        if self.started_at:
574
            return f"Running since {self.started_at}"
575
        return "Stopped"
576
577
    def uptime(self):
578
        """Return the uptime of kytos server.
579
580
        This method should return:
581
            - 0 if Kytos Server is stopped.
582
            - (kytos.start_at - datetime.now) if Kytos Server is running.
583
584
        Returns:
585
           datetime.timedelta: The uptime interval.
586
587
        """
588
        return now() - self.started_at if self.started_at else 0
589
590
    def notify_listeners(self, event):
591
        """Send the event to the specified listeners.
592
593
        Loops over self.events_listeners matching (by regexp) the attribute
594
        name of the event with the keys of events_listeners. If a match occurs,
595
        then send the event to each registered listener.
596
597
        Args:
598
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
599
        """
600
601
        self.log.debug("looking for listeners for %s", event)
602
        for event_regex, listeners in dict(self.events_listeners).items():
603
            # self.log.debug("listeners found for %s: %r => %s", event,
604
            #                event_regex, [l.__qualname__ for l in listeners])
605
            # Do not match if the event has more characters
606
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
607
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
608
                event_regex += '$'
609
            if re.match(event_regex, event.name):
610
                self.log.debug('Calling listeners for %s', event)
611
                for listener in listeners:
612
                    if asyncio.iscoroutinefunction(listener):
613
                        task = asyncio.create_task(listener(event))
614
                        self._alisten_tasks.add(task)
615
                        task.add_done_callback(self._alisten_tasks.discard)
616
                    else:
617
                        listener(event)
618
619
    # pylint: disable=broad-exception-caught
620
    async def event_handler(self, buffer_name: str):
621
        """Default event handler that gets from an event buffer."""
622
        event_buffer = getattr(self.buffers, buffer_name)
623
        self.log.info(f"Event handler {buffer_name} started")
624
        while True:
625
            try:
626
                # After hitting issues with a large amount of flows being
627
                # installed (16k which sends 32k events), this task was
628
                # hogging resources in the MainThread causing disconnections
629
                # and socket exceptions. With the following sleep, this task
630
                # yields to other loops mitigating the disconnection issues.
631
                # Now the cap for flow installation at the same time is 50k.
632
                await asyncio.sleep(0)
633
                event = await event_buffer.aget()
634
                self.notify_listeners(event)
635
636
                if event.name == "kytos/core.shutdown":
637
                    self.log.debug(f"Event handler {buffer_name} stopped")
638
                    break
639
            except Exception as exc:
640
                self.log.exception(f"Unhandled exception on {buffer_name}",
641
                                   exc_info=exc)
642
643
    async def publish_connection_error(self, event):
644
        """Publish connection error event.
645
646
        Args:
647
            event (KytosEvent): Event that triggered for error propagation.
648
        """
649
        event.name = \
650
            f"kytos/core.{event.destination.protocol.name}.connection.error"
651
        error_msg = f"Connection state: {event.destination.state}"
652
        event.content["exception"] = error_msg
653
        await self.buffers.conn.aput(event)
654
655
    # pylint: disable=broad-exception-caught
656
    async def msg_out_event_handler(self):
657
        """Handle msg_out events.
658
659
        Listen to the msg_out buffer and send all its events to the
660
        corresponding listeners.
661
        """
662
        self.log.info("Event handler msg_out started")
663
        while True:
664
            try:
665
                triggered_event = await self.buffers.msg_out.aget()
666
667
                if triggered_event.name == "kytos/core.shutdown":
668
                    self.log.debug("Message Out Event handler stopped")
669
                    break
670
671
                message = triggered_event.content['message']
672
                destination = triggered_event.destination
673
                if (destination and
674
                        not destination.state == ConnectionState.FINISHED):
675
                    packet = message.pack()
676
                    destination.send(packet)
677
                    self.log.debug('Connection %s: OUT OFP, '
678
                                   'version: %s, type: %s, xid: %s - %s',
679
                                   destination.id,
680
                                   message.header.version,
681
                                   message.header.message_type,
682
                                   message.header.xid,
683
                                   packet.hex())
684
                    self.notify_listeners(triggered_event)
685
            except OSError:
686
                await self.publish_connection_error(triggered_event)
687
                self.log.info("connection closed. Cannot send message")
688
            except PackException as err:
689
                self.log.error(
690
                    f"Discarding message: {message}, event: {triggered_event} "
691
                    f"because of PackException {err}"
692
                )
693
            except Exception as exc:
694
                self.log.exception("Unhandled exception on msg_out",
695
                                   exc_info=exc)
696
697
    def get_interface_by_id(self, interface_id):
698
        """Find a Interface  with interface_id.
699
700
        Args:
701
            interface_id(str): Interface Identifier.
702
703
        Returns:
704
            Interface: Instance of Interface with the id given.
705
706
        """
707
        if interface_id is None:
708
            return None
709
710
        switch_id = ":".join(interface_id.split(":")[:-1])
711
        interface_number = int(interface_id.split(":")[-1])
712
713
        switch = self.switches.get(switch_id)
714
715
        if not switch:
716
            return None
717
718
        return switch.interfaces.get(interface_number, None)
719
720
    def get_switch_by_dpid(self, dpid):
721
        """Return a specific switch by dpid.
722
723
        Args:
724
            dpid (|DPID|): dpid object used to identify a switch.
725
726
        Returns:
727
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
728
729
        """
730
        return self.switches.get(dpid)
731
732
    def get_switch_or_create(self, dpid, connection=None):
733
        """Return switch or create it if necessary.
734
735
        Args:
736
            dpid (|DPID|): dpid object used to identify a switch.
737
            connection (:class:`~kytos.core.connection.Connection`):
738
                connection used by switch. If a switch has a connection that
739
                will be updated.
740
741
        Returns:
742
            :class:`~kytos.core.switch.Switch`: new or existent switch.
743
744
        """
745
        with self._switches_lock:
746
            if connection:
747
                self.create_or_update_connection(connection)
748
749
            switch = self.get_switch_by_dpid(dpid)
750
            event_name = 'kytos/core.switch.'
751
752
            if switch is None:
753
                switch = Switch(dpid=dpid)
754
                self.add_new_switch(switch)
755
                event_name += 'new'
756
            else:
757
                event_name += 'reconnected'
758
            event = KytosEvent(name=event_name, content={'switch': switch})
759
760
            if connection:
761
                old_connection = switch.connection
762
                switch.update_connection(connection)
763
764
                if old_connection is not connection:
765
                    self.remove_connection(old_connection)
766
767
            self.buffers.conn.put(event)
768
769
            return switch
770
771
    def create_or_update_connection(self, connection):
772
        """Update a connection.
773
774
        Args:
775
            connection (:class:`~kytos.core.connection.Connection`):
776
                Instance of connection that will be updated.
777
        """
778
        self.connections[connection.id] = connection
779
780
    def get_connection_by_id(self, conn_id):
781
        """Return a existent connection by id.
782
783
        Args:
784
            id (int): id from a connection.
785
786
        Returns:
787
            :class:`~kytos.core.connection.Connection`: Instance of connection
788
                or None Type.
789
790
        """
791
        return self.connections.get(conn_id)
792
793
    def remove_connection(self, connection):
794
        """Close a existent connection and remove it.
795
796
        Args:
797
            connection (:class:`~kytos.core.connection.Connection`):
798
                Instance of connection that will be removed.
799
        """
800
        if connection is None:
801
            return False
802
803
        try:
804
            connection.close()
805
            del self.connections[connection.id]
806
        except KeyError:
807
            return False
808
        return True
809
810
    def remove_switch(self, switch):
811
        """Remove an existent switch.
812
813
        Args:
814
            switch (:class:`~kytos.core.switch.Switch`):
815
                Instance of switch that will be removed.
816
        """
817
        try:
818
            del self.switches[switch.dpid]
819
        except KeyError:
820
            return False
821
        return True
822
823
    def new_connection(self, event):
824
        """Handle a kytos/core.connection.new event.
825
826
        This method will read new connection event and store the connection
827
        (socket) into the connections attribute on the controller.
828
829
        It also clear all references to the connection since it is a new
830
        connection on the same ip:port.
831
832
        Args:
833
            event (~kytos.core.KytosEvent):
834
                The received event (``kytos/core.connection.new``) with the
835
                needed info.
836
        """
837
        self.log.info("Handling %s...", event)
838
839
        connection = event.source
840
        self.log.debug("Event source: %s", event.source)
841
842
        # Remove old connection (aka cleanup) if it exists
843
        if self.get_connection_by_id(connection.id):
844
            self.remove_connection(connection.id)
845
846
        # Update connections with the new connection
847
        self.create_or_update_connection(connection)
848
849
    def add_new_switch(self, switch):
850
        """Add a new switch on the controller.
851
852
        Args:
853
            switch (Switch): A Switch object
854
        """
855
        self.switches[switch.dpid] = switch
856
857
    def _import_napp(self, username, napp_name):
858
        """Import a NApp module.
859
860
        Raises:
861
            FileNotFoundError: if NApp's main.py is not found.
862
            ModuleNotFoundError: if any NApp requirement is not installed.
863
864
        """
865
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
866
        path = os.path.join(self.options.napps, username, napp_name,
867
                            'main.py')
868
        napp_spec = spec_from_file_location(mod_name, path)
869
        napp_module = module_from_spec(napp_spec)
870
        sys.modules[napp_spec.name] = napp_module
871
        napp_spec.loader.exec_module(napp_module)
872
        return napp_module
873
874
    def load_napp(self, username, napp_name):
875
        """Load a single NApp.
876
877
        Args:
878
            username (str): NApp username (makes up NApp's path).
879
            napp_name (str): Name of the NApp to be loaded.
880
        """
881
        if (username, napp_name) in self.napps:
882
            message = 'NApp %s/%s was already loaded'
883
            self.log.warning(message, username, napp_name)
884
            return
885
886
        try:
887
            napp_module = self._import_napp(username, napp_name)
888
        except ModuleNotFoundError as err:
889
            self.log.error("Error loading NApp '%s/%s': %s",
890
                           username, napp_name, err)
891
            return
892
        except FileNotFoundError as err:
893
            msg = "NApp module not found, assuming it's a meta-napp: %s"
894
            self.log.warning(msg, err.filename)
895
            return
896
897
        try:
898
            napp = napp_module.Main(controller=self)
899
        except Exception as exc:  # noqa pylint: disable=bare-except
900
            msg = f"NApp {username}/{napp_name} exception {str(exc)} "
901
            raise KytosNAppSetupException(msg) from exc
902
903
        self.napps[(username, napp_name)] = napp
904
905
        napp.start()
906
        self.api_server.authenticate_endpoints(napp)
907
        self.api_server.register_napp_endpoints(napp)
908
909
        # pylint: disable=protected-access
910
        for event, listeners in napp._listeners.items():
911
            self.events_listeners.setdefault(event, []).extend(listeners)
912
        # pylint: enable=protected-access
913
914
    def pre_install_napps(self, napps, enable=True):
915
        """Pre install and enable NApps.
916
917
        Before installing, it'll check if it's installed yet.
918
919
        Args:
920
            napps ([str]): List of NApps to be pre-installed and enabled.
921
        """
922
        all_napps = self.napps_manager.get_installed_napps()
923
        installed = [str(napp) for napp in all_napps]
924
        napps_diff = [napp for napp in napps if napp not in installed]
925
        for napp in napps_diff:
926
            self.napps_manager.install(napp, enable=enable)
927
928
    def load_napps(self):
929
        """Load all NApps enabled on the NApps dir."""
930
        for napp in self.napps_manager.get_enabled_napps():
931
            try:
932
                self.log.info("Loading NApp %s", napp.id)
933
                self.load_napp(napp.username, napp.name)
934
            except FileNotFoundError as exception:
935
                self.log.error("Could not load NApp %s: %s",
936
                               napp.id, exception)
937
                msg = f"NApp {napp.id} exception {str(exception)}"
938
                raise KytosNAppSetupException(msg) from exception
939
940
    def unload_napp(self, username, napp_name):
941
        """Unload a specific NApp.
942
943
        Args:
944
            username (str): NApp username.
945
            napp_name (str): Name of the NApp to be unloaded.
946
        """
947
        napp = self.napps.pop((username, napp_name), None)
948
949
        if napp is None:
950
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
951
        else:
952
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
953
            napp_id = NApp(username, napp_name).id
954
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
955
            napp_shutdown_fn = self.events_listeners[event.name][0]
956
            # Call listener before removing it from events_listeners
957
            napp_shutdown_fn(event)
958
959
            # Remove rest endpoints from that napp
960
            self.api_server.remove_napp_endpoints(napp)
961
962
            # Removing listeners from that napp
963
            # pylint: disable=protected-access
964
            for event_type, napp_listeners in napp._listeners.items():
965
                event_listeners = self.events_listeners[event_type]
966
                for listener in napp_listeners:
967
                    event_listeners.remove(listener)
968
                if not event_listeners:
969
                    del self.events_listeners[event_type]
970
            # pylint: enable=protected-access
971
972
        return napp
973
974
    def unload_napps(self):
975
        """Unload all loaded NApps that are not core NApps
976
977
        NApps are unloaded in the reverse order that they are enabled to
978
        facilitate to shutdown gracefully.
979
        """
980
        napps = []
981
        for napp in reversed(self.napps_manager.get_enabled_napps()):
982
            if napp := self.unload_napp(napp.username, napp.name):
983
                napps.append(napp)
984
        return napps
985
986
    def reload_napp_module(self, username, napp_name, napp_file):
987
        """Reload a NApp Module."""
988
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
989
        try:
990
            napp_module = import_module(mod_name)
991
        except ModuleNotFoundError:
992
            self.log.error("Module '%s' not found", mod_name)
993
            raise
994
        try:
995
            napp_module = reload_module(napp_module)
996
        except ImportError as err:
997
            self.log.error("Error reloading NApp '%s/%s': %s",
998
                           username, napp_name, err)
999
            raise
1000
1001
    def reload_napp(self, username, napp_name):
1002
        """Reload a NApp."""
1003
        self.unload_napp(username, napp_name)
1004
        try:
1005
            self.reload_napp_module(username, napp_name, 'settings')
1006
            self.reload_napp_module(username, napp_name, 'main')
1007
        except (ModuleNotFoundError, ImportError):
1008
            return 400
1009
        self.log.info("NApp '%s/%s' successfully reloaded",
1010
                      username, napp_name)
1011
        self.load_napp(username, napp_name)
1012
        return 200
1013
1014
    def rest_reload_napp(self, request: Request) -> JSONResponse:
1015
        """Request reload a NApp."""
1016
        username = request.path_params["username"]
1017
        napp_name = request.path_params["napp_name"]
1018
        res = self.reload_napp(username, napp_name)
1019
        if res == 200:
1020
            return JSONResponse('reloaded')
1021
        return JSONResponse('fail to reload', status_code=res)
1022
1023
    def rest_reload_all_napps(self, _request: Request) -> JSONResponse:
1024
        """Request reload all NApps."""
1025
        for napp in self.napps:
1026
            self.reload_napp(*napp)
1027
        return JSONResponse('reloaded')
1028
1029
    def _full_queue_counter(self) -> Counter:
1030
        """Generate full queue stats counter."""
1031
        buffer_counter = defaultdict(Counter)
1032
        for buffer in self.buffers.get_all_buffers():
1033
            if not buffer.full():
1034
                continue
1035
            while not buffer.empty():
1036
                event = buffer.get()
1037
                buffer_counter[buffer.name][event.name] += 1
1038
        return buffer_counter
1039
1040
    def _try_to_fmt_traceback_msg(self, message: str, counter: Counter) -> str:
1041
        """Try to fmt traceback message."""
1042
        if counter:
1043
            counter = dict(counter)
1044
            message = (
1045
                f"{message}\nFull KytosEventBuffers counters: {counter}"
1046
            )
1047
        return message
1048
1049
    def get_link_or_create(
1050
        self,
1051
        endpoint_a: Interface,
1052
        endpoint_b: Interface,
1053
        link_dict: Optional[dict] = None,
1054
    ) -> tuple[Link, bool]:
1055
        """Get an existing link or create a new one.
1056
1057
        Returns:
1058
            Tuple(Link, bool): Link and a boolean whether it has been created.
1059
        """
1060
        with self.links_lock:
1061
            new_link = Link(endpoint_a, endpoint_b)
1062
1063
            # If new_link is an old link but mismatched,
1064
            # then treat it as a new link
1065
            if (new_link.id in self.links
1066
                    and not self.detect_mismatched_link(new_link)):
1067
                return (self.links[new_link.id], False)
1068
1069
            with new_link.link_lock:
1070
                # Check if any interface already has a link
1071
                # This old_link is a leftover link that needs to be removed
1072
                # The other endpoint of the link is the leftover interface
1073
                if endpoint_a.link and endpoint_a.link != new_link:
1074
                    old_link = endpoint_a.link
1075
                    leftover_interface = (old_link.endpoint_a
1076
                                          if old_link.endpoint_a != endpoint_a
1077
                                          else old_link.endpoint_b)
1078
                    self.log.warning(f"Leftover mismatched link"
1079
                                     f" {endpoint_a.link} in interface"
1080
                                     f" {leftover_interface}")
1081
1082
                if endpoint_b.link and endpoint_b.link != new_link:
1083
                    old_link = endpoint_b.link
1084
                    leftover_interface = (old_link.endpoint_b
1085
                                          if old_link.endpoint_b != endpoint_b
1086
                                          else old_link.endpoint_a)
1087
                    self.log.warning(f"Leftover mismatched link "
1088
                                     f" {endpoint_b.link} in interface"
1089
                                     f" {leftover_interface}")
1090
1091
                if new_link.id not in self.links:
1092
                    self.links[new_link.id] = new_link
1093
1094
                endpoint_a.update_link(new_link)
1095
                endpoint_b.update_link(new_link)
1096
                new_link.endpoint_a = endpoint_a
1097
                new_link.endpoint_b = endpoint_b
1098
                endpoint_a.nni = True
1099
                endpoint_b.nni = True
1100
1101
                if link_dict:
1102
                    if link_dict['enabled']:
1103
                        new_link.enable()
1104
                    else:
1105
                        new_link.disable()
1106
1107
                    if link_dict.get("metadata"):
1108
                        new_link.extend_metadata(link_dict["metadata"])
1109
1110
        return (self.links[new_link.id], True)
1111
1112
    def get_link(self, link_id: str) -> Optional[Link]:
1113
        """Return a link by its ID.
1114
1115
        Returns:
1116
            Optional[Link]: Link if found, None otherwise.
1117
        """
1118
        return self.links.get(link_id)
1119
1120
    def get_links_from_interfaces(
1121
        self,
1122
        interfaces: Iterable[Interface]
1123
    ) -> dict[str, Link]:
1124
        """Get a list of links that matched to all/any given interfaces."""
1125
        links_found = {}
1126
        with self.links_lock:
1127
            for link in self.links.copy().values():
1128
                for interface in interfaces:
1129
                    if any((
1130
                        interface.id == link.endpoint_a.id,
1131
                        interface.id == link.endpoint_b.id,
1132
                    )):
1133
                        links_found[link.id] = link
1134
            return links_found
1135
1136
    @staticmethod
1137
    def detect_mismatched_link(link: Link) -> frozenset[str]:
1138
        """Check if a link is mismatched."""
1139
        if (link.endpoint_a.link and link.endpoint_b
1140
                and link.endpoint_a.link == link.endpoint_b.link):
1141
            return frozenset()
1142
        return frozenset(["mismatched_link"])
1143
1144
    def link_status_mismatched(self, link: Link) -> Optional[EntityStatus]:
1145
        """Check if a link is mismatched and return a status."""
1146
        if self.detect_mismatched_link(link):
1147
            return EntityStatus.DOWN
1148
        return None
1149