Passed
Pull Request — master (#375)
by Vinicius
08:19
created

Controller.start_controller()   A

Complexity

Conditions 2

Size

Total Lines 46
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 29
nop 1
dl 0
loc 46
rs 9.184
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16
import asyncio
17
import atexit
18
import importlib
19
import logging
20
import os
21
import re
22
import sys
23
import threading
24
from asyncio import AbstractEventLoop
25
from importlib import import_module
26
from importlib import reload as reload_module
27
from importlib.util import module_from_spec, spec_from_file_location
28
from pathlib import Path
29
from socket import error as SocketError
30
31
from pyof.foundation.exceptions import PackException
32
33
from kytos.core.api_server import APIServer, JSONResponse, Request
34
from kytos.core.apm import init_apm
35
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
36
from kytos.core.auth import Auth
37
from kytos.core.buffers import KytosBuffers
38
from kytos.core.config import KytosConfig
39
from kytos.core.connection import ConnectionState
40
from kytos.core.db import db_conn_wait
41
from kytos.core.dead_letter import DeadLetter
42
from kytos.core.events import KytosEvent
43
from kytos.core.exceptions import KytosAPMInitException, KytosDBInitException
44
from kytos.core.helpers import executors, now
45
from kytos.core.interface import Interface
46
from kytos.core.logs import LogManager
47
from kytos.core.napps.base import NApp
48
from kytos.core.napps.manager import NAppsManager
49
from kytos.core.napps.napp_dir_listener import NAppDirListener
50
from kytos.core.switch import Switch
51
52
__all__ = ('Controller',)
53
54
55
def exc_handler(_, exc, __):
56
    """Log uncaught exceptions.
57
58
    Args:
59
        exc_type (ignored): exception type
60
        exc: exception instance
61
        tb (ignored): traceback
62
    """
63
    logging.basicConfig(filename='errlog.log',
64
                        format='%(asctime)s:%(pathname)'
65
                        's:%(levelname)s:%(message)s')
66
    logging.exception('Uncaught Exception: %s', exc)
67
68
69
class Controller:
70
    """Main class of Kytos.
71
72
    The main responsibilities of this class are:
73
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
74
        - manage KytosNApps (install, load and unload);
75
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
76
        - manage which event should be sent to NApps methods;
77
        - manage the buffers handlers, considering one thread per handler.
78
    """
79
80
    # Created issue #568 for the disabled checks.
81
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
82
    # pylint: disable=consider-using-with,unnecessary-dunder-call
83
    # pylint: disable=too-many-lines
84
    def __init__(self, options=None, loop: AbstractEventLoop = None):
85
        """Init method of Controller class takes the parameters below.
86
87
        Args:
88
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
89
                instance of :class:`~kytos.core.config.KytosConfig` class.
90
            loop asyncio.AbstractEventLoop
91
        """
92
        if options is None:
93
            options = KytosConfig().options['daemon']
94
95
        self.loop = loop
96
97
        # asyncio tasks
98
        self._tasks: list[asyncio.Task] = []
99
100
        #: KytosBuffers: KytosBuffer object with Controller buffers
101
        self._buffers: KytosBuffers = None
102
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
103
        #:
104
        #: This dict stores all connections between the controller and the
105
        #: switches. The key for this dict is a tuple (ip, port). The content
106
        #: is a Connection
107
        self.connections = {}
108
        #: dict: mapping of events and event listeners.
109
        #:
110
        #: The key of the dict is a KytosEvent (or a string that represent a
111
        #: regex to match against KytosEvents) and the value is a list of
112
        #: methods that will receive the referenced event
113
        self.events_listeners = {'kytos/core.connection.new':
114
                                 [self.new_connection]}
115
116
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
117
        #:
118
        #: The key is the napp name (string), while the value is the napp
119
        #: instance itself.
120
        self.napps = {}
121
        #: Object generated by ParseArgs on config.py file
122
        self.options = options
123
        #: KytosServer: Instance of KytosServer that will be listening to TCP
124
        #: connections.
125
        self.server = None
126
        #: dict: Current existing switches.
127
        #:
128
        #: The key is the switch dpid, while the value is a Switch object.
129
        self.switches = {}  # dpid: Switch()
130
        self._switches_lock = threading.Lock()
131
132
        #: datetime.datetime: Time when the controller finished starting.
133
        self.started_at = None
134
135
        #: logging.Logger: Logger instance used by Kytos.
136
        self.log = None
137
138
        #: Observer that handle NApps when they are enabled or disabled.
139
        self.napp_dir_listener = NAppDirListener(self)
140
141
        self.napps_manager = NAppsManager(self)
142
143
        #: API Server used to expose rest endpoints.
144
        self.api_server = APIServer(self.options.listen,
145
                                    self.options.api_port,
146
                                    self.napps_manager, self.options.napps)
147
148
        self.auth = None
149
        self.dead_letter = DeadLetter(self)
150
        self._alisten_tasks = set()
151
152
        self._register_endpoints()
153
        #: Adding the napps 'enabled' directory into the PATH
154
        #: Now you can access the enabled napps with:
155
        #: from napps.<username>.<napp_name> import ?....
156
        sys.path.append(os.path.join(self.options.napps, os.pardir))
157
        sys.excepthook = exc_handler
158
159
    def start_auth(self):
160
        """Initialize Auth() and its services"""
161
        self.auth = Auth(self)
162
        self.auth.register_core_auth_services()
163
164
    def enable_logs(self):
165
        """Register kytos log and enable the logs."""
166
        decorators = self.options.logger_decorators
167
        try:
168
            decorators = [self._resolve(deco) for deco in decorators]
169
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
170
            print(f'Failed to resolve decorator module: {err.name}')
171
            sys.exit(1)
172
        except AttributeError:
173
            print(f'Failed to resolve decorator name: {decorators}')
174
            sys.exit(1)
175
        LogManager.decorate_logger_class(*decorators)
176
        LogManager.load_config_file(self.options.logging, self.options.debug)
177
        # pylint: disable=fixme
178
        # TODO issue 371 (future PR for 2023.1)
179
        # LogManager.enable_websocket(self.api_server.server)
180
        self.log = logging.getLogger(__name__)
181
        self._patch_core_loggers()
182
183
    @staticmethod
184
    def _resolve(name):
185
        """Resolve a dotted name to a global object."""
186
        mod, _, attr = name.rpartition('.')
187
        mod = importlib.import_module(mod)
188
        return getattr(mod, attr)
189
190
    @staticmethod
191
    def _patch_core_loggers():
192
        """Patch in updated loggers to 'kytos.core.*' modules"""
193
        match_str = 'kytos.core.'
194
        str_len = len(match_str)
195
        reloadable_mods = [module for mod_name, module in sys.modules.items()
196
                           if mod_name[:str_len] == match_str]
197
        for module in reloadable_mods:
198
            module.LOG = logging.getLogger(module.__name__)
199
200
    @staticmethod
201
    def loggers():
202
        """List all logging Loggers.
203
204
        Return a list of Logger objects, with name and logging level.
205
        """
206
        # pylint: disable=no-member
207
        return [logging.getLogger(name)
208
                for name in logging.root.manager.loggerDict
209
                if "kytos" in name]
210
211
    def toggle_debug(self, name=None):
212
        """Enable/disable logging debug messages to a given logger name.
213
214
        If the name parameter is not specified the debug will be
215
        enabled/disabled following the initial config file. It will decide
216
        to enable/disable using the 'kytos' name to find the current
217
        logger level.
218
        Obs: To disable the debug the logging will be set to NOTSET
219
220
        Args:
221
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
222
        """
223
        # pylint: disable=no-member
224
        if name and name not in logging.root.manager.loggerDict:
225
            # A Logger name that is not declared in logging will raise an error
226
            # otherwise logging would create a new Logger.
227
            raise ValueError(f"Invalid logger name: {name}")
228
229
        if not name:
230
            # Logger name not specified.
231
            level = logging.getLogger('kytos').getEffectiveLevel()
232
            enable_debug = level != logging.DEBUG
233
234
            # Enable/disable default Loggers
235
            LogManager.load_config_file(self.options.logging, enable_debug)
236
            return
237
238
        # Get effective logger level for the name
239
        level = logging.getLogger(name).getEffectiveLevel()
240
        logger = logging.getLogger(name)
241
242
        if level == logging.DEBUG:
243
            # disable debug
244
            logger.setLevel(logging.NOTSET)
245
        else:
246
            # enable debug
247
            logger.setLevel(logging.DEBUG)
248
249
    def start(self, restart=False):
250
        """Create pidfile and call start_controller method."""
251
        self.enable_logs()
252
        if self.options.database:
253
            self.db_conn_or_core_shutdown()
254
            self.start_auth()
255
        if self.options.apm:
256
            self.init_apm_or_core_shutdown()
257
        if not restart:
258
            self.create_pidfile()
259
        self.start_controller()
260
261
    def create_pidfile(self):
262
        """Create a pidfile."""
263
        pid = os.getpid()
264
265
        # Creates directory if it doesn't exist
266
        # System can erase /var/run's content
267
        pid_folder = Path(self.options.pidfile).parent
268
        self.log.info(pid_folder)
269
270
        # Pylint incorrectly infers Path objects
271
        # https://github.com/PyCQA/pylint/issues/224
272
        # pylint: disable=no-member
273
        if not pid_folder.exists():
274
            pid_folder.mkdir()
275
            pid_folder.chmod(0o1777)
276
        # pylint: enable=no-member
277
278
        # Make sure the file is deleted when controller stops
279
        atexit.register(Path(self.options.pidfile).unlink)
280
281
        # Checks if a pidfile exists. Creates a new file.
282
        try:
283
            pidfile = open(self.options.pidfile, mode='x', encoding="utf8")
284
        except OSError:
285
            # This happens if there is a pidfile already.
286
            # We shall check if the process that created the pidfile is still
287
            # running.
288
            try:
289
                existing_file = open(self.options.pidfile, mode='r',
290
                                     encoding="utf8")
291
                old_pid = int(existing_file.read())
292
                os.kill(old_pid, 0)
293
                # If kill() doesn't return an error, there's a process running
294
                # with the same PID. We assume it is Kytos and quit.
295
                # Otherwise, overwrite the file and proceed.
296
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
297
                             "running. Aborting.")
