Passed
Push — master ( 775f07...3147d4 )
by Vinicius
03:25 queued 18s
created

Controller.stop_queue_monitors()   A

Complexity

Conditions 2

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 1
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16
import asyncio
17
import atexit
18
import importlib
19
import logging
20
import os
21
import re
22
import sys
23
import threading
24
import traceback
25
from asyncio import AbstractEventLoop
26
from collections import Counter, defaultdict
27
from importlib import import_module
28
from importlib import reload as reload_module
29
from importlib.util import module_from_spec, spec_from_file_location
30
from pathlib import Path
31
from socket import error as SocketError
32
33
from pyof.foundation.exceptions import PackException
34
35
from kytos.core.api_server import APIServer, JSONResponse, Request
36
from kytos.core.apm import init_apm
37
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
38
from kytos.core.auth import Auth
39
from kytos.core.buffers import KytosBuffers
40
from kytos.core.config import KytosConfig
41
from kytos.core.connection import ConnectionState
42
from kytos.core.db import db_conn_wait
43
from kytos.core.dead_letter import DeadLetter
44
from kytos.core.events import KytosEvent
45
from kytos.core.exceptions import (KytosAPMInitException, KytosDBInitException,
46
                                   KytosNAppSetupException)
47
from kytos.core.helpers import executors, now
48
from kytos.core.logs import LogManager
49
from kytos.core.napps.base import NApp
50
from kytos.core.napps.manager import NAppsManager
51
from kytos.core.napps.napp_dir_listener import NAppDirListener
52
from kytos.core.queue_monitor import QueueMonitorWindow
53
from kytos.core.switch import Switch
54
55
__all__ = ('Controller',)
56
57
58
def exc_handler(_, exc, __):
59
    """Log uncaught exceptions.
60
61
    Args:
62
        exc_type (ignored): exception type
63
        exc: exception instance
64
        tb (ignored): traceback
65
    """
66
    logging.basicConfig(filename='errlog.log',
67
                        format='%(asctime)s:%(pathname)'
68
                        's:%(levelname)s:%(message)s')
69
    logging.exception('Uncaught Exception: %s', exc)
70
71
72
class Controller:
73
    """Main class of Kytos.
74
75
    The main responsibilities of this class are:
76
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
77
        - manage KytosNApps (install, load and unload);
78
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
79
        - manage which event should be sent to NApps methods;
80
        - manage the buffers handlers, considering one thread per handler.
81
    """
82
83
    # Created issue #568 for the disabled checks.
84
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
85
    # pylint: disable=consider-using-with,unnecessary-dunder-call
86
    # pylint: disable=too-many-lines
87
    def __init__(self, options=None, loop: AbstractEventLoop = None):
88
        """Init method of Controller class takes the parameters below.
89
90
        Args:
91
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
92
                instance of :class:`~kytos.core.config.KytosConfig` class.
93
            loop asyncio.AbstractEventLoop
94
        """
95
        if options is None:
96
            options = KytosConfig().options['daemon']
97
98
        self.loop = loop
99
100
        # asyncio tasks
101
        self._tasks: list[asyncio.Task] = []
102
103
        #: KytosBuffers: KytosBuffer object with Controller buffers
104
        self._buffers: KytosBuffers = None
105
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
106
        #:
107
        #: This dict stores all connections between the controller and the
108
        #: switches. The key for this dict is a tuple (ip, port). The content
109
        #: is a Connection
110
        self.connections = {}
111
        #: dict: mapping of events and event listeners.
112
        #:
113
        #: The key of the dict is a KytosEvent (or a string that represent a
114
        #: regex to match against KytosEvents) and the value is a list of
115
        #: methods that will receive the referenced event
116
        self.events_listeners = {'kytos/core.connection.new':
117
                                 [self.new_connection]}
118
119
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
120
        #:
121
        #: The key is the napp name (string), while the value is the napp
122
        #: instance itself.
123
        self.napps = {}
124
        #: Object generated by ParseArgs on config.py file
125
        self.options = options
126
        #: KytosServer: Instance of KytosServer that will be listening to TCP
127
        #: connections.
128
        self.server = None
129
        #: dict: Current existing switches.
130
        #:
131
        #: The key is the switch dpid, while the value is a Switch object.
132
        self.switches = {}  # dpid: Switch()
