Passed
Push — master ( 1b986a...ef0e11 )
by Vinicius
03:22 queued 14s
created

kytos.core.controller.Controller.loggers()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 0
dl 0
loc 9
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import importlib
19 1
import json
20 1
import logging
21 1
import os
22 1
import re
23 1
import sys
24 1
import threading
25 1
from concurrent.futures import ThreadPoolExecutor
26 1
from importlib import import_module
27 1
from importlib import reload as reload_module
28 1
from importlib.util import module_from_spec, spec_from_file_location
29 1
from pathlib import Path
30 1
from socket import error as SocketError
31
32 1
from kytos.core.api_server import APIServer
33 1
from kytos.core.apm import init_apm
34
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
35 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
36 1
from kytos.core.auth import Auth
37 1
from kytos.core.buffers import KytosBuffers
38 1
from kytos.core.config import KytosConfig
39 1
from kytos.core.connection import ConnectionState
40 1
from kytos.core.db import db_conn_wait
41 1
from kytos.core.dead_letter import DeadLetter
42 1
from kytos.core.events import KytosEvent
43 1
from kytos.core.exceptions import KytosAPMInitException, KytosDBInitException
44 1
from kytos.core.helpers import executors, now
45 1
from kytos.core.interface import Interface
46 1
from kytos.core.logs import LogManager
47 1
from kytos.core.napps.base import NApp
48 1
from kytos.core.napps.manager import NAppsManager
49 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
50 1
from kytos.core.switch import Switch
51
52 1
__all__ = ('Controller',)
53
54
55 1
def exc_handler(_, exc, __):
    """Log uncaught exceptions.

    The signature matches ``sys.excepthook(type, value, traceback)``; the
    hook is invoked positionally.

    Args:
        _ (type): exception type.
        exc (BaseException): exception instance.
        __ (traceback): traceback associated with ``exc``.
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # ``logging.exception`` relies on ``sys.exc_info()``, which is empty when
    # no ``except`` block is active, so the traceback is handed over
    # explicitly to keep it in the log record.
    logging.error('Uncaught Exception: %s', exc, exc_info=(_, exc, __))
67
68
69 1
class Controller:
70
    """Main class of Kytos.
71
72
    The main responsibilities of this class are:
73
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
74
        - manage KytosNApps (install, load and unload);
75
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
76
        - manage which event should be sent to NApps methods;
77
        - manage the buffers handlers, considering one thread per handler.
78
    """
79
80
    # Created issue #568 for the disabled checks.
81
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
82 1
    def __init__(self, options=None, loop=None):
        """Build the controller state and wire its core services.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
                When ``None``, the ``'daemon'`` options of a new
                :class:`~kytos.core.config.KytosConfig` are used.
            loop: asyncio event loop to use; when falsy,
                ``asyncio.get_event_loop()`` is used instead.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self._loop = loop if loop else asyncio.get_event_loop()
        # Single-worker pool; start_controller runs the API server in it.
        self._pool = ThreadPoolExecutor(max_workers=1)

        #: list: asyncio tasks created by start_controller.
        self._tasks = []

        #: dict: main threads of the controller (buffers and handler).
        self._threads = {}

        #: KytosBuffers: buffers used by the controller.
        self.buffers = KytosBuffers(loop=self._loop)

        #: dict: socket connections between the controller and the switches.
        #:
        #: Keys are ``(ip, port)`` tuples and values are Connection objects.
        self.connections = {}

        #: dict: mapping of events and event listeners.
        #:
        #: Each key is a KytosEvent name (or a regex string matched against
        #: event names); each value is the list of methods that will receive
        #: the matching events.
        self.events_listeners = {
            'kytos/core.connection.new': [self.new_connection],
        }

        #: dict: currently loaded napps - ``'napp_name'``: ``napp`` (instance).
        self.napps = {}

        #: Object generated by ParseArgs on config.py file.
        self.options = options

        #: KytosServer: server listening to TCP connections; created by
        #: start_controller.
        self.server = None

        #: dict: existing switches, ``dpid``: ``Switch``.
        self.switches = {}
        self._switches_lock = threading.Lock()

        #: datetime.datetime: time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: logger used by Kytos; assigned by enable_logs.
        self.log = None

        #: Observer that handles NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(
            __name__,
            self.options.listen,
            self.options.api_port,
            self.napps_manager,
            self.options.napps,
        )

        self.auth = Auth(self)
        self.dead_letter = DeadLetter(self)
        # Tasks spawned by notify_listeners for coroutine listeners.
        self._alisten_tasks = set()

        self._register_endpoints()

        # Put the parent of the napps 'enabled' directory on the import path,
        # so enabled napps are importable as:
        #     from napps.<username>.<napp_name> import ...
        napps_parent = os.path.join(self.options.napps, os.pardir)
        sys.path.append(napps_parent)
        sys.excepthook = exc_handler
