Passed
Push — master ( 99a8a8...17b9df )
by Vinicius
03:24 queued 15s
created

kytos.core.controller.Controller.unload_napps()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 3
nop 1
dl 0
loc 8
rs 10
c 0
b 0
f 0
ccs 3
cts 3
cp 1
crap 2
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import importlib
19 1
import json
20 1
import logging
21 1
import os
22 1
import re
23 1
import sys
24 1
import threading
25 1
from concurrent.futures import ThreadPoolExecutor
26 1
from importlib import import_module
27 1
from importlib import reload as reload_module
28 1
from importlib.util import module_from_spec, spec_from_file_location
29 1
from pathlib import Path
30 1
from socket import error as SocketError
31
32 1
from kytos.core.api_server import APIServer
33 1
from kytos.core.apm import init_apm
34 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
35 1
from kytos.core.auth import Auth
36 1
from kytos.core.buffers import KytosBuffers
37 1
from kytos.core.config import KytosConfig
38 1
from kytos.core.connection import ConnectionState
39 1
from kytos.core.db import db_conn_wait
40 1
from kytos.core.dead_letter import DeadLetter
41 1
from kytos.core.events import KytosEvent
42 1
from kytos.core.exceptions import KytosAPMInitException, KytosDBInitException
43 1
from kytos.core.helpers import executors, now
44 1
from kytos.core.interface import Interface
45 1
from kytos.core.logs import LogManager
46 1
from kytos.core.napps.base import NApp
47 1
from kytos.core.napps.manager import NAppsManager
48 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
49 1
from kytos.core.switch import Switch
50
51 1
__all__ = ('Controller',)
52
53
54 1
def exc_handler(_, exc, __):
    """Log an uncaught exception, including its traceback.

    The signature matches the ``sys.excepthook`` protocol
    ``(type, value, traceback)``.

    Args:
        _ (ignored): exception type.
        exc: exception instance.
        __ (ignored): traceback; the one attached to ``exc`` is used.
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # logging.exception() reads sys.exc_info(), which is empty when this
    # function is not called from inside an ``except`` block, so the
    # traceback would be logged as "NoneType: None". Passing the exception
    # instance explicitly makes logging use ``exc.__traceback__`` instead.
    logging.error('Uncaught Exception: %s', exc, exc_info=exc)
66
67
68 1
class Controller:
69
    """Main class of Kytos.
70
71
    The main responsibilities of this class are:
72
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
73
        - manage KytosNApps (install, load and unload);
74
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
75
        - manage which event should be sent to NApps methods;
76
        - manage the buffers handlers, considering one thread per handler.
77
    """
78
79
    # Created issue #568 for the disabled checks.
80
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
81
    # pylint: disable=consider-using-with,unnecessary-dunder-call
82 1
    def __init__(self, options=None):
        """Build a controller from parsed configuration options.

        Args:
            options (:attr:`ParseArgs.args`): ``options`` attribute of a
                :class:`~kytos.core.config.KytosConfig` instance. When
                omitted, the ``'daemon'`` options of a freshly created
                :class:`~kytos.core.config.KytosConfig` are used.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        #: ThreadPoolExecutor: single-worker pool owned by the controller.
        self._pool = ThreadPoolExecutor(max_workers=1)

        #: list: asyncio tasks created by the controller.
        self._tasks = []

        #: dict: main threads of the controller (buffers and handler).
        self._threads = {}
        #: KytosBuffers: buffers of the controller; unset until it starts.
        self.buffers: KytosBuffers = None
        #: dict: socket connections between the controller and the
        #: switches, labeled by ``(ip, port)``.
        #:
        #: Each key is an ``(ip, port)`` tuple and each value is a
        #: Connection.
        self.connections = {}
        #: dict: event listeners indexed by event.
        #:
        #: Each key is a KytosEvent (or a string holding a regex that is
        #: matched against KytosEvents); each value is the list of methods
        #: that will receive the matching events.
        self.events_listeners = {
            'kytos/core.connection.new': [self.new_connection],
        }

        #: dict: currently loaded NApps - ``'napp_name'``: ``napp``.
        #:
        #: Keys are NApp names (strings); values are the NApp instances.
        self.napps = {}
        #: Object generated by ParseArgs on config.py file.
        self.options = options
        #: KytosServer: server that will listen to TCP connections; unset
        #: until the controller starts.
        self.server = None
        #: dict: known switches, ``dpid`` -> Switch object.
        self.switches = {}  # dpid: Switch()
        self._switches_lock = threading.Lock()

        #: datetime.datetime: moment the controller finished starting.
        self.started_at = None

        #: logging.Logger: logger used by Kytos; unset until logs are
        #: enabled.
        self.log = None

        #: Observer that handles NApps being enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server exposing the REST endpoints.
        self.api_server = APIServer(
            __name__,
            self.options.listen,
            self.options.api_port,
            self.napps_manager,
            self.options.napps,
        )

        self.auth = Auth(self)
        self.dead_letter = DeadLetter(self)
        self._alisten_tasks = set()

        self._register_endpoints()
        #: Put the parent of the NApps 'enabled' directory on the import
        #: path, so enabled NApps can be imported with:
        #: from napps.<username>.<napp_name> import ?....
        napps_parent = os.path.join(self.options.napps, os.pardir)
        sys.path.append(napps_parent)
        sys.excepthook = exc_handler
