Passed
Pull Request — master (#284)
by Vinicius
07:43
created

kytos.core.controller.Controller._import_napp()   A

Complexity

Conditions 1

Size

Total Lines 16
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 9
nop 3
dl 0
loc 16
rs 9.95
c 0
b 0
f 0
ccs 8
cts 8
cp 1
crap 1
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import importlib
19 1
import json
20 1
import logging
21 1
import os
22 1
import re
23 1
import sys
24 1
import threading
25 1
from concurrent.futures import ThreadPoolExecutor
26 1
from importlib import import_module
27 1
from importlib import reload as reload_module
28 1
from importlib.util import module_from_spec, spec_from_file_location
29 1
from pathlib import Path
30 1
from socket import error as SocketError
31
32 1
from kytos.core.api_server import APIServer
33 1
from kytos.core.apm import init_apm
34 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
35 1
from kytos.core.auth import Auth
36 1
from kytos.core.buffers import KytosBuffers
37 1
from kytos.core.config import KytosConfig
38 1
from kytos.core.connection import ConnectionState
39 1
from kytos.core.db import db_conn_wait
40 1
from kytos.core.dead_letter import DeadLetter
41 1
from kytos.core.events import KytosEvent
42 1
from kytos.core.exceptions import KytosAPMInitException, KytosDBInitException
43 1
from kytos.core.helpers import executors, now
44 1
from kytos.core.interface import Interface
45 1
from kytos.core.logs import LogManager
46 1
from kytos.core.napps.base import NApp
47 1
from kytos.core.napps.manager import NAppsManager
48 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
49 1
from kytos.core.switch import Switch
50
51 1
__all__ = ('Controller',)
52
53
54 1
def exc_handler(_, exc, __):
    """Log uncaught exceptions to ``errlog.log``.

    The signature matches ``sys.excepthook``; only the exception instance
    is used.

    Args:
        exc_type (ignored): exception type
        exc: exception instance
        tb (ignored): traceback
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # An excepthook runs outside any ``except`` block, so ``sys.exc_info()``
    # is empty there and ``logging.exception`` would record no traceback.
    # Passing the exception instance as ``exc_info`` keeps the traceback.
    logging.error('Uncaught Exception: %s', exc, exc_info=exc)
66
67
68 1
class Controller:
69
    """Main class of Kytos.
70
71
    The main responsibilities of this class are:
72
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
73
        - manage KytosNApps (install, load and unload);
74
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
75
        - manage which event should be sent to NApps methods;
76
        - manage the buffers handlers, considering one thread per handler.
77
    """
78
79
    # Created issue #568 for the disabled checks.
80
    # pylint: disable=too-many-instance-attributes,too-many-public-methods,
81
    # pylint: disable=consider-using-with,unnecessary-dunder-call
82 1
    def __init__(self, options=None):
        """Build a controller and wire up its core services.

        Args:
            options (:attr:`ParseArgs.args`): ``options`` attribute of a
                :class:`~kytos.core.config.KytosConfig` instance. When
                omitted, the ``'daemon'`` options of a new
                :class:`~kytos.core.config.KytosConfig` are used.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        # Single-worker executor; the API server is run on it.
        self._pool = ThreadPoolExecutor(max_workers=1)

        # asyncio tasks created when the controller starts.
        self._tasks = []

        #: dict: main threads of the controller (buffers and handler).
        self._threads = {}

        #: KytosBuffers: controller buffers, created when the controller
        #: starts.
        self.buffers: KytosBuffers = None

        #: dict: socket connections between the controller and the
        #: switches. Keys are ``(ip, port)`` tuples and values are
        #: Connection instances.
        self.connections = {}

        #: dict: mapping of events to event listeners.
        #:
        #: Each key is a KytosEvent name (or a string holding a regex to
        #: match against event names); each value is the list of callables
        #: that receive the matching events.
        self.events_listeners = {
            'kytos/core.connection.new': [self.new_connection],
        }

        #: dict: currently loaded napps - ``'napp_name'``: ``napp``
        #: (instance).
        self.napps = {}

        #: Object generated by ParseArgs on config.py file.
        self.options = options

        #: KytosServer: server listening to TCP connections; set on start.
        self.server = None

        #: dict: existing switches, ``dpid`` -> Switch instance.
        self.switches = {}
        self._switches_lock = threading.Lock()

        #: datetime.datetime: time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: logger used by Kytos; set by ``enable_logs``.
        self.log = None

        #: Observer that handles NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(
            __name__,
            self.options.listen,
            self.options.api_port,
            self.napps_manager,
            self.options.napps,
        )

        self.auth = Auth(self)
        self.dead_letter = DeadLetter(self)
        self._alisten_tasks = set()

        self._register_endpoints()

        # Put the parent of the napps directory on the import path so that
        # enabled napps can be imported as:
        #     from napps.<username>.<napp_name> import ...
        sys.path.append(os.path.join(self.options.napps, os.pardir))
        sys.excepthook = exc_handler