298
                sys.exit(error_msg.format(self.options.pidfile))
299
            except OSError:
300
                try:
301
                    pidfile = open(self.options.pidfile, mode='w',
302
                                   encoding="utf8")
303
                except OSError as exception:
304
                    error_msg = "Failed to create pidfile {}: {}."
305
                    sys.exit(error_msg.format(self.options.pidfile, exception))
306
307
        # Identifies the process that created the pidfile.
308
        pidfile.write(str(pid))
309
        pidfile.close()
310
311
    @property
312
    def buffers(self) -> KytosBuffers:
313
        """KytosBuffers exposed through this property to allow lazy
314
        async initiliation with an event loop running."""
315
        if not self._buffers:
316
            self._buffers = KytosBuffers()
317
        return self._buffers
318
319
    def start_controller(self):
320
        """Start the controller.
321
322
        Starts the KytosServer (TCP Server) coroutine.
323
        Starts a thread for each buffer handler.
324
        Load the installed apps.
325
        """
326
        self.log.info("Starting Kytos - Kytos Controller")
327
        if not self._buffers:
328
            self._buffers = KytosBuffers()
329
        self.server = KytosServer((self.options.listen,
330
                                   int(self.options.port)),
331
                                  KytosServerProtocol,
332
                                  self,
333
                                  self.options.protocol_name)
