Passed
Push — master ( 6652ed...2f17b4 )
by Vinicius
03:43 queued 16s
created

kytos.core.controller.Controller.stop_controller()   B

Complexity

Conditions 5

Size

Total Lines 51
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 26
nop 2
dl 0
loc 51
rs 8.7893
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16
import asyncio
17
import atexit
18
import importlib
19
import logging
20
import os
21
import re
22
import sys
23
import threading
24
import traceback
25
from asyncio import AbstractEventLoop
26
from collections import Counter, defaultdict
27
from importlib import import_module
28
from importlib import reload as reload_module
29
from importlib.util import module_from_spec, spec_from_file_location
30
from pathlib import Path
31
32
from pyof.foundation.exceptions import PackException
33
34
from kytos.core.api_server import APIServer, JSONResponse, Request
35
from kytos.core.apm import init_apm
36
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
37
from kytos.core.auth import Auth
38
from kytos.core.buffers import KytosBuffers
39
from kytos.core.config import KytosConfig
40
from kytos.core.connection import Connection, ConnectionState
41
from kytos.core.db import db_conn_wait
42
from kytos.core.dead_letter import DeadLetter
43
from kytos.core.events import KytosEvent
44
from kytos.core.exceptions import (KytosAPMInitException, KytosDBInitException,
45
                                   KytosNAppSetupException)
46
from kytos.core.helpers import executors, now
47
from kytos.core.logs import LogManager
48
from kytos.core.napps.base import NApp
49
from kytos.core.napps.manager import NAppsManager
50
from kytos.core.napps.napp_dir_listener import NAppDirListener
51
from kytos.core.queue_monitor import QueueMonitorWindow
52
from kytos.core.switch import Switch
53
54
__all__ = ('Controller',)
55
56
57
def exc_handler(_, exc, __):
    """Log uncaught exceptions.

    The signature matches ``sys.excepthook`` (the controller installs this
    function there), so the three arguments arrive positionally.

    Args:
        _: exception type
        exc: exception instance
        __: traceback
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # An excepthook runs outside of any ``except`` block, so
    # ``sys.exc_info()`` is empty there and ``logging.exception`` would log
    # "NoneType: None" instead of the traceback. Hand the exception over
    # explicitly; the record is still emitted at ERROR level.
    logging.error('Uncaught Exception: %s', exc, exc_info=(_, exc, __))
69
70
71
class Controller:
72
    """Main class of Kytos.
73
74
    The main responsibilities of this class are:
75
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
76
        - manage KytosNApps (install, load and unload);
77
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
78
        - manage which event should be sent to NApps methods;
79
        - manage the buffers handlers, considering one thread per handler.