158
159 1
    def enable_logs(self):
        """Register kytos log and enable the logs.

        Resolves the dotted names in ``options.logger_decorators``, hands them
        to the LogManager, loads the logging config file, enables the
        websocket log and sets ``self.log``. Exits the process with status 1
        when a decorator cannot be resolved.
        """
        decorator_names = self.options.logger_decorators
        try:
            resolved = [self._resolve(dotted) for dotted in decorator_names]
        except ModuleNotFoundError as err:
            print(f'Failed to resolve decorator module: {err.name}')
            sys.exit(1)
        except AttributeError:
            print(f'Failed to resolve decorator name: {decorator_names}')
            sys.exit(1)

        LogManager.decorate_logger_class(*resolved)
        LogManager.load_config_file(self.options.logging, self.options.debug)
        LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)
        # Modules imported before this point still hold their old loggers.
        self._patch_core_loggers()
175
176 1
    @staticmethod
177 1
    def _resolve(name):
178
        """Resolve a dotted name to a global object."""
179
        mod, _, attr = name.rpartition('.')
180
        mod = importlib.import_module(mod)
181
        return getattr(mod, attr)
182
183 1
    @staticmethod
184 1
    def _patch_core_loggers():
185
        """Patch in updated loggers to 'kytos.core.*' modules"""
186 1
        match_str = 'kytos.core.'
187 1
        str_len = len(match_str)
188 1
        reloadable_mods = [module for mod_name, module in sys.modules.items()
189
                           if mod_name[:str_len] == match_str]
190 1
        for module in reloadable_mods:
191 1
            module.LOG = logging.getLogger(module.__name__)
192
193 1
    @staticmethod
194 1
    def loggers():
195
        """List all logging Loggers.
196
197
        Return a list of Logger objects, with name and logging level.
198
        """
199 1
        return [logging.getLogger(name)
200
                for name in logging.root.manager.loggerDict
201
                if "kytos" in name]
202
203 1
    def toggle_debug(self, name=None):
204
        """Enable/disable logging debug messages to a given logger name.
205
206
        If the name parameter is not specified the debug will be
207
        enabled/disabled following the initial config file. It will decide
208
        to enable/disable using the 'kytos' name to find the current
209
        logger level.
210
        Obs: To disable the debug the logging will be set to NOTSET
211
212
        Args:
213
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
214
        """
215 1
        if name and name not in logging.root.manager.loggerDict:
216
            # A Logger name that is not declared in logging will raise an error
217
            # otherwise logging would create a new Logger.
218 1
            raise ValueError(f"Invalid logger name: {name}")
219
220 1
        if not name:
221
            # Logger name not specified.
222 1
            level = logging.getLogger('kytos').getEffectiveLevel()
223 1
            enable_debug = level != logging.DEBUG
224
225
            # Enable/disable default Loggers
226 1
            LogManager.load_config_file(self.options.logging, enable_debug)
227 1
            return
228
229
        # Get effective logger level for the name
230 1
        level = logging.getLogger(name).getEffectiveLevel()