133
        self._switches_lock = threading.Lock()
134
135
        #: datetime.datetime: Time when the controller finished starting.
136
        self.started_at = None
137
138
        #: logging.Logger: Logger instance used by Kytos.
139
        self.log = None
140
141
        #: Observer that handle NApps when they are enabled or disabled.
142
        self.napp_dir_listener = NAppDirListener(self)
143
144
        self.napps_manager = NAppsManager(self)
145
146
        #: API Server used to expose rest endpoints.
147
        self.api_server = APIServer(self.options.listen,
148
                                    self.options.api_port,
149
                                    self.napps_manager,
150
                                    self.options.napps,
151
                                    self.options.api_traceback_on_500)
152
153
        self.auth = None
154
        self.dead_letter = DeadLetter(self)
155
        self._alisten_tasks = set()
156
        self.qmonitors: list[QueueMonitorWindow] = []
157
158
        self._register_endpoints()
159
        #: Adding the napps 'enabled' directory into the PATH
160
        #: Now you can access the enabled napps with:
161
        #: from napps.<username>.<napp_name> import ?....
162
        sys.path.append(os.path.join(self.options.napps, os.pardir))
163
        sys.excepthook = exc_handler
164
165
    def start_auth(self):
166
        """Initialize Auth() and its services"""
167
        self.auth = Auth(self)
168
        self.auth.register_core_auth_services()
169
170
    def enable_logs(self):
171
        """Register kytos log and enable the logs."""
172
        decorators = self.options.logger_decorators
173
        try:
174
            decorators = [self._resolve(deco) for deco in decorators]
175
        except ModuleNotFoundError as err:
176
            sys.exit(f'Failed to resolve decorator module: {err.name}')
177
        except AttributeError:
178
            sys.exit(f'Failed to resolve decorator name: {decorators}')
179
        LogManager.decorate_logger_class(*decorators)
180
        LogManager.load_config_file(self.options.logging, self.options.debug)
181
        # pylint: disable=fixme
182
        # TODO issue 371
183
        # LogManager.enable_websocket(self.api_server.server)
184
        self.log = logging.getLogger(__name__)
185
        self._patch_core_loggers()
186
187
    @staticmethod
188
    def _resolve(name):
189
        """Resolve a dotted name to a global object."""
190
        mod, _, attr = name.rpartition('.')
191
        mod = importlib.import_module(mod)
192
        return getattr(mod, attr)
193
194
    @staticmethod
195
    def _patch_core_loggers():
196
        """Patch in updated loggers to 'kytos.core.*' modules"""
197
        match_str = 'kytos.core.'
198
        str_len = len(match_str)
199
        reloadable_mods = [module for mod_name, module in sys.modules.items()
200
                           if mod_name[:str_len] == match_str]
201
        for module in reloadable_mods:
202
            module.LOG = logging.getLogger(module.__name__)
203
204
    @staticmethod
205
    def loggers():
206
        """List all logging Loggers.
207
208
        Return a list of Logger objects, with name and logging level.
209
        """
210
        # pylint: disable=no-member
211
        return [logging.getLogger(name)
212
                for name in logging.root.manager.loggerDict
213
                if "kytos" in name]
214
215
    def toggle_debug(self, name=None):
216
        """Enable/disable logging debug messages to a given logger name.
217
218
        If the name parameter is not specified the debug will be
219
        enabled/disabled following the initial config file. It will decide
220
        to enable/disable using the 'kytos' name to find the current
221
        logger level.
222
        Obs: To disable the debug the logging will be set to NOTSET
223
224
        Args:
225
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
226
        """
227
        # pylint: disable=no-member
228
        if name and name not in logging.root.manager.loggerDict:
229
            # A Logger name that is not declared in logging will raise an error
230
            # otherwise logging would create a new Logger.
231
            raise ValueError(f"Invalid logger name: {name}")
232
233
        if not name:
234
            # Logger name not specified.
235
            level = logging.getLogger('kytos').getEffectiveLevel()
236
            enable_debug = level != logging.DEBUG
237
238
            # Enable/disable default Loggers
239
            LogManager.load_config_file(self.options.logging, enable_debug)
240
            return
241
242
        # Get effective logger level for the name
243
        level = logging.getLogger(name).getEffectiveLevel()
244
        logger = logging.getLogger(name)
