Passed
Pull Request — master (#588)
by Aldo
04:48
created

kytos.core.controller.Controller.get_link()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16
import asyncio
17
import atexit
18
import importlib
19
import logging
20
import os
21
import re
22
import sys
23
import threading
24
import traceback
25
from asyncio import AbstractEventLoop
26
from collections import Counter, defaultdict
27
from importlib import import_module
28
from importlib import reload as reload_module
29
from importlib.util import module_from_spec, spec_from_file_location
30
from pathlib import Path
31
from typing import Iterable, Optional
32
33
from pyof.foundation.exceptions import PackException
34
35
from kytos.core.api_server import APIServer, JSONResponse, Request
36
from kytos.core.apm import init_apm
37
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
38
from kytos.core.auth import Auth
39
from kytos.core.buffers import KytosBuffers
40
from kytos.core.config import KytosConfig
41
from kytos.core.connection import Connection, ConnectionState
42
from kytos.core.db import db_conn_wait
43
from kytos.core.dead_letter import DeadLetter
44
from kytos.core.events import KytosEvent
45
from kytos.core.exceptions import (KytosAPMInitException, KytosDBInitException,
46
                                   KytosNAppSetupException)
47
from kytos.core.helpers import executors, now
48
from kytos.core.interface import Interface
49
from kytos.core.link import Link
50
from kytos.core.logs import LogManager
51
from kytos.core.napps.base import NApp
52
from kytos.core.napps.manager import NAppsManager
53
from kytos.core.napps.napp_dir_listener import NAppDirListener
54
from kytos.core.pacing import Pacer
55
from kytos.core.queue_monitor import QueueMonitorWindow
56
from kytos.core.switch import Switch
57
58
__all__ = ('Controller',)
59
60
61
def exc_handler(_, exc, __):
62
    """Log uncaught exceptions.
63
64
    Args:
65
        exc_type (ignored): exception type
66
        exc: exception instance
67
        tb (ignored): traceback
68
    """
69
    logging.basicConfig(filename='errlog.log',
70
                        format='%(asctime)s:%(pathname)'
71
                        's:%(levelname)s:%(message)s')
72
    logging.exception('Uncaught Exception: %s', exc)
73
74
75
class Controller:
76
    """Main class of Kytos.
77
78
    The main responsibilities of this class are:
79
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
80
        - manage KytosNApps (install, load and unload);
81
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
82
        - manage which event should be sent to NApps methods;
83
        - manage the buffers handlers, considering one thread per handler.
84
    """
85
86
    # Created issue #568 for the disabled checks.
87
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
88
    # pylint: disable=consider-using-with,unnecessary-dunder-call
89
    # pylint: disable=too-many-lines
90
    def __init__(self, options=None, loop: AbstractEventLoop = None):
91
        """Init method of Controller class takes the parameters below.
92
93
        Args:
94
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
95
                instance of :class:`~kytos.core.config.KytosConfig` class.
96
            loop asyncio.AbstractEventLoop
97
        """
98
        if options is None:
99
            options = KytosConfig().options['daemon']
100
101
        self.loop = loop
102
103
        # asyncio tasks
104
        self._tasks: list[asyncio.Task] = []
105
106
        #: KytosBuffers: KytosBuffer object with Controller buffers
107
        self._buffers: KytosBuffers = None
108
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
109
        #:
110
        #: This dict stores all connections between the controller and the
111
        #: switches. The key for this dict is a tuple (ip, port). The content
112
        #: is a Connection
113
        self.connections: dict[tuple, Connection] = {}
114
        #: dict: mapping of events and event listeners.
115
        #:
116
        #: The key of the dict is a KytosEvent (or a string that represent a
117
        #: regex to match against KytosEvents) and the value is a list of
118
        #: methods that will receive the referenced event
119
        self.events_listeners = {'kytos/core.connection.new':
120
                                 [self.new_connection]}
121
122
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
123
        #:
124
        #: The key is the napp name (string), while the value is the napp
125
        #: instance itself.
126
        self.napps = {}
127
        #: Object generated by ParseArgs on config.py file
128
        self.options = options
129
        #: KytosServer: Instance of KytosServer that will be listening to TCP
130
        #: connections.
131
        self.server = None
132
        #: dict: Current existing switches.
133
        #:
134
        #: The key is the switch dpid, while the value is a Switch object.
135
        self.switches: dict[str, Switch] = {}
136
        self._switches_lock = threading.Lock()
137
138
        #: datetime.datetime: Time when the controller finished starting.
139
        self.started_at = None
140
141
        #: logging.Logger: Logger instance used by Kytos.
142
        self.log = None
143
144
        #: Observer that handle NApps when they are enabled or disabled.
145
        self.napp_dir_listener = NAppDirListener(self)
146
147
        self.napps_manager = NAppsManager(self)
148
149
        #: API Server used to expose rest endpoints.
150
        self.api_server = APIServer(self.options.listen,
151
                                    self.options.api_port,
152
                                    self.napps_manager,
153
                                    self.options.napps,
154
                                    self.options.api_traceback_on_500)
155
156
        self.auth = None
157
        self.dead_letter = DeadLetter(self)
158
        self._alisten_tasks = set()
159
        self.qmonitors: list[QueueMonitorWindow] = []
160
161
        #: APM client in memory to be closed when necessary
162
        self.apm = None
163
164
        self._register_endpoints()
165
        #: Adding the napps 'enabled' directory into the PATH
166
        #: Now you can access the enabled napps with:
167
        #: from napps.<username>.<napp_name> import ?....
168
        sys.path.append(os.path.join(self.options.napps, os.pardir))
169
        sys.excepthook = exc_handler
170
171
        #: Pacer for controlling the rate which actions can be executed
172
        self.pacer = Pacer("memory://")
173
174
        self.links: dict[str, Link] = {}
175
        self.links_lock = threading.Lock()
176
177
    def start_auth(self):
178
        """Initialize Auth() and its services"""
179
        self.auth = Auth(self)
180
        self.auth.register_core_auth_services()
181
182
    def enable_logs(self):
183
        """Register kytos log and enable the logs."""
184
        decorators = self.options.logger_decorators
185
        try:
186
            decorators = [self._resolve(deco) for deco in decorators]
187
        except ModuleNotFoundError as err:
188
            sys.exit(f'Failed to resolve decorator module: {err.name}')
189
        except AttributeError:
190
            sys.exit(f'Failed to resolve decorator name: {decorators}')
191
        LogManager.decorate_logger_class(*decorators)
192
        LogManager.load_config_file(self.options.logging, self.options.debug)
193
        # pylint: disable=fixme
194
        # TODO issue 371
195
        # LogManager.enable_websocket(self.api_server.server)
196
        self.log = logging.getLogger(__name__)
197
        self._patch_core_loggers()
198
199
    @staticmethod
200
    def _resolve(name):
201
        """Resolve a dotted name to a global object."""
202
        mod, _, attr = name.rpartition('.')
203
        mod = importlib.import_module(mod)
204
        return getattr(mod, attr)
205
206
    @staticmethod
207
    def _patch_core_loggers():
208
        """Patch in updated loggers to 'kytos.core.*' modules"""
209
        match_str = 'kytos.core.'
210
        str_len = len(match_str)
211
        reloadable_mods = [module for mod_name, module in sys.modules.items()
212
                           if mod_name[:str_len] == match_str]
213
        for module in reloadable_mods:
214
            new_logger = logging.getLogger(module.__name__)
215
            if hasattr(module, "LOG"):
216
                old_logger = module.LOG
217
                for handler in old_logger.handlers:
218
                    new_logger.addHandler(handler)
219
                for log_filter in old_logger.filters:
220
                    new_logger.addFilter(log_filter)
221
            module.LOG = new_logger
222
223
    @staticmethod
224
    def loggers():
225
        """List all logging Loggers.
226
227
        Return a list of Logger objects, with name and logging level.
228
        """
229
        # pylint: disable=no-member
230
        return [logging.getLogger(name)
231
                for name in logging.root.manager.loggerDict
232
                if "kytos" in name]
233
234
    def toggle_debug(self, name=None):
235
        """Enable/disable logging debug messages to a given logger name.
236
237
        If the name parameter is not specified the debug will be
238
        enabled/disabled following the initial config file. It will decide
239
        to enable/disable using the 'kytos' name to find the current
240
        logger level.
241
        Obs: To disable the debug the logging will be set to NOTSET
242
243
        Args:
244
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
245
        """
246
        # pylint: disable=no-member
247
        if name and name not in logging.root.manager.loggerDict:
248
            # A Logger name that is not declared in logging will raise an error
249
            # otherwise logging would create a new Logger.
250
            raise ValueError(f"Invalid logger name: {name}")
251
252
        if not name:
253
            # Logger name not specified.
254
            level = logging.getLogger('kytos').getEffectiveLevel()
255
            enable_debug = level != logging.DEBUG
256
257
            # Enable/disable default Loggers
258
            LogManager.load_config_file(self.options.logging, enable_debug)
259
            return
260
261
        # Get effective logger level for the name
262
        level = logging.getLogger(name).getEffectiveLevel()
263
        logger = logging.getLogger(name)
264
265
        if level == logging.DEBUG:
266
            # disable debug
267
            logger.setLevel(logging.NOTSET)
268
        else:
269
            # enable debug
270
            logger.setLevel(logging.DEBUG)
271
272
    async def start(self, restart=False):
273
        """Create pidfile and call start_controller method."""
274
        self.enable_logs()
275
        # pylint: disable=broad-except
276
        try:
277
            if self.options.database:
278
                db_conn_wait(db_backend=self.options.database)
279
            if self.options.apm:
280
                self.apm = init_apm(self.options.apm, app=self.api_server.app)
281
            if not restart:
282
                self.create_pidfile()
283
            await self.start_controller()
284
        except (KytosDBInitException, KytosAPMInitException) as exc:
285
            message = f"Kytos couldn't start because of {str(exc)}"
286
            sys.exit(message)
287
        except Exception as exc:
288
            exc_fmt = traceback.format_exc(chain=True)
289
            message = f"Kytos couldn't start because of {str(exc)} {exc_fmt}"
290
            counter = self._full_queue_counter()
291
            sys.exit(self._try_to_fmt_traceback_msg(message, counter))
292
293
    def start_queue_monitors(self) -> None:
294
        """Start QueueMonitorWindows."""
295
        for data in self.options.event_buffer_monitors:
296
            for qmon in QueueMonitorWindow.from_buffer_config(self, **data):
297
                self.qmonitors.append(qmon)
298
        for data in self.options.thread_pool_queue_monitors:
299
            for qmon in QueueMonitorWindow.from_threadpool_config(**data):
300
                self.qmonitors.append(qmon)
301
        for qmonitor in self.qmonitors:
302
            self.log.info(f"Starting {qmonitor}...")
303
            qmonitor.start()
304
305
    def stop_queue_monitors(self) -> None:
306
        """Stop QueueMonitorWindows."""
307
        for qmonitor in self.qmonitors:
308
            self.log.info(f"Stopping {qmonitor}...")
309
            qmonitor.stop()
310
311
    def create_pidfile(self):
312
        """Create a pidfile."""
313
        pid = os.getpid()
314
315
        # Creates directory if it doesn't exist
316
        # System can erase /var/run's content
317
        pid_folder = Path(self.options.pidfile).parent
318
        self.log.info(pid_folder)
319
320
        # Pylint incorrectly infers Path objects
321
        # https://github.com/PyCQA/pylint/issues/224
322
        # pylint: disable=no-member
323
        if not pid_folder.exists():
324
            pid_folder.mkdir()
325
            pid_folder.chmod(0o1777)
326
        # pylint: enable=no-member
327
328
        # Make sure the file is deleted when controller stops
329
        atexit.register(Path(self.options.pidfile).unlink)
330
331
        # Checks if a pidfile exists. Creates a new file.
332
        try:
333
            pidfile = open(self.options.pidfile, mode='x', encoding="utf8")
334
        except OSError:
335
            # This happens if there is a pidfile already.
336
            # We shall check if the process that created the pidfile is still
337
            # running.
338
            try:
339
                existing_file = open(self.options.pidfile, mode='r',
340
                                     encoding="utf8")
341
                old_pid = int(existing_file.read())
342
                os.kill(old_pid, 0)
343
                # If kill() doesn't return an error, there's a process running
344
                # with the same PID. We assume it is Kytos and quit.
345
                # Otherwise, overwrite the file and proceed.
346
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
347
                             "running. Aborting.")
