Passed
Push — master ( ef0e11...1539d7 )
by Vinicius
03:40 queued 14s
created

Controller.publish_connection_error()   A

Complexity

Conditions 1

Size

Total Lines 11
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.512

Importance

Changes 0
Metric Value
cc 1
eloc 6
nop 2
dl 0
loc 11
ccs 1
cts 5
cp 0.2
crap 1.512
rs 10
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import importlib
19 1
import json
20 1
import logging
21 1
import os
22 1
import re
23 1
import sys
24 1
import threading
25 1
from concurrent.futures import ThreadPoolExecutor
26 1
from importlib import import_module
27 1
from importlib import reload as reload_module
28 1
from importlib.util import module_from_spec, spec_from_file_location
29 1
from pathlib import Path
30 1
from socket import error as SocketError
31
32 1
from kytos.core.api_server import APIServer
33 1
from kytos.core.apm import init_apm
34
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
35 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
36 1
from kytos.core.auth import Auth
37 1
from kytos.core.buffers import KytosBuffers
38 1
from kytos.core.config import KytosConfig
39 1
from kytos.core.connection import ConnectionState
40 1
from kytos.core.db import db_conn_wait
41 1
from kytos.core.dead_letter import DeadLetter
42 1
from kytos.core.events import KytosEvent
43 1
from kytos.core.exceptions import KytosAPMInitException, KytosDBInitException
44 1
from kytos.core.helpers import executors, now
45 1
from kytos.core.interface import Interface
46 1
from kytos.core.logs import LogManager
47 1
from kytos.core.napps.base import NApp
48 1
from kytos.core.napps.manager import NAppsManager
49 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
50 1
from kytos.core.switch import Switch
51
52 1
__all__ = ('Controller',)
53
54
55 1
def exc_handler(_, exc, __):
56
    """Log uncaught exceptions.
57
58
    Args:
59
        exc_type (ignored): exception type
60
        exc: exception instance
61
        tb (ignored): traceback
62
    """
63
    logging.basicConfig(filename='errlog.log',
64
                        format='%(asctime)s:%(pathname)'
65
                        's:%(levelname)s:%(message)s')
66
    logging.exception('Uncaught Exception: %s', exc)
67
68
69 1
class Controller:
70
    """Main class of Kytos.
71
72
    The main responsibilities of this class are:
73
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
74
        - manage KytosNApps (install, load and unload);
75
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
76
        - manage which event should be sent to NApps methods;
77
        - manage the buffers handlers, considering one thread per handler.
78
    """
79
80
    # Created issue #568 for the disabled checks.
81
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
82 1
    def __init__(self, options=None, loop=None):
83
        """Init method of Controller class takes the parameters below.
84
85
        Args:
86
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
87
                instance of :class:`~kytos.core.config.KytosConfig` class.
88
        """
89 1
        if options is None:
90
            options = KytosConfig().options['daemon']
91
92 1
        self._loop = loop or asyncio.get_event_loop()
93 1
        self._pool = ThreadPoolExecutor(max_workers=1)
94
95
        # asyncio tasks
96 1
        self._tasks = []
97
98
        #: dict: keep the main threads of the controller (buffers and handler)
99 1
        self._threads = {}
100
        #: KytosBuffers: KytosBuffer object with Controller buffers
101 1
        self.buffers = KytosBuffers(loop=self._loop)
102
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
103
        #:
104
        #: This dict stores all connections between the controller and the
105
        #: switches. The key for this dict is a tuple (ip, port). The content
106
        #: is a Connection
107 1
        self.connections = {}
108
        #: dict: mapping of events and event listeners.
109
        #:
110
        #: The key of the dict is a KytosEvent (or a string that represent a
111
        #: regex to match against KytosEvents) and the value is a list of
112
        #: methods that will receive the referenced event
113 1
        self.events_listeners = {'kytos/core.connection.new':
114
                                 [self.new_connection]}
115
116
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
117
        #:
118
        #: The key is the napp name (string), while the value is the napp
119
        #: instance itself.
120 1
        self.napps = {}
121
        #: Object generated by ParseArgs on config.py file
122 1
        self.options = options
123
        #: KytosServer: Instance of KytosServer that will be listening to TCP
124
        #: connections.
125 1
        self.server = None
126
        #: dict: Current existing switches.
127
        #:
128
        #: The key is the switch dpid, while the value is a Switch object.
129 1
        self.switches = {}  # dpid: Switch()