157
158 1
    def enable_logs(self):
159
        """Register kytos log and enable the logs."""
160 1
        decorators = self.options.logger_decorators
161 1
        try:
162 1
            decorators = [self._resolve(deco) for deco in decorators]
163
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
164
            print(f'Failed to resolve decorator module: {err.name}')
165
            sys.exit(1)
166
        except AttributeError:
167
            print(f'Failed to resolve decorator name: {decorators}')
168
            sys.exit(1)
169 1
        LogManager.decorate_logger_class(*decorators)
170 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
171 1
        LogManager.enable_websocket(self.api_server.server)
172 1
        self.log = logging.getLogger(__name__)
173 1
        self._patch_core_loggers()
174
175 1
    @staticmethod
176 1
    def _resolve(name):
177
        """Resolve a dotted name to a global object."""
178
        mod, _, attr = name.rpartition('.')
179
        mod = importlib.import_module(mod)
180
        return getattr(mod, attr)
181
182 1
    @staticmethod
183 1
    def _patch_core_loggers():
184
        """Patch in updated loggers to 'kytos.core.*' modules"""
185 1
        match_str = 'kytos.core.'
186 1
        str_len = len(match_str)
187 1
        reloadable_mods = [module for mod_name, module in sys.modules.items()
188
                           if mod_name[:str_len] == match_str]
189 1
        for module in reloadable_mods:
190 1
            module.LOG = logging.getLogger(module.__name__)
191
192 1
    @staticmethod
193 1
    def loggers():
194
        """List all logging Loggers.
195
196
        Return a list of Logger objects, with name and logging level.
197
        """
198
        # pylint: disable=no-member
199 1
        return [logging.getLogger(name)
200
                for name in logging.root.manager.loggerDict
201
                if "kytos" in name]
202
203 1
    def toggle_debug(self, name=None):
204
        """Enable/disable logging debug messages to a given logger name.
205
206
        If the name parameter is not specified the debug will be
207
        enabled/disabled following the initial config file. It will decide
208
        to enable/disable using the 'kytos' name to find the current
209
        logger level.
210
        Obs: To disable the debug the logging will be set to NOTSET
211
212
        Args:
213
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
214
        """
215
        # pylint: disable=no-member
216 1
        if name and name not in logging.root.manager.loggerDict:
217
            # A Logger name that is not declared in logging will raise an error
218
            # otherwise logging would create a new Logger.
219 1
            raise ValueError(f"Invalid logger name: {name}")
220
221 1
        if not name:
222
            # Logger name not specified.
223 1
            level = logging.getLogger('kytos').getEffectiveLevel()
224 1
            enable_debug = level != logging.DEBUG
225
226
            # Enable/disable default Loggers