157
158 1
    def enable_logs(self):
159
        """Register kytos log and enable the logs."""
160 1
        decorators = self.options.logger_decorators
161 1
        try:
162 1
            decorators = [self._resolve(deco) for deco in decorators]
163
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
164
            print(f'Failed to resolve decorator module: {err.name}')
165
            sys.exit(1)
166
        except AttributeError:
167
            print(f'Failed to resolve decorator name: {decorators}')
168
            sys.exit(1)
169 1
        LogManager.decorate_logger_class(*decorators)
170 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
171 1
        LogManager.enable_websocket(self.api_server.server)
172 1
        self.log = logging.getLogger(__name__)
173 1
        self._patch_core_loggers()
174
175 1
    @staticmethod
176 1
    def _resolve(name):
177
        """Resolve a dotted name to a global object."""
178
        mod, _, attr = name.rpartition('.')
179
        mod = importlib.import_module(mod)
180
        return getattr(mod, attr)
181
182 1
    @staticmethod
183 1
    def _patch_core_loggers():
184
        """Patch in updated loggers to 'kytos.core.*' modules"""
185 1
        match_str = 'kytos.core.'
186 1
        str_len = len(match_str)
187 1
        reloadable_mods = [module for mod_name, module in sys.modules.items()
188
                           if mod_name[:str_len] == match_str]
189 1
        for module in reloadable_mods:
190 1
            module.LOG = logging.getLogger(module.__name__)
191
192 1
    @staticmethod
193 1
    def loggers():
194
        """List all logging Loggers.
195
196
        Return a list of Logger objects, with name and logging level.
197
        """
198
        # pylint: disable=no-member
199 1
        return [logging.getLogger(name)
200
                for name in logging.root.manager.loggerDict
201
                if "kytos" in name]
202
203 1
    def toggle_debug(self, name=None):
204
        """Enable/disable logging debug messages to a given logger name.
205
206
        If the name parameter is not specified the debug will be
207
        enabled/disabled following the initial config file. It will decide
208
        to enable/disable using the 'kytos' name to find the current
209
        logger level.
210
        Obs: To disable the debug the logging will be set to NOTSET
211
212
        Args:
213
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
214
        """
215
        # pylint: disable=no-member
216 1
        if name and name not in logging.root.manager.loggerDict:
217
            # A Logger name that is not declared in logging will raise an error
218
            # otherwise logging would create a new Logger.
219 1
            raise ValueError(f"Invalid logger name: {name}")
220
221 1
        if not name:
222
            # Logger name not specified.
223 1
            level = logging.getLogger('kytos').getEffectiveLevel()
224 1
            enable_debug = level != logging.DEBUG
225
226
            # Enable/disable default Loggers
227 1
            LogManager.load_config_file(self.options.logging, enable_debug)
228 1
            return
229
230
        # Get effective logger level for the name
231 1
        level = logging.getLogger(name).getEffectiveLevel()
232 1
        logger = logging.getLogger(name)
233
234 1
        if level == logging.DEBUG:
235
            # disable debug