130 1
        self._switches_lock = threading.Lock()
131
132
        #: datetime.datetime: Time when the controller finished starting.
133 1
        self.started_at = None
134
135
        #: logging.Logger: Logger instance used by Kytos.
136 1
        self.log = None
137
138
        #: Observer that handle NApps when they are enabled or disabled.
139 1
        self.napp_dir_listener = NAppDirListener(self)
140
141 1
        self.napps_manager = NAppsManager(self)
142
143
        #: API Server used to expose rest endpoints.
144 1
        self.api_server = APIServer(__name__, self.options.listen,
145
                                    self.options.api_port,
146
                                    self.napps_manager, self.options.napps)
147
148 1
        self.auth = Auth(self)
149 1
        self.dead_letter = DeadLetter(self)
150 1
        self._alisten_tasks = set()
151
152 1
        self._register_endpoints()
153
        #: Adding the napps 'enabled' directory into the PATH
154
        #: Now you can access the enabled napps with:
155
        #: from napps.<username>.<napp_name> import ?....
156 1
        sys.path.append(os.path.join(self.options.napps, os.pardir))
157 1
        sys.excepthook = exc_handler
158
159 1
    def enable_logs(self):
160
        """Register kytos log and enable the logs."""
161 1
        decorators = self.options.logger_decorators
162 1
        try:
163 1
            decorators = [self._resolve(deco) for deco in decorators]
164
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
165
            print(f'Failed to resolve decorator module: {err.name}')
166
            sys.exit(1)
167
        except AttributeError:
168
            print(f'Failed to resolve decorator name: {decorators}')
169
            sys.exit(1)
170 1
        LogManager.decorate_logger_class(*decorators)
171 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
172 1
        LogManager.enable_websocket(self.api_server.server)
173 1
        self.log = logging.getLogger(__name__)
174 1
        self._patch_core_loggers()
175
176 1
    @staticmethod
177 1
    def _resolve(name):
178
        """Resolve a dotted name to a global object."""
179
        mod, _, attr = name.rpartition('.')
180
        mod = importlib.import_module(mod)
181
        return getattr(mod, attr)
182
183 1
    @staticmethod
184 1
    def _patch_core_loggers():
185
        """Patch in updated loggers to 'kytos.core.*' modules"""
186 1
        match_str = 'kytos.core.'
187 1
        str_len = len(match_str)
188 1
        reloadable_mods = [module for mod_name, module in sys.modules.items()
189
                           if mod_name[:str_len] == match_str]
190 1
        for module in reloadable_mods:
191 1
            module.LOG = logging.getLogger(module.__name__)
192
193 1
    @staticmethod
194 1
    def loggers():
195
        """List all logging Loggers.
196
197
        Return a list of Logger objects, with name and logging level.
198
        """
199 1
        return [logging.getLogger(name)
200
                for name in logging.root.manager.loggerDict
201
                if "kytos" in name]
202
203 1
    def toggle_debug(self, name=None):
204
        """Enable/disable logging debug messages to a given logger name.
205
206
        If the name parameter is not specified the debug will be
207
        enabled/disabled following the initial config file. It will decide
208
        to enable/disable using the 'kytos' name to find the current
209
        logger level.
210
        Obs: To disable the debug the logging will be set to NOTSET
211
212
        Args:
213
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
214
        """
215 1
        if name and name not in logging.root.manager.loggerDict:
216
            # A Logger name that is not declared in logging will raise an error
217
            # otherwise logging would create a new Logger.
218 1
            raise ValueError(f"Invalid logger name: {name}")
219
220 1
        if not name:
221
            # Logger name not specified.
222 1
            level = logging.getLogger('kytos').getEffectiveLevel()
223 1
            enable_debug = level != logging.DEBUG
224
225
            # Enable/disable default Loggers
226 1
            LogManager.load_config_file(self.options.logging, enable_debug)
227 1
            return
228
229
        # Get effective logger level for the name
230 1
        level = logging.getLogger(name).getEffectiveLevel()
231 1
        logger = logging.getLogger(name)
232
233 1
        if level == logging.DEBUG:
234
            # disable debug
235 1
            logger.setLevel(logging.NOTSET)
236
        else:
237
            # enable debug
238 1
            logger.setLevel(logging.DEBUG)
239
240 1
    def start(self, restart=False):
241
        """Create pidfile and call start_controller method."""
242 1
        self.enable_logs()
243 1
        if self.options.database:
244 1
            self.db_conn_or_core_shutdown()
245 1
        if self.options.apm:
246 1
            self.init_apm_or_core_shutdown()
247 1
        if not restart:
248 1
            self.create_pidfile()
249 1
        self.start_controller()
250
251 1
    def create_pidfile(self):
252
        """Create a pidfile."""
253 1
        pid = os.getpid()
254
255
        # Creates directory if it doesn't exist
256
        # System can erase /var/run's content
257 1
        pid_folder = Path(self.options.pidfile).parent
258 1
        self.log.info(pid_folder)
259
260
        # Pylint incorrectly infers Path objects
261
        # https://github.com/PyCQA/pylint/issues/224
262
        # pylint: disable=no-member
263 1
        if not pid_folder.exists():
264
            pid_folder.mkdir()
265
            pid_folder.chmod(0o1777)
266
        # pylint: enable=no-member
267
268
        # Make sure the file is deleted when controller stops
269 1
        atexit.register(Path(self.options.pidfile).unlink)
270
271
        # Checks if a pidfile exists. Creates a new file.
272 1
        try:
273 1
            pidfile = open(self.options.pidfile, mode='x')
274 1
        except OSError:
275
            # This happens if there is a pidfile already.
276
            # We shall check if the process that created the pidfile is still
277
            # running.
278 1
            try:
279 1
                existing_file = open(self.options.pidfile, mode='r')
280 1
                old_pid = int(existing_file.read())
281 1
                os.kill(old_pid, 0)
282
                # If kill() doesn't return an error, there's a process running
283
                # with the same PID. We assume it is Kytos and quit.
284
                # Otherwise, overwrite the file and proceed.
285
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
286
                             "running. Aborting.")
287
                sys.exit(error_msg.format(self.options.pidfile))
288 1
            except OSError:
289 1
                try:
290 1
                    pidfile = open(self.options.pidfile, mode='w')
291
                except OSError as exception:
292
                    error_msg = "Failed to create pidfile {}: {}."
293
                    sys.exit(error_msg.format(self.options.pidfile, exception))
294
295
        # Identifies the process that created the pidfile.
296 1
        pidfile.write(str(pid))
297 1
        pidfile.close()
298
299 1
    def start_controller(self):
300
        """Start the controller.
301
302
        Starts the KytosServer (TCP Server) coroutine.
303
        Starts a thread for each buffer handler.
304
        Load the installed apps.
305
        """
306 1
        self.log.info("Starting Kytos - Kytos Controller")
307 1
        self.server = KytosServer((self.options.listen,
308
                                   int(self.options.port)),
309
                                  KytosServerProtocol,
310
                                  self,
311
                                  self.options.protocol_name)
312
313 1
        self.log.info("Starting TCP server: %s", self.server)
314 1
        self.server.serve_forever()
315
316 1
        def _stop_loop(_):
317
            loop = asyncio.get_event_loop()
318
            # print(_.result())
319
            threads = threading.enumerate()
320
            self.log.debug("%s threads before loop.stop: %s",
321
                           len(threads), threads)
322
            loop.stop()
323
324 1
        async def _run_api_server_thread(executor):
325
            log = logging.getLogger('kytos.core.controller.api_server_thread')
326
            log.debug('starting')
327
            # log.debug('creating tasks')
328
            loop = asyncio.get_event_loop()
329
            blocking_tasks = [
330
                loop.run_in_executor(executor, self.api_server.run)
331
            ]
332
            # log.debug('waiting for tasks')
333
            completed, pending = await asyncio.wait(blocking_tasks)
334
            # results = [t.result() for t in completed]
335
            # log.debug('results: {!r}'.format(results))
336
            log.debug('completed: %d, pending: %d',
337
                      len(completed), len(pending))
338
339 1
        task = self._loop.create_task(self.event_handler("conn"))
340 1
        self._tasks.append(task)
341 1
        task = self._loop.create_task(self.event_handler("raw"))
342 1
        self._tasks.append(task)
343 1
        task = self._loop.create_task(self.event_handler("msg_in"))
344 1
        self._tasks.append(task)
345 1
        task = self._loop.create_task(self.msg_out_event_handler())
346 1
        self._tasks.append(task)
347 1
        task = self._loop.create_task(self.event_handler("app"))
348 1
        self._tasks.append(task)
349 1
        task = self._loop.create_task(_run_api_server_thread(self._pool))
350 1
        task.add_done_callback(_stop_loop)