227 1
            LogManager.load_config_file(self.options.logging, enable_debug)
228 1
            return
229
230
        # Get effective logger level for the name
231 1
        level = logging.getLogger(name).getEffectiveLevel()
232 1
        logger = logging.getLogger(name)
233
234 1
        if level == logging.DEBUG:
235
            # disable debug
236 1
            logger.setLevel(logging.NOTSET)
237
        else:
238
            # enable debug
239 1
            logger.setLevel(logging.DEBUG)
240
241 1
    def start(self, restart=False):
242
        """Create pidfile and call start_controller method."""
243 1
        self.enable_logs()
244 1
        if self.options.database:
245 1
            self.db_conn_or_core_shutdown()
246 1
        if self.options.apm:
247 1
            self.init_apm_or_core_shutdown()
248 1
        if not restart:
249 1
            self.create_pidfile()
250 1
        self.start_controller()
251
252 1
    def create_pidfile(self):
253
        """Create a pidfile."""
254 1
        pid = os.getpid()
255
256
        # Creates directory if it doesn't exist
257
        # System can erase /var/run's content
258 1
        pid_folder = Path(self.options.pidfile).parent
259 1
        self.log.info(pid_folder)
260
261
        # Pylint incorrectly infers Path objects
262
        # https://github.com/PyCQA/pylint/issues/224
263
        # pylint: disable=no-member
264 1
        if not pid_folder.exists():
265
            pid_folder.mkdir()
266
            pid_folder.chmod(0o1777)
267
        # pylint: enable=no-member
268
269
        # Make sure the file is deleted when controller stops
270 1
        atexit.register(Path(self.options.pidfile).unlink)
271
272
        # Checks if a pidfile exists. Creates a new file.
273 1
        try:
274 1
            pidfile = open(self.options.pidfile, mode='x', encoding="utf8")
275 1
        except OSError:
276
            # This happens if there is a pidfile already.
277
            # We shall check if the process that created the pidfile is still
278
            # running.
279 1
            try:
280 1
                existing_file = open(self.options.pidfile, mode='r',
281
                                     encoding="utf8")
282 1
                old_pid = int(existing_file.read())
283 1
                os.kill(old_pid, 0)
284
                # If kill() doesn't return an error, there's a process running
285
                # with the same PID. We assume it is Kytos and quit.
286
                # Otherwise, overwrite the file and proceed.
287
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
288
                             "running. Aborting.")
289
                sys.exit(error_msg.format(self.options.pidfile))
290 1
            except OSError:
291 1
                try:
292 1
                    pidfile = open(self.options.pidfile, mode='w',
293
                                   encoding="utf8")
294
                except OSError as exception:
295
                    error_msg = "Failed to create pidfile {}: {}."
296
                    sys.exit(error_msg.format(self.options.pidfile, exception))
297
298
        # Identifies the process that created the pidfile.
299 1
        pidfile.write(str(pid))
300 1
        pidfile.close()