80
    """
81
82
    # Created issue #568 for the disabled checks.
83
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
84
    # pylint: disable=consider-using-with,unnecessary-dunder-call
85
    # pylint: disable=too-many-lines
86
    def __init__(self, options=None, loop: AbstractEventLoop = None):
        """Set up the controller state from the parsed options.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
                When omitted, ``KytosConfig().options['daemon']`` is used.
            loop (asyncio.AbstractEventLoop): event loop kept in
                :attr:`loop`.
        """
        #: Object generated by ParseArgs on config.py file
        self.options = (KytosConfig().options['daemon']
                        if options is None else options)
        self.loop = loop

        #: list: asyncio tasks created by the controller
        self._tasks: list[asyncio.Task] = []
        #: KytosBuffers: KytosBuffer object with Controller buffers
        self._buffers: KytosBuffers = None
        #: dict: socket connections between the controller and the
        #: switches. Keys are ``(ip, port)`` tuples, values are Connection
        #: instances.
        self.connections: dict[tuple, Connection] = {}
        #: dict: mapping of events and event listeners. A key is a
        #: KytosEvent (or a string holding a regex matched against
        #: KytosEvents); a value is the list of methods that will receive
        #: the referenced event.
        self.events_listeners = {'kytos/core.connection.new':
                                 [self.new_connection]}
        #: dict: currently loaded apps, ``'napp_name'`` -> napp instance
        self.napps = {}
        #: KytosServer: instance that will be listening to TCP connections
        self.server = None
        #: dict: currently existing switches, dpid -> Switch object
        self.switches = {}
        self._switches_lock = threading.Lock()
        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None
        #: logging.Logger: Logger instance used by Kytos.
        self.log = None
        self.auth = None
        self._alisten_tasks = set()
        self.qmonitors: list[QueueMonitorWindow] = []
        #: APM client in memory to be closed when necessary
        self.apm = None

        #: Observer that handle NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)
        self.napps_manager = NAppsManager(self)
        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(self.options.listen,
                                    self.options.api_port,
                                    self.napps_manager,
                                    self.options.napps,
                                    self.options.api_traceback_on_500)
        self.dead_letter = DeadLetter(self)

        self._register_endpoints()
        # Put the parent of the napps directory on the import path, so that
        # enabled napps are importable as:
        #     from napps.<username>.<napp_name> import ...
        sys.path.append(os.path.join(self.options.napps, os.pardir))
        sys.excepthook = exc_handler
166
167
    def start_auth(self):
        """Create the Auth instance and register its core services."""
        auth = Auth(self)
        self.auth = auth
        auth.register_core_auth_services()
171
172
    def enable_logs(self):
        """Configure logging and bind the controller logger.

        Resolves the dotted names in ``options.logger_decorators``, applies
        them to the logger class, loads the logging config file and patches
        the ``LOG`` attribute of the loaded ``kytos.core.*`` modules. The
        process exits if a decorator cannot be resolved.
        """
        decorators = self.options.logger_decorators
        try:
            decorators = list(map(self._resolve, decorators))
        except ModuleNotFoundError as err:
            sys.exit(f'Failed to resolve decorator module: {err.name}')
        except AttributeError:
            sys.exit(f'Failed to resolve decorator name: {decorators}')

        LogManager.decorate_logger_class(*decorators)
        LogManager.load_config_file(self.options.logging, self.options.debug)
        # pylint: disable=fixme
        # TODO issue 371
        # LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)
        self._patch_core_loggers()
188
189
    @staticmethod
190
    def _resolve(name):
191
        """Resolve a dotted name to a global object."""
192
        mod, _, attr = name.rpartition('.')
193
        mod = importlib.import_module(mod)
194
        return getattr(mod, attr)
195
196
    @staticmethod
197
    def _patch_core_loggers():
198
        """Patch in updated loggers to 'kytos.core.*' modules"""
199
        match_str = 'kytos.core.'
200
        str_len = len(match_str)
201
        reloadable_mods = [module for mod_name, module in sys.modules.items()
202
                           if mod_name[:str_len] == match_str]
203
        for module in reloadable_mods:
204
            module.LOG = logging.getLogger(module.__name__)
205
206
    @staticmethod
207
    def loggers():
208
        """List all logging Loggers.
209
210
        Return a list of Logger objects, with name and logging level.