245
246
        if level == logging.DEBUG:
247
            # disable debug
248
            logger.setLevel(logging.NOTSET)
249
        else:
250
            # enable debug
251
            logger.setLevel(logging.DEBUG)
252
253
    def start(self, restart=False):
254
        """Create pidfile and call start_controller method."""
255
        self.enable_logs()
256
        # pylint: disable=broad-except
257
        try:
258
            if self.options.database:
259
                db_conn_wait(db_backend=self.options.database)
260
                self.start_auth()
261
            if self.options.apm:
262
                init_apm(self.options.apm, app=self.api_server.app)
263
            if not restart:
264
                self.create_pidfile()
265
            self.start_controller()
266
        except (KytosDBInitException, KytosAPMInitException) as exc:
267
            message = f"Kytos couldn't start because of {str(exc)}"
268
            sys.exit(message)
269
        except Exception as exc:
270
            exc_fmt = traceback.format_exc(chain=True)
271
            message = f"Kytos couldn't start because of {str(exc)} {exc_fmt}"
272
            counter = self._full_queue_counter()
273
            sys.exit(self._try_to_fmt_traceback_msg(message, counter))
274
275
    def start_queue_monitors(self) -> None:
276
        """Start QueueMonitorWindows."""
277
        for data in self.options.event_buffer_monitors:
278
            for qmon in QueueMonitorWindow.from_buffer_config(self, **data):
279
                self.qmonitors.append(qmon)
280
        for data in self.options.thread_pool_queue_monitors:
281
            for qmon in QueueMonitorWindow.from_threadpool_config(**data):
282
                self.qmonitors.append(qmon)
283
        for qmonitor in self.qmonitors:
284
            self.log.info(f"Starting {qmonitor}...")
285
            qmonitor.start()
286
287
    def stop_queue_monitors(self) -> None:
288
        """Stop QueueMonitorWindows."""
289
        for qmonitor in self.qmonitors:
290
            self.log.info(f"Stopping {qmonitor}...")
291
            qmonitor.stop()
292
293
    def create_pidfile(self):
294
        """Create a pidfile."""
295
        pid = os.getpid()
296
297
        # Creates directory if it doesn't exist
298
        # System can erase /var/run's content
299
        pid_folder = Path(self.options.pidfile).parent
300
        self.log.info(pid_folder)
301
302
        # Pylint incorrectly infers Path objects
303
        # https://github.com/PyCQA/pylint/issues/224
304
        # pylint: disable=no-member
305
        if not pid_folder.exists():
306
            pid_folder.mkdir()
307
            pid_folder.chmod(0o1777)
308
        # pylint: enable=no-member
309
310
        # Make sure the file is deleted when controller stops
311
        atexit.register(Path(self.options.pidfile).unlink)
312
313
        # Checks if a pidfile exists. Creates a new file.
314
        try:
315
            pidfile = open(self.options.pidfile, mode='x', encoding="utf8")
316
        except OSError:
317
            # This happens if there is a pidfile already.
318
            # We shall check if the process that created the pidfile is still
319
            # running.
320
            try:
321
                existing_file = open(self.options.pidfile, mode='r',
322
                                     encoding="utf8")
323
                old_pid = int(existing_file.read())
324
                os.kill(old_pid, 0)
325
                # If kill() doesn't return an error, there's a process running
326
                # with the same PID. We assume it is Kytos and quit.
327
                # Otherwise, overwrite the file and proceed.
328
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
329
                             "running. Aborting.")
330
                sys.exit(error_msg.format(self.options.pidfile))
331
            except OSError:
332
                try:
333
                    pidfile = open(self.options.pidfile, mode='w',
334
                                   encoding="utf8")
335
                except OSError as exception:
336
                    error_msg = "Failed to create pidfile {}: {}."
337
                    sys.exit(error_msg.format(self.options.pidfile, exception))
338
339
        # Identifies the process that created the pidfile.
340
        pidfile.write(str(pid))
341
        pidfile.close()
342
343
    @property
344
    def buffers(self) -> KytosBuffers:
345
        """KytosBuffers exposed through this property to allow lazy
346
        async initiliation with an event loop running."""
347
        if not self._buffers:
348
            self._buffers = KytosBuffers()
349
        return self._buffers
350
351
    def start_controller(self):