301
302 1
    def start_controller(self):
        """Bring up the TCP server, the API server and the event consumers.

        In order: fresh buffers and the :class:`KytosServer` are created,
        the API server is scheduled on the controller thread pool, the
        NApp directory listener is started, the NApps are pre-installed
        and loaded, and one asyncio task is created per buffer handler.
        Finally ``started_at`` is set.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        self.buffers = KytosBuffers()
        address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(address, KytosServerProtocol, self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: stop the running loop.
            running_loop = asyncio.get_running_loop()
            alive = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(alive), alive)
            running_loop.stop()

        async def _run_api_server_thread(executor):
            # Run the blocking API server in ``executor`` and wait for it.
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            running_loop = asyncio.get_running_loop()
            blocking_tasks = [
                running_loop.run_in_executor(executor, self.api_server.run)
            ]
            completed, pending = await asyncio.wait(blocking_tasks)
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        loop = asyncio.get_running_loop()
        api_task = loop.create_task(_run_api_server_thread(self._pool))
        api_task.add_done_callback(_stop_loop)
        self._tasks.append(api_task)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.
        # sys.exit(error_msg.format(thread, exception))

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        # The consumers are started after the NApps to potentially avoid
        # discarding an event if a consumer isn't ready yet.
        consumers = (
            (self.event_handler, ("conn",)),
            (self.event_handler, ("raw",)),
            (self.event_handler, ("msg_in",)),
            (self.msg_out_event_handler, ()),
            (self.event_handler, ("app",)),
        )
        for consumer, consumer_args in consumers:
            self._tasks.append(loop.create_task(consumer(*consumer_args)))

        self.started_at = now()
373
374 1
    def db_conn_or_core_shutdown(self):
        """Wait for the database connection; exit the process on failure.

        Raises:
            SystemExit: ``db_conn_wait`` raised ``KytosDBInitException``.
        """
        backend = self.options.database
        try:
            db_conn_wait(db_backend=backend)
        except KytosDBInitException as exc:
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
380
381 1
    def init_apm_or_core_shutdown(self, **kwargs):
382
        """Init APM instrumentation or core shutdown."""
383
        if not self.options.apm:
384
            return
385
        try:
386
            init_apm(self.options.apm, app=self.api_server.app,
387
                     **kwargs)
388
        except KytosAPMInitException as exc:
389
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
390
391 1
    def _register_endpoints(self):
392
        """Register all rest endpoint served by kytos.
393
394
        -   Register APIServer endpoints
395
        -   Register WebUI endpoints
396
        -   Register ``/api/kytos/core/config`` endpoint
397
        """
398 1
        self.api_server.start_api()
399
        # Register controller endpoints as /api/kytos/core/...
400 1
        self.api_server.register_core_endpoint('config/',
401
                                               self.configuration_endpoint)
402 1
        self.api_server.register_core_endpoint('metadata/',
403
                                               Controller.metadata_endpoint)
404 1
        self.api_server.register_core_endpoint(
405
            'reload/<username>/<napp_name>/',
406
            self.rest_reload_napp)
407 1
        self.api_server.register_core_endpoint('reload/all',
408
                                               self.rest_reload_all_napps)
409 1
        self.auth.register_core_auth_services()
410 1
        self.dead_letter.register_endpoints()
411
412 1
    def register_rest_endpoint(self, url, function, methods):
413
        """Deprecate in favor of @rest decorator."""
414 1
        self.api_server.register_rest_endpoint(url, function, methods)
415
416 1
    @classmethod
417 1
    def metadata_endpoint(cls):
418
        """Return the Kytos metadata.
419
420
        Returns:
421
            string: Json with current kytos metadata.
422
423
        """
424 1
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
425 1
        with open(meta_path, encoding="utf8") as file:
426 1
            meta_file = file.read()
427 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
428 1
        return json.dumps(metadata)
429
430 1
    def configuration_endpoint(self):
431
        """Return the configuration options used by Kytos.
432
433
        Returns:
434
            string: Json with current configurations used by kytos.
435
436
        """
437 1
        return json.dumps(self.options.__dict__)
438
439 1
    def restart(self, graceful=True):
440
        """Restart Kytos SDN Controller.
441
442
        Args:
443
            graceful(bool): Represents the way that Kytos will restart.
444
        """
445 1
        if self.started_at is not None:
446 1
            self.stop(graceful)
447 1
            self.__init__(self.options)
448
449 1
        self.start(restart=True)
450
451 1
    def stop(self, graceful=True):
452
        """Shutdown all services used by kytos.
453
454
        This method should:
455
            - stop all Websockets
456
            - stop the API Server
457
            - stop the Controller