236 1
            logger.setLevel(logging.NOTSET)
237
        else:
238
            # enable debug
239 1
            logger.setLevel(logging.DEBUG)
240
241 1
    def start(self, restart=False):
242
        """Create pidfile and call start_controller method."""
243 1
        self.enable_logs()
244 1
        if self.options.database:
245 1
            self.db_conn_or_core_shutdown()
246 1
        if self.options.apm:
247 1
            self.init_apm_or_core_shutdown()
248 1
        if not restart:
249 1
            self.create_pidfile()
250 1
        self.start_controller()
251
252 1
    def create_pidfile(self):
253
        """Create a pidfile."""
254 1
        pid = os.getpid()
255
256
        # Creates directory if it doesn't exist
257
        # System can erase /var/run's content
258 1
        pid_folder = Path(self.options.pidfile).parent
259 1
        self.log.info(pid_folder)
260
261
        # Pylint incorrectly infers Path objects
262
        # https://github.com/PyCQA/pylint/issues/224
263
        # pylint: disable=no-member
264 1
        if not pid_folder.exists():
265
            pid_folder.mkdir()
266
            pid_folder.chmod(0o1777)
267
        # pylint: enable=no-member
268
269
        # Make sure the file is deleted when controller stops
270 1
        atexit.register(Path(self.options.pidfile).unlink)
271
272
        # Checks if a pidfile exists. Creates a new file.
273 1
        try:
274 1
            pidfile = open(self.options.pidfile, mode='x', encoding="utf8")
275 1
        except OSError:
276
            # This happens if there is a pidfile already.
277
            # We shall check if the process that created the pidfile is still
278
            # running.
279 1
            try:
280 1
                existing_file = open(self.options.pidfile, mode='r',
281
                                     encoding="utf8")
282 1
                old_pid = int(existing_file.read())
283 1
                os.kill(old_pid, 0)
284
                # If kill() doesn't return an error, there's a process running
285
                # with the same PID. We assume it is Kytos and quit.
286
                # Otherwise, overwrite the file and proceed.
287
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
288
                             "running. Aborting.")
289
                sys.exit(error_msg.format(self.options.pidfile))
290 1
            except OSError:
291 1
                try:
292 1
                    pidfile = open(self.options.pidfile, mode='w',
293
                                   encoding="utf8")
294
                except OSError as exception:
295
                    error_msg = "Failed to create pidfile {}: {}."
296
                    sys.exit(error_msg.format(self.options.pidfile, exception))
297
298
        # Identifies the process that created the pidfile.
299 1
        pidfile.write(str(pid))
300 1
        pidfile.close()
301
302 1
    def start_controller(self):
        """Start the TCP server, the event handlers and the NApps.

        Creates the buffers and the KytosServer, schedules one asyncio task
        per buffer handler plus one task that runs the API server on the
        controller thread pool, then starts the NApp directory listener and
        loads the NApps.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        self.buffers = KytosBuffers()
        listen_address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(listen_address,
                                  KytosServerProtocol,
                                  self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: stop the running loop.
            running_loop = asyncio.get_running_loop()
            alive_threads = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(alive_threads), alive_threads)
            running_loop.stop()

        async def _run_api_server_thread(executor):
            # Run the blocking API server on ``executor`` and wait for it.
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            running_loop = asyncio.get_running_loop()
            blocking_tasks = [
                running_loop.run_in_executor(executor, self.api_server.run)
            ]
            completed, pending = await asyncio.wait(blocking_tasks)
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        loop = asyncio.get_running_loop()

        # Same scheduling order as the buffers are listed here.
        for buffer_name in ("conn", "raw", "msg_in"):
            self._tasks.append(
                loop.create_task(self.event_handler(buffer_name)))
        self._tasks.append(loop.create_task(self.msg_out_event_handler()))
        self._tasks.append(loop.create_task(self.event_handler("app")))

        api_task = loop.create_task(_run_api_server_thread(self._pool))
        api_task.add_done_callback(_stop_loop)
        self._tasks.append(api_task)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()
370
371 1
    def db_conn_or_core_shutdown(self):
        """Wait for the database connection; exit the process on failure.

        Raises:
            SystemExit: if ``db_conn_wait`` raises KytosDBInitException.
        """
        backend = self.options.database
        try:
            db_conn_wait(db_backend=backend)
        except KytosDBInitException as exc:
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
377
378 1
    def init_apm_or_core_shutdown(self, **kwargs):
379
        """Init APM instrumentation or core shutdown."""
380
        if not self.options.apm:
381
            return
382
        try:
383
            init_apm(self.options.apm, app=self.api_server.app,
384
                     **kwargs)
385
        except KytosAPMInitException as exc:
386
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
387
388 1
    def _register_endpoints(self):
389
        """Register all rest endpoint served by kytos.