211
        """
212
        # pylint: disable=no-member
213
        return [logging.getLogger(name)
214
                for name in logging.root.manager.loggerDict
215
                if "kytos" in name]
216
217
    def toggle_debug(self, name=None):
        """Flip debug logging for one logger, or for the default loggers.

        Without ``name`` the effective level of the 'kytos' logger decides
        the direction and the logging config file is reloaded with debug
        enabled or disabled accordingly. With ``name`` only that logger is
        switched between DEBUG and NOTSET.

        Args:
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"

        Raises:
            ValueError: if ``name`` is given but no such logger is declared.
        """
        # pylint: disable=no-member
        if not name:
            current = logging.getLogger('kytos').getEffectiveLevel()
            LogManager.load_config_file(self.options.logging,
                                        current != logging.DEBUG)
            return

        if name not in logging.root.manager.loggerDict:
            # Refuse unknown names, otherwise logging would silently create
            # a brand new Logger for them.
            raise ValueError(f"Invalid logger name: {name}")

        logger = logging.getLogger(name)
        debug_is_on = logger.getEffectiveLevel() == logging.DEBUG
        logger.setLevel(logging.NOTSET if debug_is_on else logging.DEBUG)
254
255
    def start(self, restart=False):
        """Enable logs, prepare optional services and start the controller.

        Waits for the database and starts auth when a database is configured,
        initialises APM when configured, creates the pidfile unless this is a
        restart, then calls :meth:`start_controller`. Any failure ends the
        process through ``sys.exit`` with an explanatory message.

        Args:
            restart (bool): when true the pidfile is not created again.
        """
        self.enable_logs()
        options = self.options
        # pylint: disable=broad-except
        try:
            if options.database:
                db_conn_wait(db_backend=options.database)
                self.start_auth()
            if options.apm:
                self.apm = init_apm(options.apm, app=self.api_server.app)
            if not restart:
                self.create_pidfile()
            self.start_controller()
        except (KytosDBInitException, KytosAPMInitException) as exc:
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
        except Exception as exc:
            exc_fmt = traceback.format_exc(chain=True)
            message = f"Kytos couldn't start because of {str(exc)} {exc_fmt}"
            counter = self._full_queue_counter()
            sys.exit(self._try_to_fmt_traceback_msg(message, counter))
276
277
    def start_queue_monitors(self) -> None:
        """Build the configured QueueMonitorWindows and start all monitors.

        Monitors are created from ``options.event_buffer_monitors`` and
        ``options.thread_pool_queue_monitors``, added to ``self.qmonitors``,
        and then every monitor in that list is started.
        """
        for buffer_cfg in self.options.event_buffer_monitors:
            self.qmonitors.extend(
                QueueMonitorWindow.from_buffer_config(self, **buffer_cfg)
            )
        for pool_cfg in self.options.thread_pool_queue_monitors:
            self.qmonitors.extend(
                QueueMonitorWindow.from_threadpool_config(**pool_cfg)
            )
        for monitor in self.qmonitors:
            self.log.info(f"Starting {monitor}...")
            monitor.start()
288
289
    def stop_queue_monitors(self) -> None:
        """Stop every monitor held in ``self.qmonitors``."""
        for monitor in self.qmonitors:
            self.log.info(f"Stopping {monitor}...")
            monitor.stop()
294
295
    def create_pidfile(self):
        """Create the pidfile holding the id of the current process.

        The parent directory is created when missing. If a pidfile already
        exists and the process it names is still alive, the program exits.
        A pidfile that is stale (process gone) or unreadable as a number is
        overwritten. Removal of the file at interpreter exit is registered
        only once this process has written it.
        """
        pid = os.getpid()
        pidfile_path = Path(self.options.pidfile)

        # Creates directory if it doesn't exist
        # System can erase /var/run's content
        pid_folder = pidfile_path.parent
        self.log.info(pid_folder)

        # Pylint incorrectly infers Path objects
        # https://github.com/PyCQA/pylint/issues/224
        # pylint: disable=no-member
        if not pid_folder.exists():
            pid_folder.mkdir()
            pid_folder.chmod(0o1777)
        # pylint: enable=no-member

        try:
            # Mode 'x' fails when the file already exists.
            pidfile = open(pidfile_path, mode='x', encoding="utf8")
        except OSError:
            # Most likely there is a pidfile already. Check whether the
            # process that created it is still running.
            try:
                with open(pidfile_path, encoding="utf8") as existing_file:
                    old_pid = int(existing_file.read())
                os.kill(old_pid, 0)
            except (OSError, ValueError):
                # Unreadable file, content that is not a number, or no such
                # process: treat the pidfile as stale and overwrite it.
                try:
                    pidfile = open(pidfile_path, mode='w', encoding="utf8")
                except OSError as exception:
                    error_msg = "Failed to create pidfile {}: {}."
                    sys.exit(error_msg.format(self.options.pidfile, exception))
            else:
                # kill() reported no error, so a process with that PID is
                # running. We assume it is Kytos and quit.
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
                             "running. Aborting.")
                sys.exit(error_msg.format(self.options.pidfile))

        # Identifies the process that created the pidfile.
        with pidfile:
            pidfile.write(str(pid))

        # Registered only now: when we abort above because another instance
        # is running, its pidfile must not be deleted by our exit handlers.
        atexit.register(pidfile_path.unlink, missing_ok=True)
344
345
    @property
    def buffers(self) -> KytosBuffers:
        """KytosBuffers, created on first access.

        Exposed through a property to allow lazy async initialization with
        an event loop running.
        """
        if self._buffers:
            return self._buffers
        self._buffers = KytosBuffers()
        return self._buffers
352
353
    def start_controller(self):
        """Start the controller.

        Creates the buffers if needed, starts the KytosServer (TCP server)
        and the API server task, loads the NApps, then schedules one task
        per event buffer handler and starts the queue monitors.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        if not self._buffers:
            self._buffers = KytosBuffers()

        address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(address,
                                  KytosServerProtocol,
                                  self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        self._tasks.append(self.loop.create_task(self.api_server.serve()))

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.
        # sys.exit(error_msg.format(thread, exception))

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()
        self.api_server.start_web_ui()

        # Start task handlers consumers after NApps to potentially
        # avoid discarding an event if a consumer isn't ready yet
        for buffer_name in ("conn", "raw", "msg_in", "msg_out", "app", "meta"):
            if buffer_name == "msg_out":
                # msg_out has its own dedicated handler
                consumer = self.msg_out_event_handler()
            else:
                consumer = self.event_handler(buffer_name)
            self._tasks.append(self.loop.create_task(consumer))

        self.start_queue_monitors()
        self.started_at = now()
402
403
    def _register_endpoints(self):
404
        """Register all rest endpoint served by kytos.
405
406
        -   Register APIServer endpoints
407
        -   Register WebUI endpoints
408
        -   Register ``/api/kytos/core/config`` endpoint
409
        """
410
        self.api_server.start_api()
411
        # Register controller endpoints as /api/kytos/core/...
412
        self.api_server.register_core_endpoint('config/',
413
                                               self.configuration_endpoint)
414
        self.api_server.register_core_endpoint('metadata/',
415
                                               Controller.metadata_endpoint)
416
        self.api_server.register_core_endpoint(
417
            'reload/{username}/{napp_name}/',
418
            self.rest_reload_napp)
419
        self.api_server.register_core_endpoint('reload/all',
420
                                               self.rest_reload_all_napps)
421
        self.dead_letter.register_endpoints()
422
423
    def register_rest_endpoint(self, url, function, methods):
        """Forward to the API server; deprecated in favor of @rest."""
        register = self.api_server.register_rest_endpoint
        register(url, function, methods)
426
427
    @classmethod
    def metadata_endpoint(cls, _request: Request) -> JSONResponse:
        """Return the Kytos metadata.

        The values are read from the ``metadata.py`` file that sits next to
        this module: every ``__name__ = 'value'`` assignment found in it
        becomes one entry of the response.

        Returns:
            JSONResponse: Json with current kytos metadata.

        """
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
        meta_file = Path(meta_path).read_text(encoding="utf8")
        pairs = re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file)
        return JSONResponse(dict(pairs))