352
        """Start the controller.
353
354
        Starts the KytosServer (TCP Server) coroutine.
355
        Starts a thread for each buffer handler.
356
        Load the installed apps.
357
        """
358
        self.log.info("Starting Kytos - Kytos Controller")
359
        if not self._buffers:
360
            self._buffers = KytosBuffers()
361
        self.server = KytosServer((self.options.listen,
362
                                   int(self.options.port)),
363
                                  KytosServerProtocol,
364
                                  self,
365
                                  self.options.protocol_name)
366
367
        self.log.info("Starting TCP server: %s", self.server)
368
        self.server.serve_forever()
369
370
        task = self.loop.create_task(self.api_server.serve())
371
        self._tasks.append(task)
372
373
        # ASYNC TODO: ensure all threads started correctly
374
        # This is critical, if any of them failed starting we should exit.
375
        # sys.exit(error_msg.format(thread, exception))
376
377
        self.log.info("Loading Kytos NApps...")
378
        self.napp_dir_listener.start()
379
        self.pre_install_napps(self.options.napps_pre_installed)
380
        self.load_napps()
381
        self.api_server.start_web_ui()
382
383
        # Start task handlers consumers after NApps to potentially
384
        # avoid discarding an event if a consumer isn't ready yet
385
        task = self.loop.create_task(self.event_handler("conn"))
386
        self._tasks.append(task)
387
        task = self.loop.create_task(self.event_handler("raw"))
388
        self._tasks.append(task)
389
        task = self.loop.create_task(self.event_handler("msg_in"))
390
        self._tasks.append(task)
391
        task = self.loop.create_task(self.msg_out_event_handler())
392
        self._tasks.append(task)
393
        task = self.loop.create_task(self.event_handler("app"))
394
        self._tasks.append(task)
395
        task = self.loop.create_task(self.event_handler("meta"))
396
        self._tasks.append(task)
397
398
        self.start_queue_monitors()
399
        self.started_at = now()
400
401
    def _register_endpoints(self):
402
        """Register all rest endpoint served by kytos.
403
404
        -   Register APIServer endpoints
405
        -   Register WebUI endpoints
406
        -   Register ``/api/kytos/core/config`` endpoint
407
        """
408
        self.api_server.start_api()
409
        # Register controller endpoints as /api/kytos/core/...
410
        self.api_server.register_core_endpoint('config/',
411
                                               self.configuration_endpoint)
412
        self.api_server.register_core_endpoint('metadata/',
413
                                               Controller.metadata_endpoint)
414
        self.api_server.register_core_endpoint(
415
            'reload/{username}/{napp_name}/',
416
            self.rest_reload_napp)
417
        self.api_server.register_core_endpoint('reload/all',
418
                                               self.rest_reload_all_napps)
419
        self.dead_letter.register_endpoints()
420
421
    def register_rest_endpoint(self, url, function, methods):
422
        """Deprecate in favor of @rest decorator."""
423
        self.api_server.register_rest_endpoint(url, function, methods)
424
425
    @classmethod
426
    def metadata_endpoint(cls, _request: Request) -> JSONResponse:
427
        """Return the Kytos metadata.
428
429
        Returns:
430
            string: Json with current kytos metadata.
431
432
        """
433
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
434
        with open(meta_path, encoding="utf8") as file:
435
            meta_file = file.read()
436
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
437
        return JSONResponse(metadata)
438
439
    def configuration_endpoint(self, _request: Request) -> JSONResponse:
440
        """Return the configuration options used by Kytos.
441
442
        Returns:
443
            string: Json with current configurations used by kytos.
444
445
        """
446
        return JSONResponse(KytosConfig.options_exposed(self.options.__dict__))
447
448
    def restart(self, graceful=True):
449
        """Restart Kytos SDN Controller.
450
451
        Args:
452
            graceful(bool): Represents the way that Kytos will restart.
453
        """
454
        if self.started_at is not None:
455
            self.stop(graceful)
456
            self.__init__(self.options)
457
458
        self.start(restart=True)
459
460
    def stop(self, graceful=True):
461
        """Shutdown all services used by kytos.
462
463
        This method should:
464
            - stop all Websockets
465
            - stop the API Server
466
            - stop the Controller
467
        """
468
        if self.started_at:
469
            self.stop_controller(graceful)