231 1
        logger = logging.getLogger(name)
232
233 1
        if level == logging.DEBUG:
234
            # disable debug
235 1
            logger.setLevel(logging.NOTSET)
236
        else:
237
            # enable debug
238 1
            logger.setLevel(logging.DEBUG)
239
240 1
    def start(self, restart=False):
241
        """Create pidfile and call start_controller method."""
242 1
        self.enable_logs()
243 1
        if self.options.database:
244 1
            self.db_conn_or_core_shutdown()
245 1
        if self.options.apm:
246 1
            self.init_apm_or_core_shutdown()
247 1
        if not restart:
248 1
            self.create_pidfile()
249 1
        self.start_controller()
250
251 1
    def create_pidfile(self):
252
        """Create a pidfile."""
253 1
        pid = os.getpid()
254
255
        # Creates directory if it doesn't exist
256
        # System can erase /var/run's content
257 1
        pid_folder = Path(self.options.pidfile).parent
258 1
        self.log.info(pid_folder)
259
260
        # Pylint incorrectly infers Path objects
261
        # https://github.com/PyCQA/pylint/issues/224
262
        # pylint: disable=no-member
263 1
        if not pid_folder.exists():
264
            pid_folder.mkdir()
265
            pid_folder.chmod(0o1777)
266
        # pylint: enable=no-member
267
268
        # Make sure the file is deleted when controller stops
269 1
        atexit.register(Path(self.options.pidfile).unlink)
270
271
        # Checks if a pidfile exists. Creates a new file.
272 1
        try:
273 1
            pidfile = open(self.options.pidfile, mode='x')
274 1
        except OSError:
275
            # This happens if there is a pidfile already.
276
            # We shall check if the process that created the pidfile is still
277
            # running.
278 1
            try:
279 1
                existing_file = open(self.options.pidfile, mode='r')
280 1
                old_pid = int(existing_file.read())
281 1
                os.kill(old_pid, 0)
282
                # If kill() doesn't return an error, there's a process running
283
                # with the same PID. We assume it is Kytos and quit.
284
                # Otherwise, overwrite the file and proceed.
285
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
286
                             "running. Aborting.")
287
                sys.exit(error_msg.format(self.options.pidfile))
288 1
            except OSError:
289 1
                try:
290 1
                    pidfile = open(self.options.pidfile, mode='w')
291
                except OSError as exception:
292
                    error_msg = "Failed to create pidfile {}: {}."
293
                    sys.exit(error_msg.format(self.options.pidfile, exception))
294
295
        # Identifies the process that created the pidfile.
296 1
        pidfile.write(str(pid))
297 1
        pidfile.close()
298
299 1
    def start_controller(self):
        """Start the controller.

        Creates the KytosServer (TCP Server) and calls its ``serve_forever``.
        Schedules one asyncio task per buffer handler, plus a task that runs
        the API server in the thread pool.
        Starts the napp directory listener, pre-installs and loads the napps.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(address,
                                  KytosServerProtocol,
                                  self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: stop the event loop.
            loop = asyncio.get_event_loop()
            alive = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(alive), alive)
            loop.stop()

        async def _run_api_server_thread(executor):
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            loop = asyncio.get_event_loop()
            # api_server.run is a blocking call, hence the executor.
            blocking_tasks = [
                loop.run_in_executor(executor, self.api_server.run)
            ]
            completed, pending = await asyncio.wait(blocking_tasks)
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        # (handler, args) pairs, scheduled in this order.
        buffer_handlers = (
            (self.event_handler, ("conn",)),
            (self.event_handler, ("raw",)),
            (self.event_handler, ("msg_in",)),
            (self.msg_out_event_handler, ()),
            (self.event_handler, ("app",)),
        )
        for handler, args in buffer_handlers:
            self._tasks.append(self._loop.create_task(handler(*args)))

        api_task = self._loop.create_task(_run_api_server_thread(self._pool))
        api_task.add_done_callback(_stop_loop)
        self._tasks.append(api_task)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()
365
366 1
    def db_conn_or_core_shutdown(self):
        """Wait for the db connection; exit the process if it fails.

        Calls ``db_conn_wait`` with ``options.database`` as backend and turns
        a ``KytosDBInitException`` into ``sys.exit`` with an error message.
        """
        backend = self.options.database
        try:
            db_conn_wait(db_backend=backend)
        except KytosDBInitException as exc:
            sys.exit(f"Kytos couldn't start because of {exc!s}")
372
373 1
    def init_apm_or_core_shutdown(self, **kwargs):
374
        """Init APM instrumentation or core shutdown."""
375
        if not self.options.apm:
376
            return
377
        try:
378
            init_apm(self.options.apm, app=self.api_server.app,
379
                     **kwargs)
380
        except KytosAPMInitException as exc:
381
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
382
383 1
    def _register_endpoints(self):
384
        """Register all rest endpoint served by kytos.