351 1
        self._tasks.append(task)
352
353 1
        self.log.info("ThreadPool started: %s", self._pool)
354
355
        # ASYNC TODO: ensure all threads started correctly
356
        # This is critical, if any of them failed starting we should exit.
357
        # sys.exit(error_msg.format(thread, exception))
358
359 1
        self.log.info("Loading Kytos NApps...")
360 1
        self.napp_dir_listener.start()
361 1
        self.pre_install_napps(self.options.napps_pre_installed)
362 1
        self.load_napps()
363
364 1
        self.started_at = now()
365
366 1
    def db_conn_or_core_shutdown(self):
367
        """Ensure db connection or core shutdown."""
368 1
        try:
369 1
            db_conn_wait(db_backend=self.options.database)
370 1
        except KytosDBInitException as exc:
371 1
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
372
373 1
    def init_apm_or_core_shutdown(self, **kwargs):
374
        """Init APM instrumentation or core shutdown."""
375
        if not self.options.apm:
376
            return
377
        try:
378
            init_apm(self.options.apm, app=self.api_server.app,
379
                     **kwargs)
380
        except KytosAPMInitException as exc:
381
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
382
383 1
    def _register_endpoints(self):
384
        """Register all rest endpoint served by kytos.
385
386
        -   Register APIServer endpoints
387
        -   Register WebUI endpoints
388
        -   Register ``/api/kytos/core/config`` endpoint
389
        """
390 1
        self.api_server.start_api()
391
        # Register controller endpoints as /api/kytos/core/...
392 1
        self.api_server.register_core_endpoint('config/',
393
                                               self.configuration_endpoint)
394 1
        self.api_server.register_core_endpoint('metadata/',
395
                                               Controller.metadata_endpoint)
396 1
        self.api_server.register_core_endpoint(
397
            'reload/<username>/<napp_name>/',
398
            self.rest_reload_napp)
399 1
        self.api_server.register_core_endpoint('reload/all',
400
                                               self.rest_reload_all_napps)
401 1
        self.auth.register_core_auth_services()
402 1
        self.dead_letter.register_endpoints()
403
404 1
    def register_rest_endpoint(self, url, function, methods):
405
        """Deprecate in favor of @rest decorator."""
406 1
        self.api_server.register_rest_endpoint(url, function, methods)
407
408 1
    @classmethod
409 1
    def metadata_endpoint(cls):
410
        """Return the Kytos metadata.
411
412
        Returns:
413
            string: Json with current kytos metadata.
414
415
        """
416 1
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
417 1
        meta_file = open(meta_path).read()
418 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
419 1
        return json.dumps(metadata)
420
421 1
    def configuration_endpoint(self):
422
        """Return the configuration options used by Kytos.
423
424
        Returns:
425
            string: Json with current configurations used by kytos.
426
427
        """
428 1
        return json.dumps(self.options.__dict__)
429
430 1
    def restart(self, graceful=True):
431
        """Restart Kytos SDN Controller.
432
433
        Args:
434
            graceful(bool): Represents the way that Kytos will restart.
435
        """
436 1
        if self.started_at is not None:
437 1
            self.stop(graceful)
438 1
            self.__init__(self.options)
439
440 1
        self.start(restart=True)
441
442 1
    def stop(self, graceful=True):
443
        """Shutdown all services used by kytos.
444
445
        This method should:
446
            - stop all Websockets
447
            - stop the API Server
448
            - stop the Controller
449
        """
450 1
        if self.started_at:
451 1
            self.stop_controller(graceful)
452
453 1
    def stop_controller(self, graceful=True):
454
        """Stop the controller.
455
456
        This method should:
457
            - announce on the network that the controller will shutdown;
458
            - stop receiving incoming packages;
459
            - call the 'shutdown' method of each KytosNApp that is running;
460
            - finish reading the events on all buffers;
461
            - stop each running handler;
462
            - stop all running threads;
463
            - stop the KytosServer;
464
        """
465 1
        self.log.info("Stopping Kytos")
466
467 1
        self.buffers.send_stop_signal()
468 1
        self.api_server.stop_api_server()
469 1
        self.napp_dir_listener.stop()
470
471 1
        self.log.info("Stopping threadpool: %s", self._pool)
472 1
        for pool_name in executors:
473 1
            self.log.info("Stopping threadpool: %s", pool_name)
474
475 1
        threads = threading.enumerate()