470
471
    def stop_controller(self, graceful=True):
472
        """Stop the controller.
473
474
        This method should:
475
            - announce on the network that the controller will shutdown;
476
            - stop receiving incoming packages;
477
            - call the 'shutdown' method of each KytosNApp that is running;
478
            - finish reading the events on all buffers;
479
            - stop each running handler;
480
            - stop all running threads;
481
            - stop the KytosServer;
482
        """
483
        self.log.info("Stopping Kytos")
484
485
        self.buffers.send_stop_signal()
486
        self.napp_dir_listener.stop()
487
488
        for pool_name in executors:
489
            self.log.info("Stopping threadpool: %s", pool_name)
490
491
        threads = threading.enumerate()
492
        self.log.debug("%s threads before threadpool shutdown: %s",
493
                       len(threads), threads)
494
495
        for executor_pool in executors.values():
496
            executor_pool.shutdown(wait=graceful, cancel_futures=True)
497
498
        # self.server.socket.shutdown()
499
        # self.server.socket.close()
500
501
        self.started_at = None
502
        self.unload_napps()
503
504
        # Cancel all async tasks (event handlers and servers)
505
        for task in self._tasks:
506
            task.cancel()
507
508
        # ASYNC TODO: close connections
509
        # self.server.server_close()
510
511
        self.stop_queue_monitors()
512
        self.log.info("Stopping API Server...")
513
        self.api_server.stop()
514
        self.log.info("Stopped API Server")
515
        self.log.info("Stopping TCP Server...")
516
        self.server.shutdown()
517
        self.log.info("Stopped TCP Server")
518
        self.loop.stop()
519
520
    def status(self):
521
        """Return status of Kytos Server.
522
523
        If the controller kytos is running this method will be returned
524
        "Running since 'Started_At'", otherwise "Stopped".
525
526
        Returns:
527
            string: String with kytos status.
528
529
        """
530
        if self.started_at:
531
            return f"Running since {self.started_at}"
532
        return "Stopped"
533
534
    def uptime(self):
535
        """Return the uptime of kytos server.
536
537
        This method should return:
538
            - 0 if Kytos Server is stopped.
539
            - (kytos.start_at - datetime.now) if Kytos Server is running.
540
541
        Returns:
542
           datetime.timedelta: The uptime interval.
543
544
        """
545
        return now() - self.started_at if self.started_at else 0
546
547
    def notify_listeners(self, event):
548
        """Send the event to the specified listeners.
549
550
        Loops over self.events_listeners matching (by regexp) the attribute
551
        name of the event with the keys of events_listeners. If a match occurs,
552
        then send the event to each registered listener.
553
554
        Args:
555
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
556
        """
557
558
        self.log.debug("looking for listeners for %s", event)
559
        for event_regex, listeners in dict(self.events_listeners).items():
560
            # self.log.debug("listeners found for %s: %r => %s", event,
561
            #                event_regex, [l.__qualname__ for l in listeners])
562
            # Do not match if the event has more characters
563
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
564
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
565
                event_regex += '$'
566
            if re.match(event_regex, event.name):
567
                self.log.debug('Calling listeners for %s', event)
568
                for listener in listeners:
569
                    if asyncio.iscoroutinefunction(listener):
570
                        task = asyncio.create_task(listener(event))
571
                        self._alisten_tasks.add(task)
572
                        task.add_done_callback(self._alisten_tasks.discard)
573
                    else:
574
                        listener(event)
575
576
    async def event_handler(self, buffer_name: str):
577
        """Default event handler that gets from an event buffer."""
578
        event_buffer = getattr(self.buffers, buffer_name)
579
        self.log.info(f"Event handler {buffer_name} started")
580
        while True:
581
            event = await event_buffer.aget()
582
            self.notify_listeners(event)
583
584
            if event.name == "kytos/core.shutdown":
585
                self.log.debug(f"Event handler {buffer_name} stopped")
586
                break
587
588
    async def publish_connection_error(self, event):
589
        """Publish connection error event.
590
591
        Args:
592
            event (KytosEvent): Event that triggered for error propagation.
593
        """
594
        event.name = \
595
            f"kytos/core.{event.destination.protocol.name}.connection.error"
596
        error_msg = f"Connection state: {event.destination.state}"
597
        event.content["exception"] = error_msg