334
335
        self.log.info("Starting TCP server: %s", self.server)
336
        self.server.serve_forever()
337
338
        task = self.loop.create_task(self.api_server.serve())
339
        self._tasks.append(task)
340
341
        # ASYNC TODO: ensure all threads started correctly
342
        # This is critical, if any of them failed starting we should exit.
343
        # sys.exit(error_msg.format(thread, exception))
344
345
        self.log.info("Loading Kytos NApps...")
346
        self.napp_dir_listener.start()
347
        self.pre_install_napps(self.options.napps_pre_installed)
348
        self.load_napps()
349
        self.api_server.start_web_ui()
350
351
        # Start task handlers consumers after NApps to potentially
352
        # avoid discarding an event if a consumer isn't ready yet
353
        task = self.loop.create_task(self.event_handler("conn"))
354
        self._tasks.append(task)
355
        task = self.loop.create_task(self.event_handler("raw"))
356
        self._tasks.append(task)
357
        task = self.loop.create_task(self.event_handler("msg_in"))
358
        self._tasks.append(task)
359
        task = self.loop.create_task(self.msg_out_event_handler())
360
        self._tasks.append(task)
361
        task = self.loop.create_task(self.event_handler("app"))
362
        self._tasks.append(task)
363
364
        self.started_at = now()