348
                sys.exit(error_msg.format(self.options.pidfile))
349
            except OSError:
350
                try:
351
                    pidfile = open(self.options.pidfile, mode='w',
352
                                   encoding="utf8")
353
                except OSError as exception:
354
                    error_msg = "Failed to create pidfile {}: {}."
355
                    sys.exit(error_msg.format(self.options.pidfile, exception))
356
357
        # Identifies the process that created the pidfile.
358
        pidfile.write(str(pid))
359
        pidfile.close()
360
361
    @property
362
    def buffers(self) -> KytosBuffers:
363
        """KytosBuffers exposed through this property to allow lazy
364
        async initiliation with an event loop running."""
365
        if not self._buffers:
366
            self._buffers = KytosBuffers()
367
        return self._buffers
368
369
    async def _wait_api_server_started(self):
370
        """Use this method to wait for Uvicorn server to start."""
371
        while not self.api_server.server.started:
372
            await asyncio.sleep(0.1)
373
374
    async def start_controller(self):
375
        """Start the controller.
376
377
        Starts the KytosServer (TCP Server) coroutine.
378
        Starts a thread for each buffer handler.
379
        Load the installed apps.
380
        """
381
        self.log.info("Starting Kytos - Kytos Controller")
382
        if not self._buffers:
383
            self._buffers = KytosBuffers()