598
        await self.buffers.conn.aput(event)
599
600
    async def msg_out_event_handler(self):
601
        """Handle msg_out events.
602
603
        Listen to the msg_out buffer and send all its events to the
604
        corresponding listeners.
605
        """
606
        self.log.info("Event handler msg_out started")
607
        while True:
608
            triggered_event = await self.buffers.msg_out.aget()
609
610
            if triggered_event.name == "kytos/core.shutdown":
611
                self.log.debug("Message Out Event handler stopped")
612
                break
613
614
            message = triggered_event.content['message']
615
            destination = triggered_event.destination
616
            try:
617
                if (destination and
618
                        not destination.state == ConnectionState.FINISHED):
619
                    packet = message.pack()
620
                    destination.send(packet)
621
                    self.log.debug('Connection %s: OUT OFP, '
622
                                   'version: %s, type: %s, xid: %s - %s',
623
                                   destination.id,
624
                                   message.header.version,
625
                                   message.header.message_type,
626
                                   message.header.xid,
627
                                   packet.hex())
628
                    self.notify_listeners(triggered_event)
629
            except (OSError, SocketError):
630
                await self.publish_connection_error(triggered_event)
631
                self.log.info("connection closed. Cannot send message")
632
            except PackException as err:
633
                self.log.error(
634
                    f"Discarding message: {message}, event: {triggered_event} "
635
                    f"because of PackException {err}"
636
                )
637
638
    def get_interface_by_id(self, interface_id):
639
        """Find a Interface  with interface_id.
640
641
        Args:
642
            interface_id(str): Interface Identifier.
643
644
        Returns:
645
            Interface: Instance of Interface with the id given.
646
647
        """
648
        if interface_id is None:
649
            return None
650
651
        switch_id = ":".join(interface_id.split(":")[:-1])
652
        interface_number = int(interface_id.split(":")[-1])
653
654
        switch = self.switches.get(switch_id)
655
656
        if not switch:
657
            return None
658
659
        return switch.interfaces.get(interface_number, None)
660
661
    def get_switch_by_dpid(self, dpid):
662
        """Return a specific switch by dpid.
663
664
        Args:
665
            dpid (|DPID|): dpid object used to identify a switch.
666
667
        Returns:
668
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
669
670
        """
671
        return self.switches.get(dpid)
672
673
    def get_switch_or_create(self, dpid, connection=None):
674
        """Return switch or create it if necessary.
675
676
        Args:
677
            dpid (|DPID|): dpid object used to identify a switch.
678
            connection (:class:`~kytos.core.connection.Connection`):
679
                connection used by switch. If a switch has a connection that
680
                will be updated.
681
682
        Returns:
683
            :class:`~kytos.core.switch.Switch`: new or existent switch.
684
685
        """
686
        with self._switches_lock:
687
            if connection:
688
                self.create_or_update_connection(connection)
689
690
            switch = self.get_switch_by_dpid(dpid)
691
            event_name = 'kytos/core.switch.'
692
693
            if switch is None:
694
                switch = Switch(dpid=dpid)
695
                self.add_new_switch(switch)
696
                event_name += 'new'
697
            else:
698
                event_name += 'reconnected'
699
            event = KytosEvent(name=event_name, content={'switch': switch})
700
701
            if connection:
702
                old_connection = switch.connection
703
                switch.update_connection(connection)
704
705
                if old_connection is not connection:
706
                    self.remove_connection(old_connection)
707
708
            self.buffers.conn.put(event)
709
710
            return switch
711
712
    def create_or_update_connection(self, connection):
713
        """Update a connection.
714
715
        Args:
716
            connection (:class:`~kytos.core.connection.Connection`):
717
                Instance of connection that will be updated.
718
        """
719
        self.connections[connection.id] = connection
720
721
    def get_connection_by_id(self, conn_id):
722
        """Return a existent connection by id.
723
724
        Args:
725
            id (int): id from a connection.
726
727
        Returns:
728
            :class:`~kytos.core.connection.Connection`: Instance of connection
729
                or None Type.
730
731
        """
732
        return self.connections.get(conn_id)
733
734
    def remove_connection(self, connection):
735
        """Close a existent connection and remove it.
736
737
        Args:
738
            connection (:class:`~kytos.core.connection.Connection`):
739
                Instance of connection that will be removed.
740
        """