440
441
    def configuration_endpoint(self, _request: Request) -> JSONResponse:
        """Return the configuration options used by Kytos.

        Returns:
            JSONResponse: Json with the exposed subset of the current
                options, as selected by ``KytosConfig.options_exposed``.

        """
        exposed = KytosConfig.options_exposed(vars(self.options))
        return JSONResponse(exposed)
449
450
    def restart(self, graceful=True):
        """Restart Kytos SDN Controller.

        When the controller is running it is stopped and its state is
        re-initialised before starting again.

        Args:
            graceful(bool): Represents the way that Kytos will restart; it
                is forwarded to :meth:`stop`.
        """
        if self.started_at is not None:
            self.stop(graceful)
            # Keep the event loop across the re-initialisation: ``__init__``
            # defaults ``loop`` to None, which would leave the restarted
            # controller without a loop to schedule its tasks on.
            self.__init__(self.options, self.loop)

        self.start(restart=True)
461
462
    def stop(self, graceful=True):
        """Shutdown all services used by kytos.

        Does nothing unless the controller has been started; otherwise it
        delegates to :meth:`stop_controller`.

        Args:
            graceful (bool): forwarded to :meth:`stop_controller`.
        """
        if not self.started_at:
            return
        self.stop_controller(graceful)
472
473
    def stop_controller(self, graceful=True):
        """Stop the controller.

        In order, this method:
            - sends the stop signal to the buffers and stops the NApp
              directory listener;
            - shuts down every thread pool found in ``executors``;
            - clears ``started_at`` and unloads the NApps;
            - cancels the asyncio tasks (event handlers and servers);
            - stops the queue monitors, the APM client when there is one,
              the API server and the TCP server;
            - stops the event loop.

        Args:
            graceful (bool): passed as ``wait`` to each thread pool shutdown.
        """
        log = self.log
        log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.napp_dir_listener.stop()

        for pool_name in executors:
            log.info("Stopping threadpool: %s", pool_name)

        threads = threading.enumerate()
        log.debug("%s threads before threadpool shutdown: %s",
                  len(threads), threads)

        for pool in executors.values():
            pool.shutdown(wait=graceful, cancel_futures=True)

        self.started_at = None
        self.unload_napps()

        # Cancel all async tasks (event handlers and servers)
        for pending in self._tasks:
            pending.cancel()

        # ASYNC TODO: close connections

        self.stop_queue_monitors()
        if self.apm:
            log.info("Stopping APM server...")
            self.apm.close()

        log.info("Stopping API Server...")
        self.api_server.stop()
        log.info("Stopped API Server")

        log.info("Stopping TCP Server...")
        self.server.shutdown()
        log.info("Stopped TCP Server")

        self.loop.stop()