384
        self.server = KytosServer((self.options.listen,
385
                                   int(self.options.port)),
386
                                  KytosServerProtocol,
387
                                  self,
388
                                  self.options.protocol_name)
389
390
        self.log.info("Starting API server")
391
        task = self.loop.create_task(self.api_server.serve())
392
        self._tasks.append(task)
393
        await self._wait_api_server_started()
394
395
        self.log.info(f"Starting TCP server: {self.server}")
396
        self.server.serve_forever()
397
398
        # ASYNC TODO: ensure all threads started correctly
399
        # This is critical, if any of them failed starting we should exit.
400
        # sys.exit(error_msg.format(thread, exception))
401
402
        self.log.info("Starting authorization.")
403
        self.start_auth()
404
        self.log.info("Loading Kytos NApps...")
405
        self.napp_dir_listener.start()
406
        self.pre_install_napps(self.options.napps_pre_installed)
407
        self.load_napps()
408
        self.api_server.start_web_ui()
409
410
        # Start task handlers consumers after NApps to potentially
411
        # avoid discarding an event if a consumer isn't ready yet
412
        task = self.loop.create_task(self.event_handler("conn"))
413
        self._tasks.append(task)
414
        task = self.loop.create_task(self.event_handler("raw"))
415
        self._tasks.append(task)