476 1
        self.log.debug("%s threads before threadpool shutdown: %s",
477
                       len(threads), threads)
478
479 1
        try:
480
            # Python >= 3.9
481
            # pylint: disable=unexpected-keyword-arg
482 1
            self._pool.shutdown(wait=graceful, cancel_futures=True)
483 1
            for executor_pool in executors.values():
484 1
                executor_pool.shutdown(wait=graceful, cancel_futures=True)
485
        except TypeError:
486
            self._pool.shutdown(wait=graceful)
487
            for executor_pool in executors.values():
488
                executor_pool.shutdown(wait=graceful)
489
490
        # self.server.socket.shutdown()
491
        # self.server.socket.close()
492
493
        # for thread in self._threads.values():
494
        #     self.log.info("Stopping thread: %s", thread.name)
495
        #     thread.join()
496
497
        # for thread in self._threads.values():
498
        #     while thread.is_alive():
499
        #         self.log.info("Thread is alive: %s", thread.name)
500
        #         pass
501
502 1
        self.started_at = None
503 1
        self.unload_napps()
504 1
        self.buffers = KytosBuffers()
505
506
        # Cancel all async tasks (event handlers and servers)
507 1
        for task in self._tasks:
508
            task.cancel()
509
510
        # ASYNC TODO: close connections
511
        # self.server.server_close()
512
513
        # Shutdown the TCP server and the main asyncio loop
514 1
        self.server.shutdown()
515
516 1
    def status(self):
517
        """Return status of Kytos Server.
518
519
        If the controller kytos is running this method will be returned
520
        "Running since 'Started_At'", otherwise "Stopped".
521
522
        Returns:
523
            string: String with kytos status.
524
525
        """
526 1
        if self.started_at:
527 1
            return "Running since %s" % self.started_at
528 1
        return "Stopped"
529
530 1
    def uptime(self):
531
        """Return the uptime of kytos server.
532
533
        This method should return:
534
            - 0 if Kytos Server is stopped.
535
            - (kytos.start_at - datetime.now) if Kytos Server is running.
536
537
        Returns:
538
           datetime.timedelta: The uptime interval.
539
540
        """
541 1
        return now() - self.started_at if self.started_at else 0
542
543 1
    def notify_listeners(self, event):
544
        """Send the event to the specified listeners.
545
546
        Loops over self.events_listeners matching (by regexp) the attribute
547
        name of the event with the keys of events_listeners. If a match occurs,
548
        then send the event to each registered listener.
549
550
        Args:
551
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
552
        """
553
554 1
        self.log.debug("looking for listeners for %s", event)
555 1
        for event_regex, listeners in dict(self.events_listeners).items():
556
            # self.log.debug("listeners found for %s: %r => %s", event,
557
            #                event_regex, [l.__qualname__ for l in listeners])
558
            # Do not match if the event has more characters
559
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
560 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
561 1
                event_regex += '$'
562 1
            if re.match(event_regex, event.name):
563 1
                self.log.debug('Calling listeners for %s', event)
564 1
                for listener in listeners:
565 1
                    if asyncio.iscoroutinefunction(listener):
566
                        task = asyncio.create_task(listener(event))
567
                        self._alisten_tasks.add(task)
568
                        task.add_done_callback(self._alisten_tasks.discard)
569
                    else:
570 1
                        listener(event)
571
572 1
    async def event_handler(self, buffer_name: str):
573
        """Default event handler that gets from an event buffer."""
574 1
        event_buffer = getattr(self.buffers, buffer_name)
575
        while True:
576 1
            event = await event_buffer.aget()
577 1
            self.notify_listeners(event)
578
579 1
            if event.name == "kytos/core.shutdown":
580 1
                self.log.debug("Raw Event handler stopped")
581 1
                break
582
583 1
    async def publish_connection_error(self, event):
584
        """Publish connection error event.
585
586
        Args:
587
            event (KytosEvent): Event that triggered for error propagation.
588
        """
589
        event.name = \
590
            f"kytos/core.{event.destination.protocol.name}.connection.error"
591
        error_msg = f"Connection state: {event.destination.state}"
592
        event.content["exception"] = error_msg
593
        await self.buffers.app.aput(event)
594
595 1
    async def msg_out_event_handler(self):
596
        """Handle msg_out events.
597
598
        Listen to the msg_out buffer and send all its events to the
599
        corresponding listeners.
600
        """
601 1
        self.log.info("Message Out Event Handler started")
602
        while True:
603 1
            triggered_event = await self.buffers.msg_out.aget()
604
605 1
            if triggered_event.name == "kytos/core.shutdown":
606 1
                self.log.debug("Message Out Event handler stopped")
607 1
                break
608
609 1
            message = triggered_event.content['message']
610 1
            destination = triggered_event.destination
611 1
            try:
612 1
                if (destination and
613
                        not destination.state == ConnectionState.FINISHED):
614 1
                    packet = message.pack()
615 1
                    destination.send(packet)
616 1
                    self.log.debug('Connection %s: OUT OFP, '
617
                                   'version: %s, type: %s, xid: %s - %s',
618
                                   destination.id,
619
                                   message.header.version,
620
                                   message.header.message_type,
621
                                   message.header.xid,
622
                                   packet.hex())
623 1
                    self.notify_listeners(triggered_event)
624 1
                    continue
625
626
            except (OSError, SocketError):
627
                pass
628
629
            await self.publish_connection_error(triggered_event)
630
            self.log.info("connection closed. Cannot send message")
631
632 1
    def get_interface_by_id(self, interface_id):
633
        """Find a Interface  with interface_id.
634
635
        Args:
636
            interface_id(str): Interface Identifier.
637
638
        Returns:
639
            Interface: Instance of Interface with the id given.
640
641
        """
642 1
        if interface_id is None:
643 1
            return None
644
645 1
        switch_id = ":".join(interface_id.split(":")[:-1])
646 1
        interface_number = int(interface_id.split(":")[-1])
647
648 1
        switch = self.switches.get(switch_id)
649
650 1
        if not switch:
651 1
            return None
652
653 1
        return switch.interfaces.get(interface_number, None)
654
655 1
    def get_switch_by_dpid(self, dpid):
656
        """Return a specific switch by dpid.
657
658
        Args:
659
            dpid (|DPID|): dpid object used to identify a switch.
660
661
        Returns:
662
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
663
664
        """
665 1
        return self.switches.get(dpid)
666
667 1
    def get_switch_or_create(self, dpid, connection=None):
668
        """Return switch or create it if necessary.
669
670
        Args:
671
            dpid (|DPID|): dpid object used to identify a switch.
672
            connection (:class:`~kytos.core.connection.Connection`):
673
                connection used by switch. If a switch has a connection that
674
                will be updated.
675
676
        Returns:
677
            :class:`~kytos.core.switch.Switch`: new or existent switch.
678
679
        """
680 1
        with self._switches_lock:
681 1
            if connection:
682 1
                self.create_or_update_connection(connection)
683
684 1
            switch = self.get_switch_by_dpid(dpid)
685 1
            event_name = 'kytos/core.switch.'
686
687 1
            if switch is None:
688 1
                switch = Switch(dpid=dpid)
689 1
                self.add_new_switch(switch)
690 1
                event_name += 'new'
691
            else:
692 1
                event_name += 'reconnected'
693
694 1
            self.set_switch_options(dpid=dpid)
695 1
            event = KytosEvent(name=event_name, content={'switch': switch})
696
697 1
            if connection:
698 1
                old_connection = switch.connection
699 1
                switch.update_connection(connection)
700
701 1
                if old_connection is not connection:
702 1
                    self.remove_connection(old_connection)
703
704 1
            self.buffers.app.put(event)
705
706 1
            return switch
707
708 1
    def set_switch_options(self, dpid):
709
        """Update the switch settings based on kytos.conf options.
710
711
        Args:
712
            dpid (str): dpid used to identify a switch.
713
714
        """
715 1
        switch = self.switches.get(dpid)
716 1
        if not switch:
717
            return
718
719 1
        vlan_pool = {}
720 1
        vlan_pool = self.options.vlan_pool
721 1
        if not vlan_pool:
722 1
            return
723
724 1
        if vlan_pool.get(dpid):
725 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
726 1
            for intf_num, port_list in vlan_pool[dpid].items():
727 1
                if not switch.interfaces.get((intf_num)):
728 1
                    vlan_ids = set()
729 1
                    for vlan_range in port_list:
730 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
731 1
                        for vlan_id in range(vlan_begin, vlan_end):
732 1
                            vlan_ids.add(vlan_id)
733 1
                    intf_num = int(intf_num)
734 1
                    intf = Interface(name=intf_num, port_number=intf_num,
735
                                     switch=switch)
736 1
                    intf.set_available_tags(vlan_ids)
737 1
                    switch.update_interface(intf)