365
366
    def db_conn_or_core_shutdown(self):
367
        """Ensure db connection or core shutdown."""
368
        try:
369
            db_conn_wait(db_backend=self.options.database)
370
        except KytosDBInitException as exc:
371
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
372
373
    def init_apm_or_core_shutdown(self, **kwargs):
374
        """Init APM instrumentation or core shutdown."""
375
        if not self.options.apm:
376
            return
377
        try:
378
            init_apm(self.options.apm, app=self.api_server.app, **kwargs)
379
        except KytosAPMInitException as exc:
380
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
381
382
    def _register_endpoints(self):
383
        """Register all rest endpoint served by kytos.
384
385
        -   Register APIServer endpoints
386
        -   Register WebUI endpoints
387
        -   Register ``/api/kytos/core/config`` endpoint
388
        """
389
        self.api_server.start_api()
390
        # Register controller endpoints as /api/kytos/core/...
391
        self.api_server.register_core_endpoint('config/',
392
                                               self.configuration_endpoint)
393
        self.api_server.register_core_endpoint('metadata/',
394
                                               Controller.metadata_endpoint)
395
        self.api_server.register_core_endpoint(
396
            'reload/{username}/{napp_name}/',
397
            self.rest_reload_napp)
398
        self.api_server.register_core_endpoint('reload/all',
399
                                               self.rest_reload_all_napps)
400
        self.dead_letter.register_endpoints()
401
402
    def register_rest_endpoint(self, url, function, methods):
403
        """Deprecate in favor of @rest decorator."""
404
        self.api_server.register_rest_endpoint(url, function, methods)
405
406
    @classmethod
407
    def metadata_endpoint(cls, _request: Request) -> JSONResponse:
408
        """Return the Kytos metadata.
409
410
        Returns:
411
            string: Json with current kytos metadata.
412
413
        """
414
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
415
        with open(meta_path, encoding="utf8") as file:
416
            meta_file = file.read()
417
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
418
        return JSONResponse(metadata)
419
420
    def configuration_endpoint(self, _request: Request) -> JSONResponse:
421
        """Return the configuration options used by Kytos.
422
423
        Returns:
424
            string: Json with current configurations used by kytos.
425
426
        """
427
        return JSONResponse(self.options.__dict__)
428
429
    def restart(self, graceful=True):
430
        """Restart Kytos SDN Controller.
431
432
        Args:
433
            graceful(bool): Represents the way that Kytos will restart.
434
        """
435
        if self.started_at is not None:
436
            self.stop(graceful)
437
            self.__init__(self.options)
438
439
        self.start(restart=True)
440
441
    def stop(self, graceful=True):
442
        """Shutdown all services used by kytos.
443
444
        This method should:
445
            - stop all Websockets
446
            - stop the API Server
447
            - stop the Controller
448
        """
449
        if self.started_at:
450
            self.stop_controller(graceful)
451
452
    def stop_controller(self, graceful=True):
453
        """Stop the controller.
454
455
        This method should:
456
            - announce on the network that the controller will shutdown;
457
            - stop receiving incoming packages;
458
            - call the 'shutdown' method of each KytosNApp that is running;
459
            - finish reading the events on all buffers;
460
            - stop each running handler;
461
            - stop all running threads;
462
            - stop the KytosServer;
463
        """
464
        self.log.info("Stopping Kytos")
465
466
        self.buffers.send_stop_signal()
467
        self.napp_dir_listener.stop()
468
469
        for pool_name in executors:
470
            self.log.info("Stopping threadpool: %s", pool_name)
471
472
        threads = threading.enumerate()