416
        task = self.loop.create_task(self.event_handler("msg_in"))
417
        self._tasks.append(task)
418
        task = self.loop.create_task(self.msg_out_event_handler())
419
        self._tasks.append(task)
420
        task = self.loop.create_task(self.event_handler("app"))
421
        self._tasks.append(task)
422
        task = self.loop.create_task(self.event_handler("meta"))
423
        self._tasks.append(task)
424
425
        self.start_queue_monitors()
426
        self.started_at = now()
427
428
    def _register_endpoints(self):
429
        """Register all rest endpoint served by kytos.
430
431
        -   Register APIServer endpoints
432
        -   Register WebUI endpoints
433
        -   Register ``/api/kytos/core/config`` endpoint
434
        """
435
        self.api_server.start_api()
436
        # Register controller endpoints as /api/kytos/core/...
437
        self.api_server.register_core_endpoint('config/',
438
                                               self.configuration_endpoint)
439
        self.api_server.register_core_endpoint('metadata/',
440
                                               Controller.metadata_endpoint)
441
        self.api_server.register_core_endpoint(
442
            'reload/{username}/{napp_name}/',
443
            self.rest_reload_napp)
444
        self.api_server.register_core_endpoint('reload/all',
445
                                               self.rest_reload_all_napps)
446
        self.dead_letter.register_endpoints()
447
448
    def register_rest_endpoint(self, url, function, methods):
449
        """Deprecate in favor of @rest decorator."""
450
        self.api_server.register_rest_endpoint(url, function, methods)
451
452
    @classmethod
453
    def metadata_endpoint(cls, _request: Request) -> JSONResponse:
454
        """Return the Kytos metadata.
455
456
        Returns:
457
            string: Json with current kytos metadata.
458
459
        """
460
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
461
        with open(meta_path, encoding="utf8") as file:
462
            meta_file = file.read()
463
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
464
        return JSONResponse(metadata)
465
466
    def configuration_endpoint(self, _request: Request) -> JSONResponse:
467
        """Return the configuration options used by Kytos.
468
469
        Returns:
470
            string: Json with current configurations used by kytos.
471
472
        """
473
        return JSONResponse(KytosConfig.options_exposed(self.options.__dict__))
474
475
    def restart(self, graceful=True):
476
        """Restart Kytos SDN Controller.
477
478
        Args:
479
            graceful(bool): Represents the way that Kytos will restart.
480
        """
481
        if self.started_at is not None:
482
            self.stop(graceful)
483
            self.__init__(self.options)
484
485
        self.start(restart=True)
486
487
    def stop(self, graceful=True):
488
        """Shutdown all services used by kytos.
489
490
        This method should:
491
            - stop all Websockets
492
            - stop the API Server
493
            - stop the Controller
494
        """
495
        if self.started_at:
496
            self.stop_controller(graceful)
497
498
    def stop_controller(self, graceful=True):
499
        """Stop the controller.
500
501
        This method should:
502
            - announce on the network that the controller will shutdown;
503
            - stop receiving incoming packages;
504
            - call the 'shutdown' method of each KytosNApp that is running;
505
            - finish reading the events on all buffers;
506
            - stop each running handler;
507
            - stop all running threads;
508
            - stop the KytosServer;
509
        """
510
        self.log.info("Stopping Kytos")
511
512
        self.buffers.send_stop_signal()
513
        self.napp_dir_listener.stop()
514
515
        for pool_name in executors:
516
            self.log.info("Stopping threadpool: %s", pool_name)
517
518
        threads = threading.enumerate()
519
        self.log.debug("%s threads before threadpool shutdown: %s",
520
                       len(threads), threads)
521
522
        for executor_pool in executors.values():
523
            executor_pool.shutdown(wait=graceful, cancel_futures=True)
524
525
        # self.server.socket.shutdown()
526
        # self.server.socket.close()