524
525
    def status(self):
        """Return status of Kytos Server.

        Returns:
            string: "Running since <started_at>" while the controller is
                running, otherwise "Stopped".

        """
        if not self.started_at:
            return "Stopped"
        return f"Running since {self.started_at}"
538
539
    def uptime(self):
        """Return the uptime of kytos server.

        Returns:
            ``0`` if Kytos Server is stopped, otherwise the interval
            ``now() - started_at``.

        """
        if not self.started_at:
            return 0
        return now() - self.started_at
551
552
    def notify_listeners(self, event):
        """Send the event to the specified listeners.

        Loops over self.events_listeners matching (by regexp) the attribute
        name of the event with the keys of events_listeners. If a match occurs,
        then send the event to each registered listener. Coroutine listeners
        are scheduled as asyncio tasks; the others are called directly.

        Args:
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
        """
        self.log.debug("looking for listeners for %s", event)
        # Iterate over a copy of the mapping.
        for event_regex, listeners in dict(self.events_listeners).items():
            # Do not match if the event has more characters
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
            # ``endswith`` (rather than indexing) keeps this safe for keys
            # shorter than two characters; a trailing "\$" is an escaped
            # literal dollar sign, so an anchor is still appended.
            if (not event_regex.endswith('$')
                    or event_regex.endswith('\\$')):
                event_regex += '$'
            if re.match(event_regex, event.name):
                self.log.debug('Calling listeners for %s', event)
                for listener in listeners:
                    if asyncio.iscoroutinefunction(listener):
                        task = asyncio.create_task(listener(event))
                        # Hold a reference until the task is done.
                        self._alisten_tasks.add(task)
                        task.add_done_callback(self._alisten_tasks.discard)
                    else:
                        listener(event)
580
581
    async def event_handler(self, buffer_name: str):
        """Consume one event buffer and notify listeners of each event.

        Runs until a "kytos/core.shutdown" event has been delivered.

        Args:
            buffer_name (str): attribute name of the buffer in
                ``self.buffers`` to read from.
        """
        event_buffer = getattr(self.buffers, buffer_name)
        self.log.info(f"Event handler {buffer_name} started")
        shutdown_seen = False
        while not shutdown_seen:
            event = await event_buffer.aget()
            self.notify_listeners(event)
            shutdown_seen = event.name == "kytos/core.shutdown"
        self.log.debug(f"Event handler {buffer_name} stopped")