390
391
        -   Register APIServer endpoints
392
        -   Register WebUI endpoints
393
        -   Register ``/api/kytos/core/config`` endpoint
394
        """
395 1
        self.api_server.start_api()
396
        # Register controller endpoints as /api/kytos/core/...
397 1
        self.api_server.register_core_endpoint('config/',
398
                                               self.configuration_endpoint)
399 1
        self.api_server.register_core_endpoint('metadata/',
400
                                               Controller.metadata_endpoint)
401 1
        self.api_server.register_core_endpoint(
402
            'reload/<username>/<napp_name>/',
403
            self.rest_reload_napp)
404 1
        self.api_server.register_core_endpoint('reload/all',
405
                                               self.rest_reload_all_napps)
406 1
        self.auth.register_core_auth_services()
407 1
        self.dead_letter.register_endpoints()
408
409 1
    def register_rest_endpoint(self, url, function, methods):
410
        """Deprecate in favor of @rest decorator."""
411 1
        self.api_server.register_rest_endpoint(url, function, methods)
412
413 1
    @classmethod
414 1
    def metadata_endpoint(cls):
415
        """Return the Kytos metadata.
416
417
        Returns:
418
            string: Json with current kytos metadata.
419
420
        """
421 1
        meta_path = f"{os.path.dirname(__file__)}/metadata.py"
422 1
        with open(meta_path, encoding="utf8") as file:
423 1
            meta_file = file.read()
424 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
425 1
        return json.dumps(metadata)
426
427 1
    def configuration_endpoint(self):
428
        """Return the configuration options used by Kytos.
429
430
        Returns:
431
            string: Json with current configurations used by kytos.
432
433
        """
434 1
        return json.dumps(self.options.__dict__)
435
436 1
    def restart(self, graceful=True):
437
        """Restart Kytos SDN Controller.
438
439
        Args:
440
            graceful(bool): Represents the way that Kytos will restart.
441
        """
442 1
        if self.started_at is not None:
443 1
            self.stop(graceful)
444 1
            self.__init__(self.options)
445
446 1
        self.start(restart=True)
447
448 1
    def stop(self, graceful=True):
449
        """Shutdown all services used by kytos.
450
451
        This method should:
452
            - stop all Websockets
453
            - stop the API Server
454
            - stop the Controller
455
        """
456 1
        if self.started_at:
457 1
            self.stop_controller(graceful)
458
459 1
    def stop_controller(self, graceful=True):
        """Stop the controller services.

        In order, this method:
            - sends the stop signal to the buffers;
            - stops the NApp directory listener;
            - shuts down every pool in ``executors``;
            - clears ``started_at``, unloads the NApps and replaces the
              buffers with new ones;
            - cancels the asyncio tasks created on start;
            - stops the API server and its thread pool;
            - shuts down the KytosServer.

        Args:
            graceful(bool): used as the ``wait`` argument of the thread pool
                shutdowns.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.napp_dir_listener.stop()

        for pool_name in executors:
            self.log.info("Stopping threadpool: %s", pool_name)

        alive_threads = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(alive_threads), alive_threads)

        for pool in executors.values():
            pool.shutdown(wait=graceful, cancel_futures=True)

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # Cancel all async tasks (event handlers and servers)
        for pending_task in self._tasks:
            pending_task.cancel()

        # ASYNC TODO: close connections

        self.log.info("Stopping API Server: %s", self._pool)
        self.api_server.stop_api_server()
        self.log.info("Stopping API Server threadpool: %s", self._pool)
        self._pool.shutdown(wait=graceful, cancel_futures=True)

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()
515
516 1
    def status(self):