527
528
        self.started_at = None
529
        self.unload_napps()
530
531
        # Cancel all async tasks (event handlers and servers)
532
        for task in self._tasks:
533
            task.cancel()
534
535
        # ASYNC TODO: close connections
536
        # self.server.server_close()
537
538
        self.stop_queue_monitors()
539
        if self.apm:
540
            self.log.info("Stopping APM server...")
541
            self.apm.close()
542
            self.log.info("Stopped APM Server")
543
        self.log.info("Stopping API Server...")
544
        self.api_server.stop()
545
        self.log.info("Stopped API Server")
546
        self.log.info("Stopping TCP Server...")
547
        self.server.shutdown()
548
        self.log.info("Stopped TCP Server")
549
        self.loop.stop()
550
551
    def status(self):
552
        """Return status of Kytos Server.
553
554
        If the controller kytos is running this method will be returned
555
        "Running since 'Started_At'", otherwise "Stopped".
556
557
        Returns:
558
            string: String with kytos status.
559
560
        """
561
        if self.started_at:
562
            return f"Running since {self.started_at}"
563
        return "Stopped"
564
565
    def uptime(self):
566
        """Return the uptime of kytos server.
567
568
        This method should return:
569
            - 0 if Kytos Server is stopped.
570
            - (kytos.start_at - datetime.now) if Kytos Server is running.
571
572
        Returns:
573
           datetime.timedelta: The uptime interval.
574
575
        """
576
        return now() - self.started_at if self.started_at else 0
577
578
    def notify_listeners(self, event):
579
        """Send the event to the specified listeners.
580
581
        Loops over self.events_listeners matching (by regexp) the attribute
582
        name of the event with the keys of events_listeners. If a match occurs,
583
        then send the event to each registered listener.
584
585
        Args:
586
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
587
        """
588
589
        self.log.debug("looking for listeners for %s", event)
590
        for event_regex, listeners in dict(self.events_listeners).items():
591
            # self.log.debug("listeners found for %s: %r => %s", event,
592
            #                event_regex, [l.__qualname__ for l in listeners])
593
            # Do not match if the event has more characters
594
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
595
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
596
                event_regex += '$'
597
            if re.match(event_regex, event.name):
598
                self.log.debug('Calling listeners for %s', event)
599
                for listener in listeners:
600
                    if asyncio.iscoroutinefunction(listener):
601
                        task = asyncio.create_task(listener(event))
602
                        self._alisten_tasks.add(task)
603
                        task.add_done_callback(self._alisten_tasks.discard)
604
                    else:
605
                        listener(event)
606
607
    # pylint: disable=broad-exception-caught
608
    async def event_handler(self, buffer_name: str):
609
        """Default event handler that gets from an event buffer."""
610
        event_buffer = getattr(self.buffers, buffer_name)
611
        self.log.info(f"Event handler {buffer_name} started")
612
        while True:
613
            try:
614
                # After hitting issues with a large amount of flows being
615
                # installed (16k which sends 32k events), this task was
616
                # hogging resources in the MainThread causing disconnections
617
                # and socket exceptions. With the following sleep, this task
618
                # yields to other loops mitigating the disconnection issues.
619
                # Now the cap for flow installation at the same time is 50k.
620
                await asyncio.sleep(0)
621
                event = await event_buffer.aget()
622
                self.notify_listeners(event)
623
624
                if event.name == "kytos/core.shutdown":
625
                    self.log.debug(f"Event handler {buffer_name} stopped")
626
                    break
627
            except Exception as exc:
628
                self.log.exception(f"Unhandled exception on {buffer_name}",
629
                                   exc_info=exc)
630
631
    async def publish_connection_error(self, event):
632
        """Publish connection error event.
633
634
        Args:
635
            event (KytosEvent): Event that triggered for error propagation.
636
        """
637
        event.name = \
638
            f"kytos/core.{event.destination.protocol.name}.connection.error"
639
        error_msg = f"Connection state: {event.destination.state}"
640
        event.content["exception"] = error_msg
641
        await self.buffers.conn.aput(event)
642
643
    # pylint: disable=broad-exception-caught
644
    async def msg_out_event_handler(self):
645
        """Handle msg_out events.
646
647
        Listen to the msg_out buffer and send all its events to the
648
        corresponding listeners.
649
        """
650
        self.log.info("Event handler msg_out started")
651
        while True:
652
            try:
653
                triggered_event = await self.buffers.msg_out.aget()
654
655
                if triggered_event.name == "kytos/core.shutdown":
656
                    self.log.debug("Message Out Event handler stopped")
657
                    break
658
659
                message = triggered_event.content['message']
660
                destination = triggered_event.destination
661
                if (destination and
662
                        not destination.state == ConnectionState.FINISHED):
663
                    packet = message.pack()
664
                    destination.send(packet)