741
        if connection is None:
742
            return False
743
744
        try:
745
            connection.close()
746
            del self.connections[connection.id]
747
        except KeyError:
748
            return False
749
        return True
750
751
    def remove_switch(self, switch):
752
        """Remove an existent switch.
753
754
        Args:
755
            switch (:class:`~kytos.core.switch.Switch`):
756
                Instance of switch that will be removed.
757
        """
758
        try:
759
            del self.switches[switch.dpid]
760
        except KeyError:
761
            return False
762
        return True
763
764
    def new_connection(self, event):
765
        """Handle a kytos/core.connection.new event.
766
767
        This method will read new connection event and store the connection
768
        (socket) into the connections attribute on the controller.
769
770
        It also clear all references to the connection since it is a new
771
        connection on the same ip:port.
772
773
        Args:
774
            event (~kytos.core.KytosEvent):
775
                The received event (``kytos/core.connection.new``) with the
776
                needed info.
777
        """
778
        self.log.info("Handling %s...", event)
779
780
        connection = event.source
781
        self.log.debug("Event source: %s", event.source)
782
783
        # Remove old connection (aka cleanup) if it exists
784
        if self.get_connection_by_id(connection.id):
785
            self.remove_connection(connection.id)
786
787
        # Update connections with the new connection
788
        self.create_or_update_connection(connection)
789
790
    def add_new_switch(self, switch):
791
        """Add a new switch on the controller.
792
793
        Args:
794
            switch (Switch): A Switch object
795
        """
796
        self.switches[switch.dpid] = switch
797
798
    def _import_napp(self, username, napp_name):
799
        """Import a NApp module.
800
801
        Raises:
802
            FileNotFoundError: if NApp's main.py is not found.
803
            ModuleNotFoundError: if any NApp requirement is not installed.
804
805
        """
806
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
807
        path = os.path.join(self.options.napps, username, napp_name,
808
                            'main.py')
809
        napp_spec = spec_from_file_location(mod_name, path)
810
        napp_module = module_from_spec(napp_spec)
811
        sys.modules[napp_spec.name] = napp_module
812
        napp_spec.loader.exec_module(napp_module)
813
        return napp_module
814
815
    def load_napp(self, username, napp_name):
816
        """Load a single NApp.
817
818
        Args:
819
            username (str): NApp username (makes up NApp's path).
820
            napp_name (str): Name of the NApp to be loaded.
821
        """
822
        if (username, napp_name) in self.napps:
823
            message = 'NApp %s/%s was already loaded'
824
            self.log.warning(message, username, napp_name)
825
            return
826
827
        try:
828
            napp_module = self._import_napp(username, napp_name)
829
        except ModuleNotFoundError as err:
830
            self.log.error("Error loading NApp '%s/%s': %s",
831
                           username, napp_name, err)
832
            return
833
        except FileNotFoundError as err:
834
            msg = "NApp module not found, assuming it's a meta-napp: %s"
835
            self.log.warning(msg, err.filename)
836
            return
837
838
        try:
839
            napp = napp_module.Main(controller=self)
840
        except Exception as exc:  # noqa pylint: disable=bare-except
841
            msg = f"NApp {username}/{napp_name} exception {str(exc)} "
842
            raise KytosNAppSetupException(msg) from exc
843
844
        self.napps[(username, napp_name)] = napp
845
846
        napp.start()
847
        self.api_server.authenticate_endpoints(napp)
848
        self.api_server.register_napp_endpoints(napp)
849
850
        # pylint: disable=protected-access
851
        for event, listeners in napp._listeners.items():
852
            self.events_listeners.setdefault(event, []).extend(listeners)
853
        # pylint: enable=protected-access
854
855
    def pre_install_napps(self, napps, enable=True):
856
        """Pre install and enable NApps.
857
858
        Before installing, it'll check if it's installed yet.
859
860
        Args:
861
            napps ([str]): List of NApps to be pre-installed and enabled.
862
        """
863
        all_napps = self.napps_manager.get_installed_napps()
864
        installed = [str(napp) for napp in all_napps]
865
        napps_diff = [napp for napp in napps if napp not in installed]
866
        for napp in napps_diff:
867
            self.napps_manager.install(napp, enable=enable)
868
869
    def load_napps(self):
870
        """Load all NApps enabled on the NApps dir."""