517
        """Return status of Kytos Server.
518
519
        If the controller kytos is running this method will be returned
520
        "Running since 'Started_At'", otherwise "Stopped".
521
522
        Returns:
523
            string: String with kytos status.
524
525
        """
526 1
        if self.started_at:
527 1
            return f"Running since {self.started_at}"
528 1
        return "Stopped"
529
530 1
    def uptime(self):
531
        """Return the uptime of kytos server.
532
533
        This method should return:
534
            - 0 if Kytos Server is stopped.
535
            - (kytos.start_at - datetime.now) if Kytos Server is running.
536
537
        Returns:
538
           datetime.timedelta: The uptime interval.
539
540
        """
541 1
        return now() - self.started_at if self.started_at else 0
542
543 1
    def notify_listeners(self, event):
544
        """Send the event to the specified listeners.
545
546
        Loops over self.events_listeners matching (by regexp) the attribute
547
        name of the event with the keys of events_listeners. If a match occurs,
548
        then send the event to each registered listener.
549
550
        Args:
551
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
552
        """
553
554 1
        self.log.debug("looking for listeners for %s", event)
555 1
        for event_regex, listeners in dict(self.events_listeners).items():
556
            # self.log.debug("listeners found for %s: %r => %s", event,
557
            #                event_regex, [l.__qualname__ for l in listeners])
558
            # Do not match if the event has more characters
559
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
560 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
561 1
                event_regex += '$'
562 1
            if re.match(event_regex, event.name):
563 1
                self.log.debug('Calling listeners for %s', event)
564 1
                for listener in listeners:
565 1
                    if asyncio.iscoroutinefunction(listener):
566
                        task = asyncio.create_task(listener(event))
567
                        self._alisten_tasks.add(task)
568
                        task.add_done_callback(self._alisten_tasks.discard)
569
                    else:
570 1
                        listener(event)
571
572 1
    async def event_handler(self, buffer_name: str):
573
        """Default event handler that gets from an event buffer."""
574 1
        event_buffer = getattr(self.buffers, buffer_name)
575
        while True:
576 1
            event = await event_buffer.aget()
577 1
            self.notify_listeners(event)
578
579 1
            if event.name == "kytos/core.shutdown":
580 1
                self.log.debug(f"Event handler {buffer_name} stopped")
581 1
                break
582
583 1
    async def publish_connection_error(self, event):
584
        """Publish connection error event.
585
586
        Args:
587
            event (KytosEvent): Event that triggered for error propagation.
588
        """
589
        event.name = \
590
            f"kytos/core.{event.destination.protocol.name}.connection.error"
591
        error_msg = f"Connection state: {event.destination.state}"
592
        event.content["exception"] = error_msg
593
        await self.buffers.app.aput(event)
594
595 1
    async def msg_out_event_handler(self):
596
        """Handle msg_out events.
597
598
        Listen to the msg_out buffer and send all its events to the
599
        corresponding listeners.
600
        """
601 1
        self.log.info("Message Out Event Handler started")
602
        while True:
603 1
            triggered_event = await self.buffers.msg_out.aget()
604
605 1
            if triggered_event.name == "kytos/core.shutdown":
606 1
                self.log.debug("Message Out Event handler stopped")
607 1
                break
608
609 1
            message = triggered_event.content['message']
610 1
            destination = triggered_event.destination
611 1
            try:
612 1
                if (destination and
613
                        not destination.state == ConnectionState.FINISHED):
614 1
                    packet = message.pack()
615 1
                    destination.send(packet)