665
                    self.log.debug('Connection %s: OUT OFP, '
666
                                   'version: %s, type: %s, xid: %s - %s',
667
                                   destination.id,
668
                                   message.header.version,
669
                                   message.header.message_type,
670
                                   message.header.xid,
671
                                   packet.hex())
672
                    self.notify_listeners(triggered_event)
673
            except OSError:
674
                await self.publish_connection_error(triggered_event)
675
                self.log.info("connection closed. Cannot send message")
676
            except PackException as err:
677
                self.log.error(
678
                    f"Discarding message: {message}, event: {triggered_event} "
679
                    f"because of PackException {err}"
680
                )
681
            except Exception as exc:
682
                self.log.exception("Unhandled exception on msg_out",
683
                                   exc_info=exc)
684
685
    def get_interface_by_id(self, interface_id):
686
        """Find a Interface  with interface_id.
687
688
        Args:
689
            interface_id(str): Interface Identifier.
690
691
        Returns:
692
            Interface: Instance of Interface with the id given.
693
694
        """
695
        if interface_id is None:
696
            return None
697
698
        switch_id = ":".join(interface_id.split(":")[:-1])
699
        interface_number = int(interface_id.split(":")[-1])
700
701
        switch = self.switches.get(switch_id)
702
703
        if not switch:
704
            return None
705
706
        return switch.interfaces.get(interface_number, None)
707
708
    def get_switch_by_dpid(self, dpid):
709
        """Return a specific switch by dpid.
710
711
        Args:
712
            dpid (|DPID|): dpid object used to identify a switch.
713
714
        Returns:
715
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
716
717
        """
718
        return self.switches.get(dpid)
719
720
    def get_switch_or_create(self, dpid, connection=None):
721
        """Return switch or create it if necessary.
722
723
        Args:
724
            dpid (|DPID|): dpid object used to identify a switch.
725
            connection (:class:`~kytos.core.connection.Connection`):
726
                connection used by switch. If a switch has a connection that
727
                will be updated.
728
729
        Returns:
730
            :class:`~kytos.core.switch.Switch`: new or existent switch.
731
732
        """
733
        with self._switches_lock:
734
            if connection:
735
                self.create_or_update_connection(connection)
736
737
            switch = self.get_switch_by_dpid(dpid)
738
            event_name = 'kytos/core.switch.'
739
740
            if switch is None:
741
                switch = Switch(dpid=dpid)
742
                self.add_new_switch(switch)
743
                event_name += 'new'
744
            else:
745
                event_name += 'reconnected'
746
            event = KytosEvent(name=event_name, content={'switch': switch})
747
748
            if connection:
749
                old_connection = switch.connection
750
                switch.update_connection(connection)
751
752
                if old_connection is not connection:
753
                    self.remove_connection(old_connection)
754
755
            self.buffers.conn.put(event)
756
757
            return switch
758
759
    def create_or_update_connection(self, connection):
760
        """Update a connection.
761
762
        Args:
763
            connection (:class:`~kytos.core.connection.Connection`):
764
                Instance of connection that will be updated.
765
        """
766
        self.connections[connection.id] = connection
767
768
    def get_connection_by_id(self, conn_id):
769
        """Return a existent connection by id.
770
771
        Args:
772
            id (int): id from a connection.
773
774
        Returns:
775
            :class:`~kytos.core.connection.Connection`: Instance of connection
776
                or None Type.
777
778
        """
779
        return self.connections.get(conn_id)
780
781
    def remove_connection(self, connection):
782
        """Close a existent connection and remove it.
783
784
        Args:
785
            connection (:class:`~kytos.core.connection.Connection`):
786
                Instance of connection that will be removed.
787
        """
788
        if connection is None:
789
            return False
790
791
        try:
792
            connection.close()
793
            del self.connections[connection.id]
794
        except KeyError:
795
            return False
796
        return True
797
798
    def remove_switch(self, switch):
799
        """Remove an existent switch.
800
801
        Args:
802
            switch (:class:`~kytos.core.switch.Switch`):
803
                Instance of switch that will be removed.
804
        """
805
        try:
806
            del self.switches[switch.dpid]
807
        except KeyError:
808
            return False
809
        return True
810
811
    def new_connection(self, event):
812
        """Handle a kytos/core.connection.new event.
813
814
        This method will read new connection event and store the connection
815
        (socket) into the connections attribute on the controller.
816
817
        It also clear all references to the connection since it is a new
818
        connection on the same ip:port.
819
820
        Args:
821
            event (~kytos.core.KytosEvent):
822
                The received event (``kytos/core.connection.new``) with the
823
                needed info.
824
        """
825
        self.log.info("Handling %s...", event)
826
827
        connection = event.source
828
        self.log.debug("Event source: %s", event.source)
829
830
        # Remove old connection (aka cleanup) if it exists