592
593
    async def publish_connection_error(self, event):
        """Publish connection error event.

        The given event is renamed after the protocol of its destination,
        gets the connection state stored under ``content["exception"]`` and
        is put on the ``conn`` buffer.

        Args:
            event (KytosEvent): Event that triggered for error propagation.
        """
        destination = event.destination
        protocol_name = destination.protocol.name
        event.name = f"kytos/core.{protocol_name}.connection.error"
        event.content["exception"] = f"Connection state: {destination.state}"
        await self.buffers.conn.aput(event)
604
605
    async def msg_out_event_handler(self):
        """Handle msg_out events.

        Reads events from the msg_out buffer until a "kytos/core.shutdown"
        event arrives. For every other event the message is packed and sent
        to the event destination, unless there is no destination or its
        state is FINISHED, and the listeners are notified afterwards.
        """
        self.log.info("Event handler msg_out started")
        while True:
            triggered_event = await self.buffers.msg_out.aget()

            if triggered_event.name == "kytos/core.shutdown":
                self.log.debug("Message Out Event handler stopped")
                break

            message = triggered_event.content['message']
            destination = triggered_event.destination
            try:
                if (not destination or
                        destination.state == ConnectionState.FINISHED):
                    # Nowhere to send the message: drop it silently.
                    continue
                packet = message.pack()
                destination.send(packet)
                header = message.header
                self.log.debug('Connection %s: OUT OFP, '
                               'version: %s, type: %s, xid: %s - %s',
                               destination.id,
                               header.version,
                               header.message_type,
                               header.xid,
                               packet.hex())
                self.notify_listeners(triggered_event)
            except OSError:
                await self.publish_connection_error(triggered_event)
                self.log.info("connection closed. Cannot send message")
            except PackException as err:
                self.log.error(
                    f"Discarding message: {message}, event: {triggered_event} "
                    f"because of PackException {err}"
                )
642
643
    def get_interface_by_id(self, interface_id):
        """Find an Interface by its identifier.

        The identifier is split at its last ":"; the left part is the
        switch id and the right part the interface number.

        Args:
            interface_id(str): Interface Identifier.

        Returns:
            Interface: Instance of Interface with the id given, or None when
                the id is None or the switch or the interface is unknown.

        """
        if interface_id is None:
            return None

        switch_id, _, port_text = interface_id.rpartition(":")
        interface_number = int(port_text)

        switch = self.switches.get(switch_id)
        return switch.interfaces.get(interface_number, None) if switch \
            else None
665
666
    def get_switch_by_dpid(self, dpid):
        """Look up a switch by its dpid.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.

        Returns:
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified,
                or None when there is no such switch.

        """
        known_switches = self.switches
        return known_switches.get(dpid)
677
678
    def get_switch_or_create(self, dpid, connection=None):
        """Return switch or create it if necessary.

        A 'kytos/core.switch.new' or 'kytos/core.switch.reconnected' event
        carrying the switch is put on the ``conn`` buffer. The whole
        operation runs while holding the switches lock.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. When given, it is stored by the
                controller and set on the switch; a different previous
                connection of the switch is removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.

        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                event_name = 'kytos/core.switch.' + 'new'
            else:
                event_name = 'kytos/core.switch.' + 'reconnected'
            event = KytosEvent(name=event_name, content={'switch': switch})

            if connection:
                previous = switch.connection
                switch.update_connection(connection)
                if previous is not connection:
                    self.remove_connection(previous)

            self.buffers.conn.put(event)
            return switch
716
717
    def create_or_update_connection(self, connection):
        """Store a connection under its id, replacing any previous entry.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be stored.
        """
        conn_id = connection.id
        self.connections[conn_id] = connection
725
726
    def get_connection_by_id(self, conn_id):
        """Look up a stored connection by id.

        Args:
            conn_id: id of a connection, as used for the keys of
                ``self.connections``.

        Returns:
            :class:`~kytos.core.connection.Connection`: Instance of connection
                or None Type.

        """
        known_connections = self.connections
        return known_connections.get(conn_id)
738
739
    def remove_connection(self, connection):
        """Close an existent connection and remove it.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be removed.

        Returns:
            bool: True when the connection was closed and removed, False
                when ``connection`` is None or a KeyError was raised.
        """
        if connection is None:
            return False

        try:
            connection.close()
            del self.connections[connection.id]
        except KeyError:
            return False
        else:
            return True
755
756
    def remove_switch(self, switch):
        """Remove an existent switch.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Instance of switch that will be removed.

        Returns:
            bool: True when the switch was removed, False when its dpid was
                not among the known switches.
        """
        try:
            del self.switches[switch.dpid]
        except KeyError:
            return False
        else:
            return True
768
769
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        Store the event's source connection in ``self.connections``. When a
        different connection is already registered under the same id, it is
        closed and removed first so no stale reference is kept.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``); its
                ``source`` attribute is the new connection.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", connection)

        # remove_connection() expects the connection object (it calls
        # close() on it), not the id. Skip the cleanup when the registered
        # entry is this very connection, otherwise it would be closed.
        old_connection = self.get_connection_by_id(connection.id)
        if old_connection is not None and old_connection is not connection:
            self.remove_connection(old_connection)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
794
795
    def add_new_switch(self, switch):
        """Register a switch, replacing any entry with the same dpid.

        Args:
            switch (Switch): Switch to store; it is keyed by ``switch.dpid``
                in ``self.switches``.
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
802
803
    def _import_napp(self, username, napp_name):