738
739 1
    def create_or_update_connection(self, connection):
740
        """Update a connection.
741
742
        Args:
743
            connection (:class:`~kytos.core.connection.Connection`):
744
                Instance of connection that will be updated.
745
        """
746 1
        self.connections[connection.id] = connection
747
748 1
    def get_connection_by_id(self, conn_id):
749
        """Return a existent connection by id.
750
751
        Args:
752
            id (int): id from a connection.
753
754
        Returns:
755
            :class:`~kytos.core.connection.Connection`: Instance of connection
756
                or None Type.
757
758
        """
759 1
        return self.connections.get(conn_id)
760
761 1
    def remove_connection(self, connection):
762
        """Close a existent connection and remove it.
763
764
        Args:
765
            connection (:class:`~kytos.core.connection.Connection`):
766
                Instance of connection that will be removed.
767
        """
768 1
        if connection is None:
769 1
            return False
770
771 1
        try:
772 1
            connection.close()
773 1
            del self.connections[connection.id]
774 1
        except KeyError:
775 1
            return False
776 1
        return True
777
778 1
    def remove_switch(self, switch):
779
        """Remove an existent switch.
780
781
        Args:
782
            switch (:class:`~kytos.core.switch.Switch`):
783
                Instance of switch that will be removed.
784
        """
785 1
        try:
786 1
            del self.switches[switch.dpid]
787 1
        except KeyError:
788 1
            return False
789 1
        return True
790
791 1
    def new_connection(self, event):
792
        """Handle a kytos/core.connection.new event.
793
794
        This method will read new connection event and store the connection
795
        (socket) into the connections attribute on the controller.
796
797
        It also clear all references to the connection since it is a new
798
        connection on the same ip:port.
799
800
        Args:
801
            event (~kytos.core.KytosEvent):
802
                The received event (``kytos/core.connection.new``) with the
803
                needed info.
804
        """
805 1
        self.log.info("Handling %s...", event)
806
807 1
        connection = event.source
808 1
        self.log.debug("Event source: %s", event.source)
809
810
        # Remove old connection (aka cleanup) if it exists
811 1
        if self.get_connection_by_id(connection.id):
812
            self.remove_connection(connection.id)
813
814
        # Update connections with the new connection
815 1
        self.create_or_update_connection(connection)
816
817 1
    def add_new_switch(self, switch):
818
        """Add a new switch on the controller.
819
820
        Args:
821
            switch (Switch): A Switch object
822
        """
823 1
        self.switches[switch.dpid] = switch
824
825 1
    def _import_napp(self, username, napp_name):
826
        """Import a NApp module.
827
828
        Raises:
829
            FileNotFoundError: if NApp's main.py is not found.
830
            ModuleNotFoundError: if any NApp requirement is not installed.
831
832
        """
833 1
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
834 1
        path = os.path.join(self.options.napps, username, napp_name,
835
                            'main.py')
836 1
        napp_spec = spec_from_file_location(mod_name, path)
837 1
        napp_module = module_from_spec(napp_spec)
838 1
        sys.modules[napp_spec.name] = napp_module
839 1
        napp_spec.loader.exec_module(napp_module)
840 1
        return napp_module
841
842 1
    def load_napp(self, username, napp_name):
843
        """Load a single NApp.
844
845
        Args:
846
            username (str): NApp username (makes up NApp's path).
847
            napp_name (str): Name of the NApp to be loaded.
848
        """
849 1
        if (username, napp_name) in self.napps:
850 1
            message = 'NApp %s/%s was already loaded'
851 1
            self.log.warning(message, username, napp_name)
852 1
            return
853
854 1
        try:
855 1
            napp_module = self._import_napp(username, napp_name)
856 1
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
857 1
            self.log.error("Error loading NApp '%s/%s': %s",
858
                           username, napp_name, err)
859 1
            return
860 1
        except FileNotFoundError as err:
861 1
            msg = "NApp module not found, assuming it's a meta-napp: %s"
862 1
            self.log.warning(msg, err.filename)
863 1
            return
864
865 1
        try:
866 1
            napp = napp_module.Main(controller=self)
867 1
        except:  # noqa pylint: disable=bare-except
868 1
            self.log.critical("NApp initialization failed: %s/%s",
869
                              username, napp_name, exc_info=True)
870 1
            return
871
872 1
        self.napps[(username, napp_name)] = napp
873
874 1
        napp.start()