473
        self.log.debug("%s threads before threadpool shutdown: %s",
474
                       len(threads), threads)
475
476
        for executor_pool in executors.values():
477
            executor_pool.shutdown(wait=graceful, cancel_futures=True)
478
479
        # self.server.socket.shutdown()
480
        # self.server.socket.close()
481
482
        self.started_at = None
483
        self.unload_napps()
484
485
        # Cancel all async tasks (event handlers and servers)
486
        for task in self._tasks:
487
            task.cancel()
488
489
        # ASYNC TODO: close connections
490
        # self.server.server_close()
491
492
        self.log.info("Stopping API Server...")
493
        self.api_server.stop()
494
        self.log.info("Stopped API Server")
495
        self.log.info("Stopping TCP Server...")
496
        self.server.shutdown()
497
        self.log.info("Stopped TCP Server")
498
        self.loop.stop()
499
500
    def status(self):
501
        """Return status of Kytos Server.
502
503
        If the controller kytos is running this method will be returned
504
        "Running since 'Started_At'", otherwise "Stopped".
505
506
        Returns:
507
            string: String with kytos status.
508
509
        """
510
        if self.started_at:
511
            return f"Running since {self.started_at}"
512
        return "Stopped"
513
514
    def uptime(self):
515
        """Return the uptime of kytos server.
516
517
        This method should return:
518
            - 0 if Kytos Server is stopped.
519
            - (kytos.start_at - datetime.now) if Kytos Server is running.
520
521
        Returns:
522
           datetime.timedelta: The uptime interval.
523
524
        """
525
        return now() - self.started_at if self.started_at else 0
526
527
    def notify_listeners(self, event):
528
        """Send the event to the specified listeners.
529
530
        Loops over self.events_listeners matching (by regexp) the attribute
531
        name of the event with the keys of events_listeners. If a match occurs,
532
        then send the event to each registered listener.
533
534
        Args:
535
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
536
        """
537
538
        self.log.debug("looking for listeners for %s", event)
539
        for event_regex, listeners in dict(self.events_listeners).items():
540
            # self.log.debug("listeners found for %s: %r => %s", event,
541
            #                event_regex, [l.__qualname__ for l in listeners])
542
            # Do not match if the event has more characters
543
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
544
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
545
                event_regex += '$'
546
            if re.match(event_regex, event.name):
547
                self.log.debug('Calling listeners for %s', event)
548
                for listener in listeners:
549
                    if asyncio.iscoroutinefunction(listener):
550
                        task = asyncio.create_task(listener(event))
551
                        self._alisten_tasks.add(task)
552
                        task.add_done_callback(self._alisten_tasks.discard)
553
                    else:
554
                        listener(event)
555
556
    async def event_handler(self, buffer_name: str):
557
        """Default event handler that gets from an event buffer."""
558
        event_buffer = getattr(self.buffers, buffer_name)
559
        self.log.info(f"Event handler {buffer_name} started")
560
        while True:
561
            event = await event_buffer.aget()
562
            self.notify_listeners(event)
563
564
            if event.name == "kytos/core.shutdown":
565
                self.log.debug(f"Event handler {buffer_name} stopped")
566
                break
567
568
    async def publish_connection_error(self, event):
569
        """Publish connection error event.
570
571
        Args:
572
            event (KytosEvent): Event that triggered for error propagation.
573
        """
574
        event.name = \
575
            f"kytos/core.{event.destination.protocol.name}.connection.error"
576
        error_msg = f"Connection state: {event.destination.state}"
577
        event.content["exception"] = error_msg
578
        await self.buffers.app.aput(event)
579
580
    async def msg_out_event_handler(self):
581
        """Handle msg_out events.
582
583
        Listen to the msg_out buffer and send all its events to the
584
        corresponding listeners.
585
        """
586
        self.log.info("Event handler msg_out started")
587
        while True:
588
            triggered_event = await self.buffers.msg_out.aget()
589
590
            if triggered_event.name == "kytos/core.shutdown":
591
                self.log.debug("Message Out Event handler stopped")
592
                break
593
594
            message = triggered_event.content['message']
595
            destination = triggered_event.destination
596
            try:
597
                if (destination and
598
                        not destination.state == ConnectionState.FINISHED):
599
                    packet = message.pack()
600
                    destination.send(packet)
601
                    self.log.debug('Connection %s: OUT OFP, '
602
                                   'version: %s, type: %s, xid: %s - %s',
603
                                   destination.id,
604
                                   message.header.version,
605
                                   message.header.message_type,
606
                                   message.header.xid,
607
                                   packet.hex())
608
                    self.notify_listeners(triggered_event)
609
            except (OSError, SocketError):
610
                await self.publish_connection_error(triggered_event)
611
                self.log.info("connection closed. Cannot send message")
612
            except PackException as err:
613
                self.log.error(
614
                    f"Discarding message: {message}, event: {triggered_event} "
615
                    f"because of PackException {err}"
616
                )
617
618
    def get_interface_by_id(self, interface_id):
619
        """Find a Interface  with interface_id.
620
621
        Args:
622
            interface_id(str): Interface Identifier.
623
624
        Returns:
625
            Interface: Instance of Interface with the id given.
626
627
        """
628
        if interface_id is None:
629
            return None
630
631
        switch_id = ":".join(interface_id.split(":")[:-1])
632
        interface_number = int(interface_id.split(":")[-1])
633
634
        switch = self.switches.get(switch_id)
635
636
        if not switch:
637
            return None
638
639
        return switch.interfaces.get(interface_number, None)
640
641
    def get_switch_by_dpid(self, dpid):
642
        """Return a specific switch by dpid.
643
644
        Args:
645
            dpid (|DPID|): dpid object used to identify a switch.
646
647
        Returns:
648
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
649
650
        """
651
        return self.switches.get(dpid)
652
653
    def get_switch_or_create(self, dpid, connection=None):
654
        """Return switch or create it if necessary.
655
656
        Args:
657
            dpid (|DPID|): dpid object used to identify a switch.
658
            connection (:class:`~kytos.core.connection.Connection`):
659
                connection used by switch. If a switch has a connection that
660
                will be updated.
661
662
        Returns:
663
            :class:`~kytos.core.switch.Switch`: new or existent switch.
664
665
        """
666
        with self._switches_lock:
667
            if connection:
668
                self.create_or_update_connection(connection)
669
670
            switch = self.get_switch_by_dpid(dpid)
671
            event_name = 'kytos/core.switch.'
672
673
            if switch is None:
674
                switch = Switch(dpid=dpid)
675
                self.add_new_switch(switch)
676
                event_name += 'new'
677
            else:
678
                event_name += 'reconnected'
679
680
            self.set_switch_options(dpid=dpid)
681
            event = KytosEvent(name=event_name, content={'switch': switch})
682
683
            if connection:
684
                old_connection = switch.connection
685
                switch.update_connection(connection)
686
687
                if old_connection is not connection:
688
                    self.remove_connection(old_connection)
689
690
            self.buffers.app.put(event)
691
692
            return switch
693
694
    def set_switch_options(self, dpid):
695
        """Update the switch settings based on kytos.conf options.
696
697
        Args:
698
            dpid (str): dpid used to identify a switch.
699
700
        """
701
        switch = self.switches.get(dpid)
702
        if not switch:
703
            return
704
705
        vlan_pool = {}
706
        vlan_pool = self.options.vlan_pool
707
        if not vlan_pool:
708
            return
709
710
        if vlan_pool.get(dpid):
711
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
712
            for intf_num, port_list in vlan_pool[dpid].items():
713
                if not switch.interfaces.get((intf_num)):
714
                    vlan_ids = set()
715
                    for vlan_range in port_list:
716
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
717
                        for vlan_id in range(vlan_begin, vlan_end):
718
                            vlan_ids.add(vlan_id)
719
                    intf_num = int(intf_num)
720
                    intf = Interface(name=intf_num, port_number=intf_num,
721
                                     switch=switch)
722
                    intf.set_available_tags(vlan_ids)
723
                    switch.update_interface(intf)
724
725
    def create_or_update_connection(self, connection):
726
        """Update a connection.
727
728
        Args:
729
            connection (:class:`~kytos.core.connection.Connection`):
730
                Instance of connection that will be updated.
731
        """