616 1
                    self.log.debug('Connection %s: OUT OFP, '
617
                                   'version: %s, type: %s, xid: %s - %s',
618
                                   destination.id,
619
                                   message.header.version,
620
                                   message.header.message_type,
621
                                   message.header.xid,
622
                                   packet.hex())
623 1
                    self.notify_listeners(triggered_event)
624 1
                    continue
625
626
            except (OSError, SocketError):
627
                pass
628
629
            await self.publish_connection_error(triggered_event)
630
            self.log.info("connection closed. Cannot send message")
631
632 1
    def get_interface_by_id(self, interface_id):
633
        """Find a Interface  with interface_id.
634
635
        Args:
636
            interface_id(str): Interface Identifier.
637
638
        Returns:
639
            Interface: Instance of Interface with the id given.
640
641
        """
642 1
        if interface_id is None:
643 1
            return None
644
645 1
        switch_id = ":".join(interface_id.split(":")[:-1])
646 1
        interface_number = int(interface_id.split(":")[-1])
647
648 1
        switch = self.switches.get(switch_id)
649
650 1
        if not switch:
651 1
            return None
652
653 1
        return switch.interfaces.get(interface_number, None)
654
655 1
    def get_switch_by_dpid(self, dpid):
656
        """Return a specific switch by dpid.
657
658
        Args:
659
            dpid (|DPID|): dpid object used to identify a switch.
660
661
        Returns:
662
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
663
664
        """
665 1
        return self.switches.get(dpid)
666
667 1
    def get_switch_or_create(self, dpid, connection=None):
        """Return the switch with ``dpid``, creating it when necessary.

        Runs under the switches lock. A ``kytos/core.switch.new`` or
        ``kytos/core.switch.reconnected`` event is put on the ``app``
        buffer, depending on whether the switch had to be created.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by the switch. When given, it is stored in
                the controller and set on the switch; a different previous
                connection of the switch is removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.

        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                event_suffix = 'new'
            else:
                event_suffix = 'reconnected'

            self.set_switch_options(dpid=dpid)
            event = KytosEvent(name='kytos/core.switch.' + event_suffix,
                               content={'switch': switch})

            if connection:
                previous_connection = switch.connection
                switch.update_connection(connection)

                if previous_connection is not connection:
                    self.remove_connection(previous_connection)

            self.buffers.app.put(event)

            return switch
707
708 1
    def set_switch_options(self, dpid):
709
        """Update the switch settings based on kytos.conf options.
710
711
        Args:
712
            dpid (str): dpid used to identify a switch.
713
714
        """
715 1
        switch = self.switches.get(dpid)
716 1
        if not switch:
717
            return
718
719 1
        vlan_pool = {}
720 1
        vlan_pool = self.options.vlan_pool
721 1
        if not vlan_pool:
722 1
            return
723
724 1
        if vlan_pool.get(dpid):
725 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
726 1
            for intf_num, port_list in vlan_pool[dpid].items():
727 1
                if not switch.interfaces.get((intf_num)):
728 1
                    vlan_ids = set()
729 1
                    for vlan_range in port_list:
730 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
731 1
                        for vlan_id in range(vlan_begin, vlan_end):
732 1
                            vlan_ids.add(vlan_id)
733 1
                    intf_num = int(intf_num)
734 1
                    intf = Interface(name=intf_num, port_number=intf_num,
735
                                     switch=switch)
736 1
                    intf.set_available_tags(vlan_ids)
737 1
                    switch.update_interface(intf)
738
739 1
    def create_or_update_connection(self, connection):
740
        """Update a connection.
741
742
        Args:
743
            connection (:class:`~kytos.core.connection.Connection`):
744
                Instance of connection that will be updated.
745
        """
746 1
        self.connections[connection.id] = connection
747
748 1
    def get_connection_by_id(self, conn_id):
749
        """Return a existent connection by id.
750
751
        Args:
752
            id (int): id from a connection.
753
754
        Returns:
755
            :class:`~kytos.core.connection.Connection`: Instance of connection
756
                or None Type.