831
        if self.get_connection_by_id(connection.id):
832
            self.remove_connection(connection.id)
833
834
        # Update connections with the new connection
835
        self.create_or_update_connection(connection)
836
837
    def add_new_switch(self, switch):
838
        """Add a new switch on the controller.
839
840
        Args:
841
            switch (Switch): A Switch object
842
        """
843
        self.switches[switch.dpid] = switch
844
845
    def _import_napp(self, username, napp_name):
846
        """Import a NApp module.
847
848
        Raises:
849
            FileNotFoundError: if NApp's main.py is not found.
850
            ModuleNotFoundError: if any NApp requirement is not installed.
851
852
        """
853
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
854
        path = os.path.join(self.options.napps, username, napp_name,
855
                            'main.py')
856
        napp_spec = spec_from_file_location(mod_name, path)
857
        napp_module = module_from_spec(napp_spec)
858
        sys.modules[napp_spec.name] = napp_module
859
        napp_spec.loader.exec_module(napp_module)
860
        return napp_module
861
862
    def load_napp(self, username, napp_name):
863
        """Load a single NApp.
864
865
        Args:
866
            username (str): NApp username (makes up NApp's path).
867
            napp_name (str): Name of the NApp to be loaded.
868
        """
869
        if (username, napp_name) in self.napps:
870
            message = 'NApp %s/%s was already loaded'
871
            self.log.warning(message, username, napp_name)
872
            return
873
874
        try:
875
            napp_module = self._import_napp(username, napp_name)
876
        except ModuleNotFoundError as err:
877
            self.log.error("Error loading NApp '%s/%s': %s",
878
                           username, napp_name, err)
879
            return
880
        except FileNotFoundError as err:
881
            msg = "NApp module not found, assuming it's a meta-napp: %s"
882
            self.log.warning(msg, err.filename)
883
            return
884
885
        try:
886
            napp = napp_module.Main(controller=self)
887
        except Exception as exc:  # noqa pylint: disable=bare-except
888
            msg = f"NApp {username}/{napp_name} exception {str(exc)} "
889
            raise KytosNAppSetupException(msg) from exc
890
891
        self.napps[(username, napp_name)] = napp
892
893
        napp.start()
894
        self.api_server.authenticate_endpoints(napp)
895
        self.api_server.register_napp_endpoints(napp)
896
897
        # pylint: disable=protected-access
898
        for event, listeners in napp._listeners.items():
899
            self.events_listeners.setdefault(event, []).extend(listeners)
900
        # pylint: enable=protected-access
901
902
    def pre_install_napps(self, napps, enable=True):
903
        """Pre install and enable NApps.
904
905
        Before installing, it'll check if it's installed yet.
906
907
        Args:
908
            napps ([str]): List of NApps to be pre-installed and enabled.
909
        """
910
        all_napps = self.napps_manager.get_installed_napps()
911
        installed = [str(napp) for napp in all_napps]
912
        napps_diff = [napp for napp in napps if napp not in installed]
913
        for napp in napps_diff:
914
            self.napps_manager.install(napp, enable=enable)
915
916
    def load_napps(self):
917
        """Load all NApps enabled on the NApps dir."""
918
        for napp in self.napps_manager.get_enabled_napps():
919
            try:
920
                self.log.info("Loading NApp %s", napp.id)
921
                self.load_napp(napp.username, napp.name)
922
            except FileNotFoundError as exception:
923
                self.log.error("Could not load NApp %s: %s",
924
                               napp.id, exception)
925
                msg = f"NApp {napp.id} exception {str(exception)}"
926
                raise KytosNAppSetupException(msg) from exception
927
928
    def unload_napp(self, username, napp_name):
929
        """Unload a specific NApp.
930
931
        Args:
932
            username (str): NApp username.
933
            napp_name (str): Name of the NApp to be unloaded.
934
        """
935
        napp = self.napps.pop((username, napp_name), None)
936
937
        if napp is None:
938
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
939
        else:
940
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
941
            napp_id = NApp(username, napp_name).id
942
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
943
            napp_shutdown_fn = self.events_listeners[event.name][0]
944
            # Call listener before removing it from events_listeners
945
            napp_shutdown_fn(event)
946
947
            # Remove rest endpoints from that napp
948
            self.api_server.remove_napp_endpoints(napp)
949
950
            # Removing listeners from that napp
951
            # pylint: disable=protected-access
952
            for event_type, napp_listeners in napp._listeners.items():
953
                event_listeners = self.events_listeners[event_type]
954
                for listener in napp_listeners:
955
                    event_listeners.remove(listener)
956
                if not event_listeners:
957
                    del self.events_listeners[event_type]
958
            # pylint: enable=protected-access
959
960
    def unload_napps(self):