871
        for napp in self.napps_manager.get_enabled_napps():
872
            try:
873
                self.log.info("Loading NApp %s", napp.id)
874
                self.load_napp(napp.username, napp.name)
875
            except FileNotFoundError as exception:
876
                self.log.error("Could not load NApp %s: %s",
877
                               napp.id, exception)
878
                msg = f"NApp {napp.id} exception {str(exception)}"
879
                raise KytosNAppSetupException(msg) from exception
880
881
    def unload_napp(self, username, napp_name):
882
        """Unload a specific NApp.
883
884
        Args:
885
            username (str): NApp username.
886
            napp_name (str): Name of the NApp to be unloaded.
887
        """
888
        napp = self.napps.pop((username, napp_name), None)
889
890
        if napp is None:
891
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
892
        else:
893
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
894
            napp_id = NApp(username, napp_name).id
895
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
896
            napp_shutdown_fn = self.events_listeners[event.name][0]
897
            # Call listener before removing it from events_listeners
898
            napp_shutdown_fn(event)
899
900
            # Remove rest endpoints from that napp
901
            self.api_server.remove_napp_endpoints(napp)
902
903
            # Removing listeners from that napp
904
            # pylint: disable=protected-access
905
            for event_type, napp_listeners in napp._listeners.items():
906
                event_listeners = self.events_listeners[event_type]
907
                for listener in napp_listeners:
908
                    event_listeners.remove(listener)
909
                if not event_listeners:
910
                    del self.events_listeners[event_type]
911
            # pylint: enable=protected-access
912
913
    def unload_napps(self):
914
        """Unload all loaded NApps that are not core NApps
915
916
        NApps are unloaded in the reverse order that they are enabled to
917
        facilitate to shutdown gracefully.
918
        """
919
        for napp in reversed(self.napps_manager.get_enabled_napps()):
920
            self.unload_napp(napp.username, napp.name)
921
922
    def reload_napp_module(self, username, napp_name, napp_file):
923
        """Reload a NApp Module."""
924
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
925
        try:
926
            napp_module = import_module(mod_name)
927
        except ModuleNotFoundError:
928
            self.log.error("Module '%s' not found", mod_name)
929
            raise
930
        try:
931
            napp_module = reload_module(napp_module)
932
        except ImportError as err:
933
            self.log.error("Error reloading NApp '%s/%s': %s",
934
                           username, napp_name, err)
935
            raise
936
937
    def reload_napp(self, username, napp_name):
938
        """Reload a NApp."""
939
        self.unload_napp(username, napp_name)
940
        try:
941
            self.reload_napp_module(username, napp_name, 'settings')
942
            self.reload_napp_module(username, napp_name, 'main')
943
        except (ModuleNotFoundError, ImportError):
944
            return 400
945
        self.log.info("NApp '%s/%s' successfully reloaded",
946
                      username, napp_name)
947
        self.load_napp(username, napp_name)
948
        return 200
949
950
    def rest_reload_napp(self, request: Request) -> JSONResponse:
951
        """Request reload a NApp."""
952
        username = request.path_params["username"]
953
        napp_name = request.path_params["napp_name"]
954
        res = self.reload_napp(username, napp_name)
955
        if res == 200:
956
            return JSONResponse('reloaded')
957
        return JSONResponse('fail to reload', status_code=res)
958
959
    def rest_reload_all_napps(self, _request: Request) -> JSONResponse:
960
        """Request reload all NApps."""
961
        for napp in self.napps:
962
            self.reload_napp(*napp)
963
        return JSONResponse('reloaded')
964
965
    def _full_queue_counter(self) -> Counter:
966
        """Generate full queue stats counter."""
967
        buffer_counter = defaultdict(Counter)
968
        for buffer in self.buffers.get_all_buffers():
969
            if not buffer.full():
970
                continue
971
            while not buffer.empty():
972
                event = buffer.get()
973
                buffer_counter[buffer.name][event.name] += 1
974
        return buffer_counter
975
976
    def _try_to_fmt_traceback_msg(self, message: str, counter: Counter) -> str:
977
        """Try to fmt traceback message."""
978
        if counter:
979
            counter = dict(counter)
980
            message = (
981
                f"{message}\nFull KytosEventBuffers counters: {counter}"
982
            )
983
        return message
984