385
386
        -   Register APIServer endpoints
387
        -   Register WebUI endpoints
388
        -   Register ``/api/kytos/core/config`` endpoint
389
        """
390 1
        self.api_server.start_api()
391
        # Register controller endpoints as /api/kytos/core/...
392 1
        self.api_server.register_core_endpoint('config/',
393
                                               self.configuration_endpoint)
394 1
        self.api_server.register_core_endpoint('metadata/',
395
                                               Controller.metadata_endpoint)
396 1
        self.api_server.register_core_endpoint(
397
            'reload/<username>/<napp_name>/',
398
            self.rest_reload_napp)
399 1
        self.api_server.register_core_endpoint('reload/all',
400
                                               self.rest_reload_all_napps)
401 1
        self.auth.register_core_auth_services()
402 1
        self.dead_letter.register_endpoints()
403
404 1
    def register_rest_endpoint(self, url, function, methods):
405
        """Deprecate in favor of @rest decorator."""
406 1
        self.api_server.register_rest_endpoint(url, function, methods)
407
408 1
    @classmethod
409 1
    def metadata_endpoint(cls):
410
        """Return the Kytos metadata.
411
412
        Returns:
413
            string: Json with current kytos metadata.
414
415
        """
416 1
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
417 1
        meta_file = open(meta_path).read()
418 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
419 1
        return json.dumps(metadata)
420
421 1
    def configuration_endpoint(self):
422
        """Return the configuration options used by Kytos.
423
424
        Returns:
425
            string: Json with current configurations used by kytos.
426
427
        """
428 1
        return json.dumps(self.options.__dict__)
429
430 1
    def restart(self, graceful=True):
431
        """Restart Kytos SDN Controller.
432
433
        Args:
434
            graceful(bool): Represents the way that Kytos will restart.
435
        """
436 1
        if self.started_at is not None:
437 1
            self.stop(graceful)
438 1
            self.__init__(self.options)
439
440 1
        self.start(restart=True)
441
442 1
    def stop(self, graceful=True):
443
        """Shutdown all services used by kytos.
444
445
        This method should:
446
            - stop all Websockets
447
            - stop the API Server
448
            - stop the Controller
449
        """
450 1
        if self.started_at:
451 1
            self.stop_controller(graceful)
452
453 1
    def stop_controller(self, graceful=True):
        """Stop the controller.

        In order, this method:
            - sends the stop signal to the buffers;
            - stops the API server and the napp directory listener;
            - shuts down the controller thread pool and the ``executors``
              pools;
            - clears ``started_at`` and unloads the napps;
            - replaces the buffers with new ones;
            - cancels the asyncio tasks;
            - shuts down the KytosServer.

        Args:
            graceful (bool): Passed as ``wait`` to the thread pools' shutdown.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.api_server.stop_api_server()
        self.napp_dir_listener.stop()

        self.log.info("Stopping threadpool: %s", self._pool)
        for pool_name in executors:
            self.log.info("Stopping threadpool: %s", pool_name)

        alive = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(alive), alive)

        def _shutdown_pools(**extra):
            # Controller pool first, then every pool in ``executors``.
            self._pool.shutdown(wait=graceful, **extra)
            for executor_pool in executors.values():
                executor_pool.shutdown(wait=graceful, **extra)

        try:
            # Python >= 3.9
            _shutdown_pools(cancel_futures=True)
        except TypeError:
            # ``cancel_futures`` is not accepted here: retry without it.
            _shutdown_pools()

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # Cancel all async tasks (event handlers and servers)
        for pending_task in self._tasks:
            pending_task.cancel()

        # ASYNC TODO: close connections

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()
515
516 1
    def status(self):