961
        """Unload all loaded NApps that are not core NApps
962
963
        NApps are unloaded in the reverse order that they are enabled to
964
        facilitate to shutdown gracefully.
965
        """
966
        for napp in reversed(self.napps_manager.get_enabled_napps()):
967
            self.unload_napp(napp.username, napp.name)
968
969
    def reload_napp_module(self, username, napp_name, napp_file):
970
        """Reload a NApp Module."""
971
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
972
        try:
973
            napp_module = import_module(mod_name)
974
        except ModuleNotFoundError:
975
            self.log.error("Module '%s' not found", mod_name)
976
            raise
977
        try:
978
            napp_module = reload_module(napp_module)
979
        except ImportError as err:
980
            self.log.error("Error reloading NApp '%s/%s': %s",
981
                           username, napp_name, err)
982
            raise
983
984
    def reload_napp(self, username, napp_name):
985
        """Reload a NApp."""
986
        self.unload_napp(username, napp_name)
987
        try:
988
            self.reload_napp_module(username, napp_name, 'settings')
989
            self.reload_napp_module(username, napp_name, 'main')
990
        except (ModuleNotFoundError, ImportError):
991
            return 400
992
        self.log.info("NApp '%s/%s' successfully reloaded",
993
                      username, napp_name)
994
        self.load_napp(username, napp_name)
995
        return 200
996
997
    def rest_reload_napp(self, request: Request) -> JSONResponse:
998
        """Request reload a NApp."""
999
        username = request.path_params["username"]
1000
        napp_name = request.path_params["napp_name"]
1001
        res = self.reload_napp(username, napp_name)
1002
        if res == 200:
1003
            return JSONResponse('reloaded')
1004
        return JSONResponse('fail to reload', status_code=res)
1005
1006
    def rest_reload_all_napps(self, _request: Request) -> JSONResponse:
1007
        """Request reload all NApps."""
1008
        for napp in self.napps:
1009
            self.reload_napp(*napp)
1010
        return JSONResponse('reloaded')
1011
1012
    def _full_queue_counter(self) -> Counter:
1013
        """Generate full queue stats counter."""
1014
        buffer_counter = defaultdict(Counter)
1015
        for buffer in self.buffers.get_all_buffers():
1016
            if not buffer.full():
1017
                continue
1018
            while not buffer.empty():
1019
                event = buffer.get()
1020
                buffer_counter[buffer.name][event.name] += 1
1021
        return buffer_counter
1022
1023
    def _try_to_fmt_traceback_msg(self, message: str, counter: Counter) -> str:
1024
        """Try to fmt traceback message."""
1025
        if counter:
1026
            counter = dict(counter)
1027
            message = (
1028
                f"{message}\nFull KytosEventBuffers counters: {counter}"
1029
            )
1030
        return message
1031
1032
    def get_link_or_create(
1033
        self,
1034
        endpoint_a: Interface,
1035
        endpoint_b: Interface,
1036
        link_dict: Optional[dict] = None,
1037
    ) -> tuple[Link, bool]:
1038
        """Get an existing link or create a new one.
1039
1040
        Returns:
1041
            Tuple(Link, bool): Link and a boolean whether it has been created.
1042
        """
1043
        with self.links_lock:
1044
            new_link = Link(endpoint_a, endpoint_b)
1045
1046
            if new_link.id in self.links:
1047
                return (self.links[new_link.id], False)
1048
1049
            self.links[new_link.id] = new_link
1050
1051
            with new_link.link_lock:
1052
                endpoint_a.update_link(new_link)
1053
                endpoint_b.update_link(new_link)
1054
                new_link.endpoint_a = endpoint_a
1055
                new_link.endpoint_b = endpoint_b
1056
                endpoint_a.nni = True
1057
                endpoint_b.nni = True
1058
1059
                if link_dict and link_dict['enabled']:
1060
                    new_link.enable()
1061
                elif link_dict and not link_dict['enabled']:
1062
                    new_link.disable()
1063
1064
        return (new_link, True)
1065
1066
    def get_link(self, link_id: str) -> Optional[Link]:
1067
        """Return a link by its ID.
1068
1069
        Returns:
1070
            Optional[Link]: Link if found, None otherwise.
1071
        """
1072
        return self.links.get(link_id)
1073
1074
    def get_links_from_interfaces(
1075
        self,
1076
        interfaces: Iterable[Interface]
1077
    ) -> dict[str, Link]:
1078
        """Get a list of links that matched to all/any given interfaces."""
1079
        links_found = {}
1080
        with self.links_lock:
1081
            for link in self.links.copy().values():
1082
                for interface in interfaces:
1083
                    if any((
1084
                        interface.id == link.endpoint_a.id,
1085
                        interface.id == link.endpoint_b.id,
1086
                    )):
1087
                        links_found[link.id] = link
1088
            return links_found
1089