732
        self.connections[connection.id] = connection
733
734
    def get_connection_by_id(self, conn_id):
735
        """Return a existent connection by id.
736
737
        Args:
738
            id (int): id from a connection.
739
740
        Returns:
741
            :class:`~kytos.core.connection.Connection`: Instance of connection
742
                or None Type.
743
744
        """
745
        return self.connections.get(conn_id)
746
747
    def remove_connection(self, connection):
748
        """Close a existent connection and remove it.
749
750
        Args:
751
            connection (:class:`~kytos.core.connection.Connection`):
752
                Instance of connection that will be removed.
753
        """
754
        if connection is None:
755
            return False
756
757
        try:
758
            connection.close()
759
            del self.connections[connection.id]
760
        except KeyError:
761
            return False
762
        return True
763
764
    def remove_switch(self, switch):
765
        """Remove an existent switch.
766
767
        Args:
768
            switch (:class:`~kytos.core.switch.Switch`):
769
                Instance of switch that will be removed.
770
        """
771
        try:
772
            del self.switches[switch.dpid]
773
        except KeyError:
774
            return False
775
        return True
776
777
    def new_connection(self, event):
778
        """Handle a kytos/core.connection.new event.
779
780
        This method will read new connection event and store the connection
781
        (socket) into the connections attribute on the controller.
782
783
        It also clear all references to the connection since it is a new
784
        connection on the same ip:port.
785
786
        Args:
787
            event (~kytos.core.KytosEvent):
788
                The received event (``kytos/core.connection.new``) with the
789
                needed info.
790
        """
791
        self.log.info("Handling %s...", event)
792
793
        connection = event.source
794
        self.log.debug("Event source: %s", event.source)
795
796
        # Remove old connection (aka cleanup) if it exists
797
        if self.get_connection_by_id(connection.id):
798
            self.remove_connection(connection.id)
799
800
        # Update connections with the new connection
801
        self.create_or_update_connection(connection)
802
803
    def add_new_switch(self, switch):
804
        """Add a new switch on the controller.
805
806
        Args:
807
            switch (Switch): A Switch object
808
        """
809
        self.switches[switch.dpid] = switch
810
811
    def _import_napp(self, username, napp_name):
812
        """Import a NApp module.
813
814
        Raises:
815
            FileNotFoundError: if NApp's main.py is not found.
816
            ModuleNotFoundError: if any NApp requirement is not installed.
817
818
        """
819
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
820
        path = os.path.join(self.options.napps, username, napp_name,
821
                            'main.py')
822
        napp_spec = spec_from_file_location(mod_name, path)
823
        napp_module = module_from_spec(napp_spec)
824
        sys.modules[napp_spec.name] = napp_module
825
        napp_spec.loader.exec_module(napp_module)
826
        return napp_module
827
828
    def load_napp(self, username, napp_name):
829
        """Load a single NApp.
830
831
        Args:
832
            username (str): NApp username (makes up NApp's path).
833
            napp_name (str): Name of the NApp to be loaded.
834
        """
835
        if (username, napp_name) in self.napps:
836
            message = 'NApp %s/%s was already loaded'
837
            self.log.warning(message, username, napp_name)
838
            return
839
840
        try:
841
            napp_module = self._import_napp(username, napp_name)
842
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
843
            self.log.error("Error loading NApp '%s/%s': %s",
844
                           username, napp_name, err)
845
            return
846
        except FileNotFoundError as err:
847
            msg = "NApp module not found, assuming it's a meta-napp: %s"
848
            self.log.warning(msg, err.filename)
849
            return
850
851
        try:
852
            napp = napp_module.Main(controller=self)
853
        except:  # noqa pylint: disable=bare-except
854
            self.log.critical("NApp initialization failed: %s/%s",
855
                              username, napp_name, exc_info=True)
856
            return
857
858
        self.napps[(username, napp_name)] = napp
859
860
        napp.start()
861
        self.api_server.authenticate_endpoints(napp)
862
        self.api_server.register_napp_endpoints(napp)
863
864
        # pylint: disable=protected-access
865
        for event, listeners in napp._listeners.items():