458
        """
459 1
        if self.started_at:
460 1
            self.stop_controller(graceful)
461
462 1
    def stop_controller(self, graceful=True):
        """Stop the controller.

        The steps performed, in order:
            - send the stop signal through the buffers;
            - stop the NApp directory listener;
            - shut down every executor thread pool;
            - clear ``started_at`` and unload the NApps;
            - replace the buffers with fresh ones;
            - cancel the asyncio tasks created by the controller;
            - stop the API server and its thread pool;
            - shut down the KytosServer.

        Args:
            graceful(bool): passed as ``wait`` when shutting the thread
                pools down.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.napp_dir_listener.stop()

        for pool_name in executors:
            self.log.info("Stopping threadpool: %s", pool_name)

        alive = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(alive), alive)

        for pool in executors.values():
            pool.shutdown(wait=graceful, cancel_futures=True)

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # Cancel all async tasks (event handlers and servers)
        for pending_task in self._tasks:
            pending_task.cancel()

        # ASYNC TODO: close connections
        # self.server.server_close()

        self.log.info("Stopping API Server: %s", self._pool)
        self.api_server.stop_api_server()
        self.log.info("Stopping API Server threadpool: %s", self._pool)
        self._pool.shutdown(wait=graceful, cancel_futures=True)
        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()
518
519 1
    def status(self):
520
        """Return status of Kytos Server.
521
522
        If the controller kytos is running this method will be returned
523
        "Running since 'Started_At'", otherwise "Stopped".
524
525
        Returns:
526
            string: String with kytos status.
527
528
        """
529 1
        if self.started_at:
530 1
            return f"Running since {self.started_at}"
531 1
        return "Stopped"
532
533 1
    def uptime(self):
534
        """Return the uptime of kytos server.
535
536
        This method should return:
537
            - 0 if Kytos Server is stopped.
538
            - (kytos.start_at - datetime.now) if Kytos Server is running.
539
540
        Returns:
541
           datetime.timedelta: The uptime interval.
542
543
        """
544 1
        return now() - self.started_at if self.started_at else 0
545
546 1
    def notify_listeners(self, event):
547
        """Send the event to the specified listeners.
548
549
        Loops over self.events_listeners matching (by regexp) the attribute
550
        name of the event with the keys of events_listeners. If a match occurs,
551
        then send the event to each registered listener.
552
553
        Args:
554
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
555
        """
556
557 1
        self.log.debug("looking for listeners for %s", event)
558 1
        for event_regex, listeners in dict(self.events_listeners).items():
559
            # self.log.debug("listeners found for %s: %r => %s", event,
560
            #                event_regex, [l.__qualname__ for l in listeners])
561
            # Do not match if the event has more characters
562
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
563 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
564 1
                event_regex += '$'
565 1
            if re.match(event_regex, event.name):
566 1
                self.log.debug('Calling listeners for %s', event)
567 1
                for listener in listeners:
568 1
                    if asyncio.iscoroutinefunction(listener):
569
                        task = asyncio.create_task(listener(event))
570
                        self._alisten_tasks.add(task)
571
                        task.add_done_callback(self._alisten_tasks.discard)
572
                    else:
573 1
                        listener(event)
574
575 1
    async def event_handler(self, buffer_name: str):
576
        """Default event handler that gets from an event buffer."""
577 1
        event_buffer = getattr(self.buffers, buffer_name)
578 1
        self.log.info(f"Event handler {buffer_name} started")
579
        while True:
580 1
            event = await event_buffer.aget()
581 1
            self.notify_listeners(event)
582
583 1
            if event.name == "kytos/core.shutdown":
584 1
                self.log.debug(f"Event handler {buffer_name} stopped")
585 1
                break
586
587 1
    async def publish_connection_error(self, event):
588
        """Publish connection error event.
589
590
        Args:
591
            event (KytosEvent): Event that triggered for error propagation.
592
        """
593
        event.name = \
594
            f"kytos/core.{event.destination.protocol.name}.connection.error"
595
        error_msg = f"Connection state: {event.destination.state}"
596
        event.content["exception"] = error_msg
597
        await self.buffers.app.aput(event)
598
599 1
    async def msg_out_event_handler(self):
600
        """Handle msg_out events.
601
602
        Listen to the msg_out buffer and send all its events to the
603
        corresponding listeners.
604
        """
605 1
        self.log.info("Event handler msg_out started")
606
        while True:
607 1
            triggered_event = await self.buffers.msg_out.aget()
608
609 1
            if triggered_event.name == "kytos/core.shutdown":
610 1
                self.log.debug("Message Out Event handler stopped")