804
        """Import a NApp module.
805
806
        Raises:
807
            FileNotFoundError: if NApp's main.py is not found.
808
            ModuleNotFoundError: if any NApp requirement is not installed.
809
810
        """
811
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
812
        path = os.path.join(self.options.napps, username, napp_name,
813
                            'main.py')
814
        napp_spec = spec_from_file_location(mod_name, path)
815
        napp_module = module_from_spec(napp_spec)
816
        sys.modules[napp_spec.name] = napp_module
817
        napp_spec.loader.exec_module(napp_module)
818
        return napp_module
819
820
    def load_napp(self, username, napp_name):
        """Import, instantiate and start a single NApp.

        Nothing is loaded (and a message is logged) when the NApp is already
        in ``self.napps``, when its ``main.py`` is missing, or when one of
        its requirements cannot be imported.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.

        Raises:
            KytosNAppSetupException: if instantiating the NApp's ``Main``
                class raises any exception.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            module = self._import_napp(username, napp_name)
        except FileNotFoundError as err:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                err.filename,
            )
            return
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return

        try:
            napp = module.Main(controller=self)
        except Exception as exc:  # noqa pylint: disable=bare-except
            msg = f"NApp {username}/{napp_name} exception {str(exc)} "
            raise KytosNAppSetupException(msg) from exc

        # The NApp is stored in self.napps before it is started.
        self.napps[napp_key] = napp

        napp.start()
        self.api_server.authenticate_endpoints(napp)
        self.api_server.register_napp_endpoints(napp)

        # Append the NApp's listeners to the controller-wide registry.
        # pylint: disable=protected-access
        for event_name, napp_listeners in napp._listeners.items():
            registered = self.events_listeners.setdefault(event_name, [])
            registered.extend(napp_listeners)
        # pylint: enable=protected-access
859
860
    def pre_install_napps(self, napps, enable=True):
        """Install the requested NApps that are not installed yet.

        Args:
            napps ([str]): NApps to pre-install; each entry is compared
                against ``str()`` of every installed NApp.
            enable (bool): Forwarded as ``enable`` to
                ``napps_manager.install``. Defaults to ``True``.
        """
        already_installed = [
            str(item) for item in self.napps_manager.get_installed_napps()
        ]
        for candidate in napps:
            if candidate in already_installed:
                continue
            self.napps_manager.install(candidate, enable=enable)
873
874
    def load_napps(self):
        """Load every NApp the NApps manager reports as enabled.

        Raises:
            KytosNAppSetupException: if loading a NApp raises
                ``FileNotFoundError``; the error is logged first.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for napp in enabled_napps:
            try:
                self.log.info("Loading NApp %s", napp.id)
                self.load_napp(napp.username, napp.name)
            except FileNotFoundError as err:
                self.log.error("Could not load NApp %s: %s", napp.id, err)
                msg = f"NApp {napp.id} exception {str(err)}"
                raise KytosNAppSetupException(msg) from err