757
758
        """
759 1
        return self.connections.get(conn_id)
760
761 1
    def remove_connection(self, connection):
        """Close a connection and drop it from the stored connections.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be removed.

        Returns:
            bool: ``True`` when the connection was closed and removed;
                ``False`` when ``connection`` is ``None`` or its id was
                not stored.
        """
        if connection is not None:
            try:
                connection.close()
                self.connections.pop(connection.id)
            except KeyError:
                pass
            else:
                return True
        return False
777
778 1
    def remove_switch(self, switch):
        """Drop a switch from the controller's switches.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Instance of switch that will be removed.

        Returns:
            bool: ``True`` when the switch was stored and has been removed,
                ``False`` when its dpid was not known.
        """
        try:
            self.switches.pop(switch.dpid)
        except KeyError:
            removed = False
        else:
            removed = True
        return removed
790
791 1
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        Read the connection carried by the event and store it in the
        controller. When a different connection is already stored under
        the same id, that stale connection is removed (and thus closed)
        before the new one takes its place.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) whose
                ``source`` is the new connection.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() works on the connection object itself (it
        # calls close() on it), so pass the stored instance, not its id.
        stale = self.get_connection_by_id(connection.id)
        # Never close the very object that is about to be (re)registered.
        if stale is not None and stale is not connection:
            self.remove_connection(stale)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
816
817 1
    def add_new_switch(self, switch):
        """Register a switch on the controller, keyed by its dpid.

        Args:
            switch (Switch): A Switch object
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
824
825 1
    def _import_napp(self, username, napp_name):
        """Import a NApp's ``main`` module from the NApps directory.

        The module is loaded from
        ``<options.napps>/<username>/<napp_name>/main.py`` and registered in
        ``sys.modules`` as ``napps.<username>.<napp_name>.main``. If
        executing the module fails, the entry is removed from
        ``sys.modules`` again so no partially initialised module is left
        behind.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be imported.

        Returns:
            module: The imported NApp main module.

        Raises:
            FileNotFoundError: if NApp's main.py is not found.
            ModuleNotFoundError: if any NApp requirement is not installed.

        """
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
        path = os.path.join(self.options.napps, username, napp_name,
                            'main.py')
        napp_spec = spec_from_file_location(mod_name, path)
        napp_module = module_from_spec(napp_spec)
        # The module is registered before being executed, as in the
        # importlib recipe for importing a source file directly.
        sys.modules[napp_spec.name] = napp_module
        try:
            napp_spec.loader.exec_module(napp_module)
        except BaseException:
            # Same cleanup the regular import machinery performs: a module
            # whose execution failed must not stay importable.
            sys.modules.pop(napp_spec.name, None)
            raise
        return napp_module