611 1
                break
612
613 1
            message = triggered_event.content['message']
614 1
            destination = triggered_event.destination
615 1
            try:
616 1
                if (destination and
617
                        not destination.state == ConnectionState.FINISHED):
618 1
                    packet = message.pack()
619 1
                    destination.send(packet)
620 1
                    self.log.debug('Connection %s: OUT OFP, '
621
                                   'version: %s, type: %s, xid: %s - %s',
622
                                   destination.id,
623
                                   message.header.version,
624
                                   message.header.message_type,
625
                                   message.header.xid,
626
                                   packet.hex())
627 1
                    self.notify_listeners(triggered_event)
628 1
                    continue
629
630
            except (OSError, SocketError):
631
                pass
632
633
            await self.publish_connection_error(triggered_event)
634
            self.log.info("connection closed. Cannot send message")
635
636 1
    def get_interface_by_id(self, interface_id):
637
        """Find a Interface  with interface_id.
638
639
        Args:
640
            interface_id(str): Interface Identifier.
641
642
        Returns:
643
            Interface: Instance of Interface with the id given.
644
645
        """
646 1
        if interface_id is None:
647 1
            return None
648
649 1
        switch_id = ":".join(interface_id.split(":")[:-1])
650 1
        interface_number = int(interface_id.split(":")[-1])
651
652 1
        switch = self.switches.get(switch_id)
653
654 1
        if not switch:
655 1
            return None
656
657 1
        return switch.interfaces.get(interface_number, None)
658
659 1
    def get_switch_by_dpid(self, dpid):
660
        """Return a specific switch by dpid.
661
662
        Args:
663
            dpid (|DPID|): dpid object used to identify a switch.
664
665
        Returns:
666
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
667
668
        """
669 1
        return self.switches.get(dpid)
670
671 1
    def get_switch_or_create(self, dpid, connection=None):
        """Return the switch with the given dpid, creating it if needed.

        The whole operation runs while holding the switches lock. A
        ``kytos/core.switch.new`` event is put in the app buffer when the
        switch had to be created, ``kytos/core.switch.reconnected``
        otherwise.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by the switch. When given, it is stored,
                set on the switch and the previous connection of the
                switch, if it is a different one, is removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.
        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                suffix = 'new'
            else:
                suffix = 'reconnected'

            self.set_switch_options(dpid=dpid)
            event = KytosEvent(name=f'kytos/core.switch.{suffix}',
                               content={'switch': switch})

            if connection:
                previous = switch.connection
                switch.update_connection(connection)
                if previous is not connection:
                    self.remove_connection(previous)

            self.buffers.app.put(event)

            return switch
711
712 1
    def set_switch_options(self, dpid):
713
        """Update the switch settings based on kytos.conf options.
714
715
        Args:
716
            dpid (str): dpid used to identify a switch.
717
718
        """
719 1
        switch = self.switches.get(dpid)
720 1
        if not switch:
721
            return
722
723 1
        vlan_pool = {}
724 1
        vlan_pool = self.options.vlan_pool
725 1
        if not vlan_pool:
726 1
            return
727
728 1
        if vlan_pool.get(dpid):
729 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
730 1
            for intf_num, port_list in vlan_pool[dpid].items():
731 1
                if not switch.interfaces.get((intf_num)):
732 1
                    vlan_ids = set()
733 1
                    for vlan_range in port_list:
734 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
735 1
                        for vlan_id in range(vlan_begin, vlan_end):
736 1
                            vlan_ids.add(vlan_id)
737 1
                    intf_num = int(intf_num)
738 1
                    intf = Interface(name=intf_num, port_number=intf_num,
739
                                     switch=switch)
740 1
                    intf.set_available_tags(vlan_ids)
741 1
                    switch.update_interface(intf)
742
743 1
    def create_or_update_connection(self, connection):
744
        """Update a connection.
745
746
        Args:
747
            connection (:class:`~kytos.core.connection.Connection`):
748
                Instance of connection that will be updated.