885
886
    def unload_napp(self, username, napp_name):
        """Shut down a loaded NApp and unregister it from the controller.

        The NApp is popped from ``self.napps``; when it was not loaded only a
        warning is logged. Otherwise its shutdown listener is called, its
        REST endpoints are removed and its listeners are taken out of
        ``self.events_listeners``.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)
        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        shutdown_name = 'kytos/core.shutdown.' + NApp(username, napp_name).id
        shutdown_event = KytosEvent(name=shutdown_name)
        # The first listener registered for the shutdown event is invoked
        # here, before the listeners are removed from events_listeners below.
        shutdown_listener = self.events_listeners[shutdown_event.name][0]
        shutdown_listener(shutdown_event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Remove that napp's listeners, dropping event types left empty
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            registered = self.events_listeners[event_type]
            for listener in napp_listeners:
                registered.remove(listener)
            if not registered:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access
917
918
    def unload_napps(self):
        """Unload every NApp the NApps manager reports as enabled.

        NApps are unloaded in the reverse of the order returned by
        ``get_enabled_napps`` to facilitate a graceful shutdown.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for napp in reversed(enabled_napps):
            self.unload_napp(napp.username, napp.name)
926
927
    def reload_napp_module(self, username, napp_name, napp_file):
        """Import and then reload one module of a NApp.

        The module name is ``napps.<username>.<napp_name>.<napp_file>``.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp.
            napp_file (str): Module name inside the NApp package.

        Raises:
            ModuleNotFoundError: if the module cannot be imported.
            ImportError: if reloading the module fails.
        """
        mod_name = '.'.join(('napps', username, napp_name, napp_file))

        try:
            module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise

        try:
            reload_module(module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise
941
942
    def reload_napp(self, username, napp_name):
        """Unload a NApp, reload its modules and load it again.

        The ``settings`` and ``main`` modules are reloaded, in that order.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be reloaded.

        Returns:
            int: ``200`` when the NApp was reloaded and loaded again,
                ``400`` when reloading one of its modules failed (the NApp
                has already been unloaded at that point).
        """
        self.unload_napp(username, napp_name)

        try:
            for napp_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, napp_file)
        except ImportError:  # ModuleNotFoundError is a subclass of it
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
954
955
    def rest_reload_napp(self, request: Request) -> JSONResponse:
        """Reload the NApp named by the request's path parameters.

        Reads ``username`` and ``napp_name`` from ``request.path_params``.

        Returns:
            JSONResponse: ``'reloaded'`` when ``reload_napp`` returns 200,
                otherwise ``'fail to reload'`` with the returned value as
                status code.
        """
        path_params = request.path_params
        username = path_params["username"]
        napp_name = path_params["napp_name"]
        status = self.reload_napp(username, napp_name)
        if status != 200:
            return JSONResponse('fail to reload', status_code=status)
        return JSONResponse('reloaded')
963
964
    def rest_reload_all_napps(self, _request: Request) -> JSONResponse:
        """Reload every NApp currently in ``self.napps``.

        Returns:
            JSONResponse: Always ``'reloaded'``; the status returned by each
                individual ``reload_napp`` call is not reported.
        """
        # reload_napp() pops the NApp from self.napps and inserts it again,
        # so iterate over a snapshot of the keys; iterating the dict itself
        # raises RuntimeError once it is mutated.
        for username, napp_name in list(self.napps):
            self.reload_napp(username, napp_name)
        return JSONResponse('reloaded')
969
970
    def _full_queue_counter(self) -> Counter:
971
        """Generate full queue stats counter."""
972
        buffer_counter = defaultdict(Counter)
973
        for buffer in self.buffers.get_all_buffers():
974
            if not buffer.full():
975
                continue
976
            while not buffer.empty():
977
                event = buffer.get()
978
                buffer_counter[buffer.name][event.name] += 1
979
        return buffer_counter
980
981
    def _try_to_fmt_traceback_msg(self, message: str, counter: Counter) -> str:
982
        """Try to fmt traceback message."""
983
        if counter:
984
            counter = dict(counter)
985
            message = (
986
                f"{message}\nFull KytosEventBuffers counters: {counter}"
987
            )
988
        return message
989