517
        """Return status of Kytos Server.
518
519
        If the controller kytos is running this method will be returned
520
        "Running since 'Started_At'", otherwise "Stopped".
521
522
        Returns:
523
            string: String with kytos status.
524
525
        """
526 1
        if self.started_at:
527 1
            return "Running since %s" % self.started_at
528 1
        return "Stopped"
529
530 1
    def uptime(self):
531
        """Return the uptime of kytos server.
532
533
        This method should return:
534
            - 0 if Kytos Server is stopped.
535
            - (kytos.start_at - datetime.now) if Kytos Server is running.
536
537
        Returns:
538
           datetime.timedelta: The uptime interval.
539
540
        """
541 1
        return now() - self.started_at if self.started_at else 0
542
543 1
    def notify_listeners(self, event):
544
        """Send the event to the specified listeners.
545
546
        Loops over self.events_listeners matching (by regexp) the attribute
547
        name of the event with the keys of events_listeners. If a match occurs,
548
        then send the event to each registered listener.
549
550
        Args:
551
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
552
        """
553
554 1
        if logging.DEBUG >= self.log.getEffectiveLevel():
555
            self.log.debug("looking for listeners for %s", event)
556 1
        for event_regex, listeners in dict(self.events_listeners).items():
557
            # self.log.debug("listeners found for %s: %r => %s", event,
558
            #                event_regex, [l.__qualname__ for l in listeners])
559
            # Do not match if the event has more characters
560
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
561 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
562 1
                event_regex += '$'
563 1
            if re.match(event_regex, event.name):
564 1
                if logging.DEBUG >= self.log.getEffectiveLevel():
565
                    self.log.debug('Calling listeners for %s', event)
566 1
                for listener in listeners:
567 1
                    if asyncio.iscoroutinefunction(listener):
568
                        task = asyncio.create_task(listener(event))
569
                        self._alisten_tasks.add(task)
570
                        task.add_done_callback(self._alisten_tasks.discard)
571
                    else:
572 1
                        listener(event)
573
574 1
    async def event_handler(self, buffer_name: str):
575
        """Default event handler that gets from an event buffer."""
576 1
        event_buffer = getattr(self.buffers, buffer_name)
577
        while True:
578 1
            event = await event_buffer.aget()
579 1
            self.notify_listeners(event)
580
581 1
            if event.name == "kytos/core.shutdown":
582 1
                self.log.debug("Raw Event handler stopped")
583 1
                break
584
585 1
    async def publish_connection_error(self, event):
586
        """Publish connection error event.
587
588
        Args:
589
            event (KytosEvent): Event that triggered for error propagation.
590
        """
591
        event.name = \
592
            f"kytos/core.{event.destination.protocol.name}.connection.error"
593
        error_msg = f"Connection state: {event.destination.state}"
594
        event.content["exception"] = error_msg
595
        await self.buffers.app.aput(event)
596
597 1
    async def msg_out_event_handler(self):
598
        """Handle msg_out events.
599
600
        Listen to the msg_out buffer and send all its events to the
601
        corresponding listeners.
602
        """
603 1
        self.log.info("Message Out Event Handler started")
604
        while True:
605 1
            triggered_event = await self.buffers.msg_out.aget()
606
607 1
            if triggered_event.name == "kytos/core.shutdown":