841
842 1
    def load_napp(self, username, napp_name):
        """Import, instantiate and start a single NApp.

        Nothing is loaded (and only a log entry is written) when the NApp
        is already loaded, when its main module cannot be imported, or when
        instantiating its ``Main`` class fails. Otherwise the NApp is
        stored, started, its REST endpoints are registered and its
        listeners are appended to the controller's event listeners.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            napp_module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return
        except FileNotFoundError as err:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                err.filename)
            return

        # Any failure while building the NApp is logged with its traceback
        # and aborts only this NApp's loading.
        try:
            napp = napp_module.Main(controller=self)
        except:  # noqa pylint: disable=bare-except
            self.log.critical("NApp initialization failed: %s/%s",
                              username, napp_name, exc_info=True)
            return

        self.napps[napp_key] = napp
        napp.start()

        api_server = self.api_server
        api_server.authenticate_endpoints(napp)
        api_server.register_napp_endpoints(napp)

        # pylint: disable=protected-access
        for event_name, handlers in napp._listeners.items():
            self.events_listeners.setdefault(event_name, []).extend(handlers)
        # pylint: enable=protected-access
882
883 1
    def pre_install_napps(self, napps, enable=True):
        """Install the requested NApps that are not installed yet.

        Each requested NApp is compared against the string form of the
        NApps reported as installed by the NApps manager; only the missing
        ones are installed.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): Forwarded to the NApps manager ``install`` call.
        """
        manager = self.napps_manager
        installed_names = list(map(str, manager.get_installed_napps()))
        missing = [wanted for wanted in napps
                   if wanted not in installed_names]
        for wanted in missing:
            manager.install(wanted, enable=enable)
896
897 1
    def load_napps(self):
        """Load every NApp the NApps manager reports as enabled."""
        enabled_napps = self.napps_manager.get_enabled_napps()
        for enabled_napp in enabled_napps:
            try:
                self.log.info("Loading NApp %s", enabled_napp.id)
                self.load_napp(enabled_napp.username, enabled_napp.name)
            except FileNotFoundError as err:
                # A missing file only prevents this NApp from loading; the
                # remaining NApps are still processed.
                self.log.error("Could not load NApp %s: %s",
                               enabled_napp.id, err)
906
907 1
    def unload_napp(self, username, napp_name):
        """Shut down a loaded NApp and detach it from the controller.

        The NApp is removed from the loaded NApps, its shutdown listener is
        called with a ``kytos/core.shutdown.<napp id>`` event, its REST
        endpoints are removed and its listeners are taken out of the
        controller's event listeners. A warning is logged when the NApp
        was not loaded.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)
        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        shutdown_event = KytosEvent(
            name='kytos/core.shutdown.' + NApp(username, napp_name).id)
        # Call the shutdown listener before the NApp's listeners are
        # removed from events_listeners below.
        shutdown_listener = self.events_listeners[shutdown_event.name][0]
        shutdown_listener(shutdown_event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Removing listeners from that napp
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            registered = self.events_listeners[event_type]
            for listener in napp_listeners:
                registered.remove(listener)
            # Drop event types that no longer have any listener.
            if not registered:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access
938
939 1
    def unload_napps(self):
        """Unload every NApp that is currently loaded."""
        # Iterate over a snapshot of the keys: unloading removes entries
        # from self.napps, and looping over a dictionary while removing
        # items from it causes
        # 'RuntimeError: dictionary changed size during iteration'.
        loaded_keys = tuple(self.napps)
        for username, napp_name in loaded_keys:
            self.unload_napp(username, napp_name)
947
948 1
    def reload_napp_module(self, username, napp_name, napp_file):
        """Import and then reload one module of a NApp.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp.
            napp_file (str): Module name inside the NApp, making up
                ``napps.<username>.<napp_name>.<napp_file>``.

        Raises:
            ModuleNotFoundError: if the module cannot be imported.
            ImportError: if reloading the module fails.
        """
        mod_name = '.'.join(('napps', username, napp_name, napp_file))
        try:
            current_module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise

        try:
            reload_module(current_module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise
962
963 1
    def reload_napp(self, username, napp_name):
        """Unload a NApp, reload its modules and load it again.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be reloaded.

        Returns:
            int: 200 when the NApp modules were reloaded and the NApp was
                loaded again, 400 when reloading a module failed (the NApp
                is then left unloaded).
        """
        self.unload_napp(username, napp_name)
        try:
            for napp_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, napp_file)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError, so both
            # failure kinds end up here.
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
975
976 1
    def rest_reload_napp(self, username, napp_name):
        """Reload a NApp on request.

        Returns:
            tuple: The string ``'reloaded'`` and the status code returned
                by :meth:`reload_napp`.
        """
        return 'reloaded', self.reload_napp(username, napp_name)
980
981 1
    def rest_reload_all_napps(self):
        """Reload all loaded NApps on request.

        Returns:
            tuple: The string ``'reloaded'`` and the status code 200.
        """
        # Iterate over a snapshot of the keys: reload_napp() unloads and
        # loads the NApp again, which removes and re-adds entries in
        # self.napps. Looping over the dictionary itself while that happens
        # raises RuntimeError (or revisits NApps).
        for username, napp_name in list(self.napps):
            self.reload_napp(username, napp_name)
        return 'reloaded', 200
986