866
            self.events_listeners.setdefault(event, []).extend(listeners)
867
        # pylint: enable=protected-access
868
869
    def pre_install_napps(self, napps, enable=True):
870
        """Pre install and enable NApps.
871
872
        Before installing, it'll check if it's installed yet.
873
874
        Args:
875
            napps ([str]): List of NApps to be pre-installed and enabled.
876
        """
877
        all_napps = self.napps_manager.get_installed_napps()
878
        installed = [str(napp) for napp in all_napps]
879
        napps_diff = [napp for napp in napps if napp not in installed]
880
        for napp in napps_diff:
881
            self.napps_manager.install(napp, enable=enable)
882
883
    def load_napps(self):
884
        """Load all NApps enabled on the NApps dir."""
885
        for napp in self.napps_manager.get_enabled_napps():
886
            try:
887
                self.log.info("Loading NApp %s", napp.id)
888
                self.load_napp(napp.username, napp.name)
889
            except FileNotFoundError as exception:
890
                self.log.error("Could not load NApp %s: %s",
891
                               napp.id, exception)
892
893
    def unload_napp(self, username, napp_name):
894
        """Unload a specific NApp.
895
896
        Args:
897
            username (str): NApp username.
898
            napp_name (str): Name of the NApp to be unloaded.
899
        """
900
        napp = self.napps.pop((username, napp_name), None)
901
902
        if napp is None:
903
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
904
        else:
905
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
906
            napp_id = NApp(username, napp_name).id
907
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
908
            napp_shutdown_fn = self.events_listeners[event.name][0]
909
            # Call listener before removing it from events_listeners
910
            napp_shutdown_fn(event)
911
912
            # Remove rest endpoints from that napp
913
            self.api_server.remove_napp_endpoints(napp)
914
915
            # Removing listeners from that napp
916
            # pylint: disable=protected-access
917
            for event_type, napp_listeners in napp._listeners.items():
918
                event_listeners = self.events_listeners[event_type]
919
                for listener in napp_listeners:
920
                    event_listeners.remove(listener)
921
                if not event_listeners:
922
                    del self.events_listeners[event_type]
923
            # pylint: enable=protected-access
924
925
    def unload_napps(self):
926
        """Unload all loaded NApps that are not core NApps
927
928
        NApps are unloaded in the reverse order that they are enabled to
929
        facilitate to shutdown gracefully.
930
        """
931
        for napp in reversed(self.napps_manager.get_enabled_napps()):
932
            self.unload_napp(napp.username, napp.name)
933
934
    def reload_napp_module(self, username, napp_name, napp_file):
935
        """Reload a NApp Module."""
936
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
937
        try:
938
            napp_module = import_module(mod_name)
939
        except ModuleNotFoundError:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
940
            self.log.error("Module '%s' not found", mod_name)
941
            raise
942
        try:
943
            napp_module = reload_module(napp_module)
944
        except ImportError as err:
945
            self.log.error("Error reloading NApp '%s/%s': %s",
946
                           username, napp_name, err)
947
            raise
948
949
    def reload_napp(self, username, napp_name):
950
        """Reload a NApp."""
951
        self.unload_napp(username, napp_name)
952
        try:
953
            self.reload_napp_module(username, napp_name, 'settings')
954
            self.reload_napp_module(username, napp_name, 'main')
955
        except (ModuleNotFoundError, ImportError):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
956
            return 400
957
        self.log.info("NApp '%s/%s' successfully reloaded",
958
                      username, napp_name)
959
        self.load_napp(username, napp_name)
960
        return 200
961
962
    def rest_reload_napp(self, request: Request) -> JSONResponse:
963
        """Request reload a NApp."""
964
        username = request.path_params["username"]
965
        napp_name = request.path_params["napp_name"]
966
        res = self.reload_napp(username, napp_name)
967
        if res == 200:
968
            return JSONResponse('reloaded')
969
        return JSONResponse('fail to reload', status_code=res)
970
971
    def rest_reload_all_napps(self, _request: Request) -> JSONResponse:
972
        """Request reload all NApps."""
973
        for napp in self.napps:
974
            self.reload_napp(*napp)
975
        return JSONResponse('reloaded')
976