608 1
                self.log.debug("Message Out Event handler stopped")
609 1
                break
610
611 1
            message = triggered_event.content['message']
612 1
            destination = triggered_event.destination
613 1
            try:
614 1
                if (destination and
615
                        not destination.state == ConnectionState.FINISHED):
616 1
                    packet = message.pack()
617 1
                    destination.send(packet)
618 1
                    if logging.DEBUG >= self.log.getEffectiveLevel():
619
                        self.log.debug('Connection %s: OUT OFP, '
620
                                       'version: %s, type: %s, xid: %s - %s',
621
                                       destination.id,
622
                                       message.header.version,
623
                                       message.header.message_type,
624
                                       message.header.xid,
625
                                       packet.hex())
626 1
                    self.notify_listeners(triggered_event)
627 1
                    continue
628
629
            except (OSError, SocketError):
630
                pass
631
632
            await self.publish_connection_error(triggered_event)
633
            self.log.info("connection closed. Cannot send message")
634
635 1
    def get_interface_by_id(self, interface_id):
636
        """Find a Interface  with interface_id.
637
638
        Args:
639
            interface_id(str): Interface Identifier.
640
641
        Returns:
642
            Interface: Instance of Interface with the id given.
643
644
        """
645 1
        if interface_id is None:
646 1
            return None
647
648 1
        switch_id = ":".join(interface_id.split(":")[:-1])
649 1
        interface_number = int(interface_id.split(":")[-1])
650
651 1
        switch = self.switches.get(switch_id)
652
653 1
        if not switch:
654 1
            return None
655
656 1
        return switch.interfaces.get(interface_number, None)
657
658 1
    def get_switch_by_dpid(self, dpid):
659
        """Return a specific switch by dpid.
660
661
        Args:
662
            dpid (|DPID|): dpid object used to identify a switch.
663
664
        Returns:
665
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
666
667
        """
668 1
        return self.switches.get(dpid)
669
670 1
    def get_switch_or_create(self, dpid, connection=None):
        """Return switch or create it if necessary.

        Runs under the switches lock. Puts a ``kytos/core.switch.new`` event
        in the app buffer when the switch is created, or a
        ``kytos/core.switch.reconnected`` event when it already existed.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. If given, it is stored and set on
                the switch, and a different previous connection of the switch
                is removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.

        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                suffix = 'new'
            else:
                suffix = 'reconnected'

            self.set_switch_options(dpid=dpid)
            event = KytosEvent(name='kytos/core.switch.' + suffix,
                               content={'switch': switch})

            if connection:
                previous_connection = switch.connection
                switch.update_connection(connection)

                if previous_connection is not connection:
                    self.remove_connection(previous_connection)

            self.buffers.app.put(event)

            return switch
710
711 1
    def set_switch_options(self, dpid):
712
        """Update the switch settings based on kytos.conf options.
713
714
        Args:
715
            dpid (str): dpid used to identify a switch.
716
717
        """
718 1
        switch = self.switches.get(dpid)
719 1
        if not switch:
720
            return
721
722 1
        vlan_pool = {}
723 1
        vlan_pool = self.options.vlan_pool
724 1
        if not vlan_pool:
725 1
            return
726
727 1
        if vlan_pool.get(dpid):
728 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
729 1
            for intf_num, port_list in vlan_pool[dpid].items():
730 1
                if not switch.interfaces.get((intf_num)):
731 1
                    vlan_ids = set()
732 1
                    for vlan_range in port_list:
733 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
734 1
                        for vlan_id in range(vlan_begin, vlan_end):
735 1
                            vlan_ids.add(vlan_id)
736 1
                    intf_num = int(intf_num)
737 1
                    intf = Interface(name=intf_num, port_number=intf_num,
738
                                     switch=switch)
739 1
                    intf.set_available_tags(vlan_ids)
740 1
                    switch.update_interface(intf)
741
742 1
    def create_or_update_connection(self, connection):
743
        """Update a connection.