749
        """
750 1
        self.connections[connection.id] = connection
751
752 1
    def get_connection_by_id(self, conn_id):
        """Look up a stored connection by its id.

        Args:
            conn_id: Id of the connection, i.e. its key in ``connections``.

        Returns:
            :class:`~kytos.core.connection.Connection`: The stored
                connection, or None when no connection has that id.

        """
        known_connections = self.connections
        return known_connections.get(conn_id)
764
765 1
    def remove_connection(self, connection):
        """Close an existing connection and drop it from ``connections``.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be removed. May be None.

        Returns:
            bool: True when the connection was closed and removed. False
                when ``connection`` is None or when a KeyError was raised
                (for instance because its id is not stored).
        """
        if connection is None:
            return False

        try:
            # The connection is closed first, so it ends up closed even
            # when it turns out not to be stored.
            connection.close()
            self.connections.pop(connection.id)
        except KeyError:
            return False
        else:
            return True
781
782 1
    def remove_switch(self, switch):
        """Remove an existing switch from ``switches``.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Instance of switch that will be removed.

        Returns:
            bool: True when the switch was stored under its dpid and has
                been removed, False when no such entry exists.
        """
        try:
            self.switches.pop(switch.dpid)
        except KeyError:
            return False
        else:
            return True
794
795 1
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method reads the new connection event and stores the
        connection (socket) into the connections attribute of the
        controller.

        A previously stored connection with the same id is closed and
        removed first, since the event announces a new connection on the
        same ip:port.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info. Its ``source`` is the new connection.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() calls close() on its argument and reads its
        # id, so it must be given the stored connection object, not the id.
        stale_connection = self.get_connection_by_id(connection.id)
        # Never close the very connection that is about to be registered.
        if stale_connection and stale_connection is not connection:
            self.remove_connection(stale_connection)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
820
821 1
    def add_new_switch(self, switch):
        """Register a switch on the controller under its dpid.

        An already stored switch with the same dpid is replaced.

        Args:
            switch (Switch): A Switch object
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
828
829 1
    def _import_napp(self, username, napp_name):
830
        """Import a NApp module.
831
832
        Raises:
833
            FileNotFoundError: if NApp's main.py is not found.
834
            ModuleNotFoundError: if any NApp requirement is not installed.
835
836
        """
837 1
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
838 1
        path = os.path.join(self.options.napps, username, napp_name,
839
                            'main.py')
840 1
        napp_spec = spec_from_file_location(mod_name, path)
841 1
        napp_module = module_from_spec(napp_spec)
842 1
        sys.modules[napp_spec.name] = napp_module
843 1
        napp_spec.loader.exec_module(napp_module)
844 1
        return napp_module
845
846 1
    def load_napp(self, username, napp_name):
        """Load a single NApp.

        The NApp's ``main`` module is imported, its ``Main`` class is
        instantiated with this controller, the instance is stored in
        ``self.napps`` and started, its endpoints are registered on the API
        server and its listeners are appended to ``events_listeners``.

        Import and initialization failures are logged and abort the load
        without raising. A NApp that is already loaded is left untouched.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            napp_module = self._import_napp(username, napp_name)
        except FileNotFoundError as err:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                err.filename)
            return
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return

        try:
            napp = napp_module.Main(controller=self)
        except BaseException:  # noqa pylint: disable=broad-except
            # Deliberately catches everything: whatever a NApp raises while
            # initializing is logged instead of propagating.
            self.log.critical("NApp initialization failed: %s/%s",
                              username, napp_name, exc_info=True)
            return

        self.napps[napp_key] = napp

        napp.start()
        self.api_server.authenticate_endpoints(napp)
        self.api_server.register_napp_endpoints(napp)

        # pylint: disable=protected-access
        for event_name, handlers in napp._listeners.items():
            self.events_listeners.setdefault(event_name, []).extend(handlers)
        # pylint: enable=protected-access
886
887 1
    def pre_install_napps(self, napps, enable=True):
        """Pre install and enable NApps.

        Only NApps that are not installed yet are handed to the NApps
        manager. A NApp counts as installed when its name equals ``str()``
        of one of the NApps the manager reports as installed.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): Forwarded as ``enable`` to the NApps manager's
                ``install`` for every NApp that gets installed.
        """
        # A set gives constant-time membership tests for each requested NApp.
        installed = {str(napp)
                     for napp in self.napps_manager.get_installed_napps()}
        missing = [napp for napp in napps if napp not in installed]
        for napp in missing:
            self.napps_manager.install(napp, enable=enable)
900
901 1
    def load_napps(self):
        """Load every NApp the NApps manager reports as enabled.

        A FileNotFoundError raised while loading one NApp is logged and the
        remaining NApps are still loaded.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for enabled in enabled_napps:
            try:
                self.log.info("Loading NApp %s", enabled.id)
                self.load_napp(enabled.username, enabled.name)
            except FileNotFoundError as err:
                self.log.error("Could not load NApp %s: %s",
                               enabled.id, err)