875 1
        self.api_server.authenticate_endpoints(napp)
876 1
        self.api_server.register_napp_endpoints(napp)
877
878
        # pylint: disable=protected-access
879 1
        for event, listeners in napp._listeners.items():
880
            self.events_listeners.setdefault(event, []).extend(listeners)
881
        # pylint: enable=protected-access
882
883 1
    def pre_install_napps(self, napps, enable=True):
884
        """Pre install and enable NApps.
885
886
        Before installing, it'll check if it's installed yet.
887
888
        Args:
889
            napps ([str]): List of NApps to be pre-installed and enabled.
890
        """
891 1
        all_napps = self.napps_manager.get_installed_napps()
892 1
        installed = [str(napp) for napp in all_napps]
893 1
        napps_diff = [napp for napp in napps if napp not in installed]
894 1
        for napp in napps_diff:
895 1
            self.napps_manager.install(napp, enable=enable)
896
897 1
    def load_napps(self):
898
        """Load all NApps enabled on the NApps dir."""
899 1
        for napp in self.napps_manager.get_enabled_napps():
900 1
            try:
901 1
                self.log.info("Loading NApp %s", napp.id)
902 1
                self.load_napp(napp.username, napp.name)
903
            except FileNotFoundError as exception:
904
                self.log.error("Could not load NApp %s: %s",
905
                               napp.id, exception)
906
907 1
    def unload_napp(self, username, napp_name):
908
        """Unload a specific NApp.
909
910
        Args:
911
            username (str): NApp username.
912
            napp_name (str): Name of the NApp to be unloaded.
913
        """
914 1
        napp = self.napps.pop((username, napp_name), None)
915
916 1
        if napp is None:
917
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
918
        else:
919 1
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
920 1
            napp_id = NApp(username, napp_name).id
921 1
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
922 1
            napp_shutdown_fn = self.events_listeners[event.name][0]
923
            # Call listener before removing it from events_listeners
924 1
            napp_shutdown_fn(event)
925
926
            # Remove rest endpoints from that napp
927 1
            self.api_server.remove_napp_endpoints(napp)
928
929
            # Removing listeners from that napp
930
            # pylint: disable=protected-access
931 1
            for event_type, napp_listeners in napp._listeners.items():
932
                event_listeners = self.events_listeners[event_type]
933
                for listener in napp_listeners:
934
                    event_listeners.remove(listener)
935
                if not event_listeners:
936
                    del self.events_listeners[event_type]
937
            # pylint: enable=protected-access
938
939 1
    def unload_napps(self):
940
        """Unload all loaded NApps that are not core NApps."""
941
        # list() is used here to avoid the error:
942
        # 'RuntimeError: dictionary changed size during iteration'
943
        # This is caused by looping over an dictionary while removing
944
        # items from it.
945
        for (username, napp_name) in list(self.napps.keys()):  # noqa
946
            self.unload_napp(username, napp_name)
947
948 1
    def reload_napp_module(self, username, napp_name, napp_file):
949
        """Reload a NApp Module."""
950 1
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
951 1
        try:
952 1
            napp_module = import_module(mod_name)
953 1
        except ModuleNotFoundError:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
954 1
            self.log.error("Module '%s' not found", mod_name)
955 1
            raise
956 1
        try:
957 1
            napp_module = reload_module(napp_module)
958 1
        except ImportError as err:
959 1
            self.log.error("Error reloading NApp '%s/%s': %s",
960
                           username, napp_name, err)
961 1
            raise
962
963 1
    def reload_napp(self, username, napp_name):
964
        """Reload a NApp."""
965 1
        self.unload_napp(username, napp_name)
966 1
        try:
967 1
            self.reload_napp_module(username, napp_name, 'settings')
968 1
            self.reload_napp_module(username, napp_name, 'main')
969 1
        except (ModuleNotFoundError, ImportError):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
970 1
            return 400
971 1
        self.log.info("NApp '%s/%s' successfully reloaded",
972
                      username, napp_name)
973 1
        self.load_napp(username, napp_name)
974 1
        return 200
975
976 1
    def rest_reload_napp(self, username, napp_name):
977
        """Request reload a NApp."""
978 1
        res = self.reload_napp(username, napp_name)
979 1
        return 'reloaded', res
980
981 1
    def rest_reload_all_napps(self):
982
        """Request reload all NApps."""
983 1
        for napp in self.napps:
984 1
            self.reload_napp(*napp)
985
        return 'reloaded', 200
986