744
745
        Args:
746
            connection (:class:`~kytos.core.connection.Connection`):
747
                Instance of connection that will be updated.
748
        """
749 1
        self.connections[connection.id] = connection
750
751 1
    def get_connection_by_id(self, conn_id):
        """Return an existing connection by id.

        Args:
            conn_id: key under which the connection was stored.

        Returns:
            :class:`~kytos.core.connection.Connection`: the stored
                connection, or None when ``conn_id`` is not registered.

        """
        found = self.connections.get(conn_id)
        return found
763
764 1
    def remove_connection(self, connection):
        """Close an existing connection and remove it.

        The connection is closed before the registry is touched, so it is
        closed even when its id turns out not to be registered.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be removed.

        Returns:
            bool: False when ``connection`` is None or a KeyError is raised
                while closing/removing it, True otherwise.
        """
        if connection is None:
            return False

        try:
            connection.close()
            del self.connections[connection.id]
        except KeyError:
            removed = False
        else:
            removed = True
        return removed
780
781 1
    def remove_switch(self, switch):
        """Remove an existing switch.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Instance of switch that will be removed.

        Returns:
            bool: True when the switch's dpid was registered and has been
                removed, False when it was not registered.
        """
        try:
            del self.switches[switch.dpid]
        except KeyError:
            was_removed = False
        else:
            was_removed = True
        return was_removed
793
794 1
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method will read new connection event and store the connection
        (socket) into the connections attribute on the controller.

        It also clears all references to a previous connection registered
        under the same id, since this is a new connection on the same
        ip:port.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() closes the object it receives and reads its
        # ``id``, so it must be given the stored connection, not the id.
        stale = self.get_connection_by_id(connection.id)
        # Never close the very object that is about to be (re)registered.
        if stale is not None and stale is not connection:
            self.remove_connection(stale)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
819
820 1
    def add_new_switch(self, switch):
        """Register a switch on the controller under its dpid.

        An entry already stored for the same dpid is replaced.

        Args:
            switch (Switch): A Switch object
        """
        known_switches = self.switches
        known_switches[switch.dpid] = switch
827
828 1
    def _import_napp(self, username, napp_name):
829
        """Import a NApp module.
830
831
        Raises:
832
            FileNotFoundError: if NApp's main.py is not found.
833
            ModuleNotFoundError: if any NApp requirement is not installed.
834
835
        """
836 1
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
837 1
        path = os.path.join(self.options.napps, username, napp_name,
838
                            'main.py')
839 1
        napp_spec = spec_from_file_location(mod_name, path)
840 1
        napp_module = module_from_spec(napp_spec)
841 1
        sys.modules[napp_spec.name] = napp_module
842 1
        napp_spec.loader.exec_module(napp_module)
843 1
        return napp_module
844
845 1
    def load_napp(self, username, napp_name):
        """Load a single NApp.

        Imports the NApp module, instantiates its ``Main`` class, starts it,
        registers its REST endpoints and appends its listeners to the
        controller's ``events_listeners``. Every failure while importing or
        instantiating is logged and aborts the load without raising.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return
        except FileNotFoundError as err:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                err.filename)
            return

        # Anything raised while building the NApp is logged with its
        # traceback instead of propagating to the caller.
        try:
            napp = module.Main(controller=self)
        except:  # noqa pylint: disable=bare-except
            self.log.critical("NApp initialization failed: %s/%s",
                              username, napp_name, exc_info=True)
            return

        self.napps[napp_key] = napp

        napp.start()
        api_server = self.api_server
        api_server.authenticate_endpoints(napp)
        api_server.register_napp_endpoints(napp)

        # pylint: disable=protected-access
        for event_name, handlers in napp._listeners.items():
            registered = self.events_listeners.setdefault(event_name, [])
            registered.extend(handlers)
        # pylint: enable=protected-access