910
911 1
    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        The NApp is taken out of ``self.napps``, the first listener
        registered for its ``kytos/core.shutdown.<napp id>`` event is
        called, its REST endpoints are removed and its listeners are taken
        out of ``events_listeners``. A NApp that is not loaded only
        produces a warning.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)
        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        shutdown_name = 'kytos/core.shutdown.' + NApp(username, napp_name).id
        shutdown_event = KytosEvent(name=shutdown_name)
        # The shutdown listener has to run while it is still registered,
        # i.e. before the listeners are removed below.
        shutdown_listener = self.events_listeners[shutdown_event.name][0]
        shutdown_listener(shutdown_event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Remove that napp's listeners and drop event types left without any
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            registered = self.events_listeners[event_type]
            for listener in napp_listeners:
                registered.remove(listener)
            if not registered:
                self.events_listeners.pop(event_type)
        # pylint: enable=protected-access
942
943 1
    def unload_napps(self):
        """Unload every NApp the NApps manager reports as enabled.

        NApps are unloaded in the reverse of the order in which the manager
        lists them, to help them shut down gracefully.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for napp in reversed(enabled_napps):
            self.unload_napp(napp.username, napp.name)
951
952 1
    def reload_napp_module(self, username, napp_name, napp_file):
        """Reload a NApp Module.

        Imports ``napps.<username>.<napp_name>.<napp_file>`` and then
        reloads it.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp.
            napp_file (str): Module name inside the NApp package.

        Raises:
            ModuleNotFoundError: if the module cannot be found (logged).
            ImportError: if reloading the module fails (logged).
        """
        mod_name = '.'.join(['napps', username, napp_name, napp_file])

        try:
            current_module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise

        try:
            reload_module(current_module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise
966
967 1
    def reload_napp(self, username, napp_name):
        """Reload a NApp.

        The NApp is unloaded, its ``settings`` and ``main`` modules are
        reloaded in that order, and the NApp is loaded again.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be reloaded.

        Returns:
            int: 200 on success, 400 when reloading a module raised
                ImportError (the NApp is not loaded again in that case).
        """
        self.unload_napp(username, napp_name)
        try:
            for napp_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, napp_file)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError, so this
            # covers both failure modes of reload_napp_module.
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
979
980 1
    def rest_reload_napp(self, username, napp_name):
        """Request reload a NApp.

        Returns:
            tuple: ``('reloaded', status)`` where ``status`` is the value
                returned by :meth:`reload_napp`.
        """
        return 'reloaded', self.reload_napp(username, napp_name)
984
985 1
    def rest_reload_all_napps(self):
        """Request reload all NApps.

        Every NApp that is loaded when the request starts is reloaded
        exactly once.

        Returns:
            tuple: Always ``('reloaded', 200)``; the status of the
                individual reloads is not reflected in the result.
        """
        # reload_napp() removes the NApp from self.napps and adds it back,
        # so iterate over a snapshot of the keys: iterating the dict itself
        # while it is mutated revisits entries or raises RuntimeError.
        for username, napp_name in list(self.napps):
            self.reload_napp(username, napp_name)
        return 'reloaded', 200
990