885
886 1
    def pre_install_napps(self, napps, enable=True):
        """Pre install and enable NApps.

        Only the NApps whose name is not among the ones reported by the
        NApps manager as installed are handed to ``napps_manager.install``.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): forwarded to ``napps_manager.install``.
        """
        manager = self.napps_manager
        already_installed = list(map(str, manager.get_installed_napps()))
        missing = [name for name in napps if name not in already_installed]
        for name in missing:
            manager.install(name, enable=enable)
899
900 1
    def load_napps(self):
        """Load all NApps enabled on the NApps dir.

        A FileNotFoundError raised while loading one NApp is logged and the
        remaining NApps are still processed.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for enabled in enabled_napps:
            try:
                self.log.info("Loading NApp %s", enabled.id)
                self.load_napp(enabled.username, enabled.name)
            except FileNotFoundError as exception:
                self.log.error("Could not load NApp %s: %s",
                               enabled.id, exception)
909
910 1
    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        The NApp is dropped from ``self.napps``, the first listener
        registered for its ``kytos/core.shutdown.<napp id>`` event is called,
        its REST endpoints are removed and all of its listeners are taken
        out of ``events_listeners``. A NApp that is not loaded only produces
        a warning.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)
        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        shutdown_name = 'kytos/core.shutdown.' + NApp(username, napp_name).id
        shutdown_event = KytosEvent(name=shutdown_name)
        shutdown_listener = self.events_listeners[shutdown_event.name][0]
        # Call listener before removing it from events_listeners
        shutdown_listener(shutdown_event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Removing listeners from that napp
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            registered = self.events_listeners[event_type]
            for listener in napp_listeners:
                registered.remove(listener)
            # Drop event types that no longer have any listener.
            if not registered:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access
941
942 1
    def unload_napps(self):
        """Unload every NApp currently registered in ``self.napps``."""
        # Iterate over a snapshot of the keys: unloading removes entries
        # from self.napps, and looping over the live dictionary would fail
        # with 'RuntimeError: dictionary changed size during iteration'.
        loaded_keys = list(self.napps.keys())
        for username, napp_name in loaded_keys:  # noqa
            self.unload_napp(username, napp_name)
950
951 1
    def reload_napp_module(self, username, napp_name, napp_file):
        """Reload a NApp Module.

        Imports ``napps.<username>.<napp_name>.<napp_file>`` and reloads it.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp.
            napp_file (str): module name inside the NApp package.

        Raises:
            ModuleNotFoundError: if the module cannot be found (logged).
            ImportError: if reloading the module fails (logged).
        """
        mod_name = '.'.join(('napps', username, napp_name, napp_file))

        try:
            current_module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise

        try:
            reload_module(current_module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise
965
966 1
    def reload_napp(self, username, napp_name):
        """Reload a NApp.

        The NApp is unloaded, its ``settings`` and ``main`` modules are
        reloaded and the NApp is loaded again.

        Returns:
            int: 200 when both modules were reloaded, 400 when reloading
                either of them raised an import error (the NApp is then left
                unloaded).
        """
        self.unload_napp(username, napp_name)
        try:
            for napp_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, napp_file)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
978
979 1
    def rest_reload_napp(self, username, napp_name):
        """Request reload a NApp.

        Returns:
            tuple: ``('reloaded', status)`` where ``status`` is the value
                returned by :meth:`reload_napp`.
        """
        status = self.reload_napp(username, napp_name)
        response = ('reloaded', status)
        return response
983
984 1
    def rest_reload_all_napps(self):
        """Request reload all NApps.

        Returns:
            tuple: always ``('reloaded', 200)``; the status of the individual
                reloads is not inspected.
        """
        # Iterate over a snapshot of the keys: reload_napp() unloads and
        # loads the NApp again, which removes and re-adds entries of
        # self.napps. Looping over the live dictionary would raise
        # 'RuntimeError: dictionary changed size during iteration' or visit
        # re-added NApps more than once.
        for username, napp_name in list(self.napps):
            self.reload_napp(username, napp_name)
        return 'reloaded', 200
989