Passed
Pull Request — master (#213)
by Vinicius
10:19 queued 06:10
created

kytos.core.controller   F

Complexity

Total Complexity 120

Size/Duplication

Total Lines 986
Duplicated Lines 0 %

Test Coverage

Coverage 88.42%

Importance

Changes 0
Metric Value
eloc 489
dl 0
loc 986
ccs 397
cts 449
cp 0.8842
rs 2
c 0
b 0
f 0
wmc 120

44 Methods

Rating   Name   Duplication   Size   Complexity  
A Controller.__init__() 0 75 2
A Controller.toggle_debug() 0 36 5
A Controller.enable_logs() 0 5 1
A Controller.loggers() 0 9 1
A Controller.rest_reload_napp() 0 4 1
A Controller.publish_connection_error() 0 11 1
A Controller.start() 0 10 4
B Controller.notify_listeners() 0 22 6
A Controller.restart() 0 11 2
A Controller.create_or_update_connection() 0 8 1
B Controller.create_pidfile() 0 47 5
A Controller.get_switch_by_dpid() 0 11 1
A Controller.add_new_switch() 0 7 1
A Controller.load_napps() 0 9 3
A Controller.init_apm_or_core_shutdown() 0 9 3
A Controller._import_napp() 0 16 1
A Controller.get_connection_by_id() 0 12 1
A Controller.unload_napps() 0 8 2
B Controller.get_switch_or_create() 0 40 6
A Controller.raw_event_handler() 0 15 3
A Controller.configuration_endpoint() 0 8 1
A Controller.msg_in_event_handler() 0 15 3
B Controller.start_controller() 0 64 1
A Controller.get_interface_by_id() 0 22 3
A Controller.uptime() 0 12 2
A Controller.register_rest_endpoint() 0 3 1
A Controller.rest_reload_all_napps() 0 5 2
A Controller.new_connection() 0 25 2
B Controller.set_switch_options() 0 30 8
A Controller.metadata_endpoint() 0 12 1
A Controller.remove_switch() 0 12 2
A Controller.db_conn_or_core_shutdown() 0 6 2
B Controller.msg_out_event_handler() 0 37 6
B Controller.stop_controller() 0 62 6
A Controller.reload_napp_module() 0 14 3
A Controller.remove_connection() 0 16 3
A Controller.stop() 0 10 2
A Controller.status() 0 13 2
A Controller.pre_install_napps() 0 13 2
A Controller.app_event_handler() 0 15 3
A Controller._register_endpoints() 0 20 1
A Controller.reload_napp() 0 12 2
B Controller.load_napp() 0 39 6
A Controller.unload_napp() 0 30 5

1 Function

Rating   Name   Duplication   Size   Complexity  
A exc_handler() 0 12 1

How to fix   Complexity   

Complexity

Complex classes like kytos.core.controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29 1
from socket import error as SocketError
30
31 1
from kytos.core.api_server import APIServer
32 1
from kytos.core.apm import init_apm
33
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
34 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
35 1
from kytos.core.auth import Auth
36 1
from kytos.core.buffers import KytosBuffers
37 1
from kytos.core.config import KytosConfig
38 1
from kytos.core.connection import ConnectionState
39 1
from kytos.core.db import db_conn_wait
40 1
from kytos.core.dead_letter import DeadLetter
41 1
from kytos.core.events import KytosEvent
42 1
from kytos.core.exceptions import KytosAPMInitException, KytosDBInitException
43 1
from kytos.core.helpers import executors, now
44 1
from kytos.core.interface import Interface
45 1
from kytos.core.logs import LogManager
46 1
from kytos.core.napps.base import NApp
47 1
from kytos.core.napps.manager import NAppsManager
48 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
49 1
from kytos.core.switch import Switch
50
51 1
__all__ = ('Controller',)
52
53
54 1
def exc_handler(_, exc, __):
    """Log uncaught exceptions.

    The signature matches ``sys.excepthook``, where the controller installs
    this function.

    Args:
        _: exception type.
        exc: exception instance.
        __: traceback object.
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # An excepthook runs outside of any ``except`` block, so
    # ``logging.exception`` would find no active exception and record no
    # traceback. Hand the exception info over explicitly instead.
    logging.error('Uncaught Exception: %s', exc, exc_info=(_, exc, __))
66
67
68 1
class Controller:
69
    """Main class of Kytos.
70
71
    The main responsibilities of this class are:
72
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
73
        - manage KytosNApps (install, load and unload);
74
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
75
        - manage which event should be sent to NApps methods;
76
        - manage the buffers handlers, considering one thread per handler.
77
    """
78
79
    # Created issue #568 for the disabled checks.
80
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
81 1
    def __init__(self, options=None, loop=None):
        """Build the controller and all of its core services.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from
                an instance of :class:`~kytos.core.config.KytosConfig`. When
                omitted, the ``'daemon'`` options of a fresh ``KytosConfig``
                are used.
            loop: asyncio event loop to run on. When omitted (or falsy),
                ``asyncio.get_event_loop()`` is used.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self._loop = loop if loop else asyncio.get_event_loop()
        self._pool = ThreadPoolExecutor(max_workers=1)

        # asyncio tasks created by the controller
        self._tasks = []

        #: dict: main threads of the controller (buffers and handlers)
        self._threads = {}

        #: KytosBuffers: buffers used by the controller
        self.buffers = KytosBuffers(loop=self._loop)

        #: dict: socket connections between the controller and the
        #: switches. Keys are ``(ip, port)`` tuples, values are Connection
        #: objects.
        self.connections = {}

        #: dict: mapping of events to event listeners.
        #:
        #: Each key is an event name (or a string holding a regex matched
        #: against event names); each value is the list of callables that
        #: receive the matching events.
        self.events_listeners = {
            'kytos/core.connection.new': [self.new_connection],
        }

        #: dict: currently loaded NApps - ``'napp_name'``: ``napp`` instance
        self.napps = {}

        #: Object generated by ParseArgs on config.py file
        self.options = options

        #: KytosServer: server listening to TCP connections (set on start)
        self.server = None

        #: dict: known switches - dpid: Switch()
        self.switches = {}
        self._switches_lock = threading.Lock()

        #: datetime.datetime: time when the controller finished starting
        self.started_at = None

        #: logging.Logger: logger used by Kytos (set by ``enable_logs``)
        self.log = None

        #: Observer that handles NApps when they are enabled or disabled
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints
        self.api_server = APIServer(
            __name__,
            self.options.listen,
            self.options.api_port,
            self.napps_manager,
            self.options.napps,
        )

        self.auth = Auth(self)
        self.dead_letter = DeadLetter(self)

        self._register_endpoints()

        # Put the parent of the napps 'enabled' directory on sys.path so
        # enabled NApps can be imported as:
        #   from napps.<username>.<napp_name> import ...
        napps_parent = os.path.join(self.options.napps, os.pardir)
        sys.path.append(napps_parent)
        sys.excepthook = exc_handler
156
157 1
    def enable_logs(self):
        """Load the logging configuration and create the controller logger."""
        log_config, debug = self.options.logging, self.options.debug
        LogManager.load_config_file(log_config, debug)
        LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)
162
163 1
    @staticmethod
164 1
    def loggers():
165
        """List all logging Loggers.
166
167
        Return a list of Logger objects, with name and logging level.
168
        """
169 1
        return [logging.getLogger(name)
170
                for name in logging.root.manager.loggerDict
171
                if "kytos" in name]
172
173 1
    def toggle_debug(self, name=None):
174
        """Enable/disable logging debug messages to a given logger name.
175
176
        If the name parameter is not specified the debug will be
177
        enabled/disabled following the initial config file. It will decide
178
        to enable/disable using the 'kytos' name to find the current
179
        logger level.
180
        Obs: To disable the debug the logging will be set to NOTSET
181
182
        Args:
183
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
184
        """
185 1
        if name and name not in logging.root.manager.loggerDict:
186
            # A Logger name that is not declared in logging will raise an error
187
            # otherwise logging would create a new Logger.
188 1
            raise ValueError(f"Invalid logger name: {name}")
189
190 1
        if not name:
191
            # Logger name not specified.
192 1
            level = logging.getLogger('kytos').getEffectiveLevel()
193 1
            enable_debug = level != logging.DEBUG
194
195
            # Enable/disable default Loggers
196 1
            LogManager.load_config_file(self.options.logging, enable_debug)
197 1
            return
198
199
        # Get effective logger level for the name
200 1
        level = logging.getLogger(name).getEffectiveLevel()
201 1
        logger = logging.getLogger(name)
202
203 1
        if level == logging.DEBUG:
204
            # disable debug
205 1
            logger.setLevel(logging.NOTSET)
206
        else:
207
            # enable debug
208 1
            logger.setLevel(logging.DEBUG)
209
210 1
    def start(self, restart=False):
211
        """Create pidfile and call start_controller method."""
212 1
        self.enable_logs()
213 1
        if self.options.database:
214 1
            self.db_conn_or_core_shutdown()
215 1
        if self.options.apm:
216 1
            self.init_apm_or_core_shutdown()
217 1
        if not restart:
218 1
            self.create_pidfile()
219 1
        self.start_controller()
220
221 1
    def create_pidfile(self):
222
        """Create a pidfile."""
223 1
        pid = os.getpid()
224
225
        # Creates directory if it doesn't exist
226
        # System can erase /var/run's content
227 1
        pid_folder = Path(self.options.pidfile).parent
228 1
        self.log.info(pid_folder)
229
230
        # Pylint incorrectly infers Path objects
231
        # https://github.com/PyCQA/pylint/issues/224
232
        # pylint: disable=no-member
233 1
        if not pid_folder.exists():
234
            pid_folder.mkdir()
235
            pid_folder.chmod(0o1777)
236
        # pylint: enable=no-member
237
238
        # Make sure the file is deleted when controller stops
239 1
        atexit.register(Path(self.options.pidfile).unlink)
240
241
        # Checks if a pidfile exists. Creates a new file.
242 1
        try:
243 1
            pidfile = open(self.options.pidfile, mode='x')
244 1
        except OSError:
245
            # This happens if there is a pidfile already.
246
            # We shall check if the process that created the pidfile is still
247
            # running.
248 1
            try:
249 1
                existing_file = open(self.options.pidfile, mode='r')
250 1
                old_pid = int(existing_file.read())
251 1
                os.kill(old_pid, 0)
252
                # If kill() doesn't return an error, there's a process running
253
                # with the same PID. We assume it is Kytos and quit.
254
                # Otherwise, overwrite the file and proceed.
255
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
256
                             "running. Aborting.")
257
                sys.exit(error_msg.format(self.options.pidfile))
258 1
            except OSError:
259 1
                try:
260 1
                    pidfile = open(self.options.pidfile, mode='w')
261
                except OSError as exception:
262
                    error_msg = "Failed to create pidfile {}: {}."
263
                    sys.exit(error_msg.format(self.options.pidfile, exception))
264
265
        # Identifies the process that created the pidfile.
266 1
        pidfile.write(str(pid))
267 1
        pidfile.close()
268
269 1
    def start_controller(self):
        """Start the controller.

        Creates the KytosServer (TCP server) and calls ``serve_forever`` on
        it, schedules one asyncio task per buffer handler plus one task that
        runs the API server in the thread pool, then loads the NApps and
        records the start time.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(address,
                                  KytosServerProtocol,
                                  self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: stop the event loop.
            loop = asyncio.get_event_loop()
            alive = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(alive), alive)
            loop.stop()

        async def _run_api_server_thread(executor):
            # Run the blocking API server in ``executor`` and wait for it.
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            loop = asyncio.get_event_loop()
            api_future = loop.run_in_executor(executor, self.api_server.run)
            completed, pending = await asyncio.wait([api_future])
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        # One task per buffer handler, created in the same fixed order.
        buffer_handlers = (
            self.raw_event_handler,
            self.msg_in_event_handler,
            self.msg_out_event_handler,
            self.app_event_handler,
        )
        for handler in buffer_handlers:
            self._tasks.append(self._loop.create_task(handler()))

        api_task = self._loop.create_task(_run_api_server_thread(self._pool))
        api_task.add_done_callback(_stop_loop)
        self._tasks.append(api_task)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()
333
334 1
    def db_conn_or_core_shutdown(self):
        """Wait for the database connection; exit the process on failure.

        Raises:
            SystemExit: if ``db_conn_wait`` raises ``KytosDBInitException``.
        """
        backend = self.options.database
        try:
            db_conn_wait(db_backend=backend)
        except KytosDBInitException as exc:
            reason = f"Kytos couldn't start because of {str(exc)}"
            sys.exit(reason)
340
341 1
    def init_apm_or_core_shutdown(self, **kwargs):
342
        """Init APM instrumentation or core shutdown."""
343
        if not self.options.apm:
344
            return
345
        try:
346
            init_apm(self.options.apm, app=self.api_server.app,
347
                     **kwargs)
348
        except KytosAPMInitException as exc:
349
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
350
351 1
    def _register_endpoints(self):
352
        """Register all rest endpoint served by kytos.
353
354
        -   Register APIServer endpoints
355
        -   Register WebUI endpoints
356
        -   Register ``/api/kytos/core/config`` endpoint
357
        """
358 1
        self.api_server.start_api()
359
        # Register controller endpoints as /api/kytos/core/...
360 1
        self.api_server.register_core_endpoint('config/',
361
                                               self.configuration_endpoint)
362 1
        self.api_server.register_core_endpoint('metadata/',
363
                                               Controller.metadata_endpoint)
364 1
        self.api_server.register_core_endpoint(
365
            'reload/<username>/<napp_name>/',
366
            self.rest_reload_napp)
367 1
        self.api_server.register_core_endpoint('reload/all',
368
                                               self.rest_reload_all_napps)
369 1
        self.auth.register_core_auth_services()
370 1
        self.dead_letter.register_endpoints()
371
372 1
    def register_rest_endpoint(self, url, function, methods):
373
        """Deprecate in favor of @rest decorator."""
374 1
        self.api_server.register_rest_endpoint(url, function, methods)
375
376 1
    @classmethod
377 1
    def metadata_endpoint(cls):
378
        """Return the Kytos metadata.
379
380
        Returns:
381
            string: Json with current kytos metadata.
382
383
        """
384 1
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
385 1
        meta_file = open(meta_path).read()
386 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
387 1
        return json.dumps(metadata)
388
389 1
    def configuration_endpoint(self):
390
        """Return the configuration options used by Kytos.
391
392
        Returns:
393
            string: Json with current configurations used by kytos.
394
395
        """
396 1
        return json.dumps(self.options.__dict__)
397
398 1
    def restart(self, graceful=True):
399
        """Restart Kytos SDN Controller.
400
401
        Args:
402
            graceful(bool): Represents the way that Kytos will restart.
403
        """
404 1
        if self.started_at is not None:
405 1
            self.stop(graceful)
406 1
            self.__init__(self.options)
407
408 1
        self.start(restart=True)
409
410 1
    def stop(self, graceful=True):
411
        """Shutdown all services used by kytos.
412
413
        This method should:
414
            - stop all Websockets
415
            - stop the API Server
416
            - stop the Controller
417
        """
418 1
        if self.started_at:
419 1
            self.stop_controller(graceful)
420
421 1
    def stop_controller(self, graceful=True):
        """Stop the controller.

        In order, this method:
            - sends the stop signal to the buffers;
            - stops the API server and the NApp directory listener;
            - shuts down the controller thread pool and the shared executors;
            - unloads the NApps and replaces the buffers with fresh ones;
            - cancels the asyncio tasks created on start;
            - shuts down the KytosServer.

        Args:
            graceful (bool): passed as ``wait`` to the pools' ``shutdown``.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.api_server.stop_api_server()
        self.napp_dir_listener.stop()

        self.log.info("Stopping threadpool: %s", self._pool)
        for pool_name in executors:
            self.log.info("Stopping threadpool: %s", pool_name)

        alive = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(alive), alive)

        def _shutdown_pools(**extra):
            # Shut down the controller pool first, then every shared pool.
            self._pool.shutdown(wait=graceful, **extra)
            for executor_pool in executors.values():
                executor_pool.shutdown(wait=graceful, **extra)

        try:
            # Python >= 3.9
            # pylint: disable=unexpected-keyword-arg
            _shutdown_pools(cancel_futures=True)
        except TypeError:
            # ``cancel_futures`` is not accepted: retry without it.
            _shutdown_pools()

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # Cancel all async tasks (event handlers and servers)
        for pending_task in self._tasks:
            pending_task.cancel()

        # ASYNC TODO: close connections

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()
483
484 1
    def status(self):
485
        """Return status of Kytos Server.
486
487
        If the controller kytos is running this method will be returned
488
        "Running since 'Started_At'", otherwise "Stopped".
489
490
        Returns:
491
            string: String with kytos status.
492
493
        """
494 1
        if self.started_at:
495 1
            return "Running since %s" % self.started_at
496 1
        return "Stopped"
497
498 1
    def uptime(self):
499
        """Return the uptime of kytos server.
500
501
        This method should return:
502
            - 0 if Kytos Server is stopped.
503
            - (kytos.start_at - datetime.now) if Kytos Server is running.
504
505
        Returns:
506
           datetime.timedelta: The uptime interval.
507
508
        """
509 1
        return now() - self.started_at if self.started_at else 0
510
511 1
    def notify_listeners(self, event):
512
        """Send the event to the specified listeners.
513
514
        Loops over self.events_listeners matching (by regexp) the attribute
515
        name of the event with the keys of events_listeners. If a match occurs,
516
        then send the event to each registered listener.
517
518
        Args:
519
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
520
        """
521 1
        self.log.debug("looking for listeners for %s", event)
522 1
        for event_regex, listeners in dict(self.events_listeners).items():
523
            # self.log.debug("listeners found for %s: %r => %s", event,
524
            #                event_regex, [l.__qualname__ for l in listeners])
525
            # Do not match if the event has more characters
526
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
527 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
528 1
                event_regex += '$'
529 1
            if re.match(event_regex, event.name):
530
                # self.log.debug('Calling listeners for %s', event)
531 1
                for listener in listeners:
532 1
                    listener(event)
533
534 1
    async def raw_event_handler(self):
535
        """Handle raw events.
536
537
        Listen to the raw_buffer and send all its events to the
538
        corresponding listeners.
539
        """
540 1
        self.log.info("Raw Event Handler started")
541
        while True:
542 1
            event = await self.buffers.raw.aget()
543 1
            self.notify_listeners(event)
544 1
            self.log.debug("Raw Event handler called")
545
546 1
            if event.name == "kytos/core.shutdown":
547 1
                self.log.debug("Raw Event handler stopped")
548 1
                break
549
550 1
    async def msg_in_event_handler(self):
551
        """Handle msg_in events.
552
553
        Listen to the msg_in buffer and send all its events to the
554
        corresponding listeners.
555
        """
556 1
        self.log.info("Message In Event Handler started")
557
        while True:
558 1
            event = await self.buffers.msg_in.aget()
559 1
            self.notify_listeners(event)
560 1
            self.log.debug("Message In Event handler called")
561
562 1
            if event.name == "kytos/core.shutdown":
563 1
                self.log.debug("Message In Event handler stopped")
564 1
                break
565
566 1
    async def publish_connection_error(self, event):
567
        """Publish connection error event.
568
569
        Args:
570
            event (KytosEvent): Event that triggered for error propagation.
571
        """
572
        event.name = \
573
            f"kytos/core.{event.destination.protocol.name}.connection.error"
574
        error_msg = f"Connection state: {event.destination.state}"
575
        event.content["exception"] = error_msg
576
        await self.buffers.app.aput(event)
577
578 1
    async def msg_out_event_handler(self):
579
        """Handle msg_out events.
580
581
        Listen to the msg_out buffer and send all its events to the
582
        corresponding listeners.
583
        """
584 1
        self.log.info("Message Out Event Handler started")
585
        while True:
586 1
            triggered_event = await self.buffers.msg_out.aget()
587
588 1
            if triggered_event.name == "kytos/core.shutdown":
589 1
                self.log.debug("Message Out Event handler stopped")
590 1
                break
591
592 1
            message = triggered_event.content['message']
593 1
            destination = triggered_event.destination
594 1
            try:
595 1
                if (destination and
596
                        not destination.state == ConnectionState.FINISHED):
597 1
                    packet = message.pack()
598 1
                    destination.send(packet)
599 1
                    self.log.debug('Connection %s: OUT OFP, '
600
                                   'version: %s, type: %s, xid: %s - %s',
601
                                   destination.id,
602
                                   message.header.version,
603
                                   message.header.message_type,
604
                                   message.header.xid,
605
                                   packet.hex())
606 1
                    self.notify_listeners(triggered_event)
607 1
                    self.log.debug("Message Out Event handler called")
608 1
                    continue
609
610
            except (OSError, SocketError):
611
                pass
612
613
            await self.publish_connection_error(triggered_event)
614
            self.log.info("connection closed. Cannot send message")
615
616 1
    async def app_event_handler(self):
617
        """Handle app events.
618
619
        Listen to the app buffer and send all its events to the
620
        corresponding listeners.
621
        """
622 1
        self.log.info("App Event Handler started")
623
        while True:
624 1
            event = await self.buffers.app.aget()
625 1
            self.notify_listeners(event)
626 1
            self.log.debug("App Event handler called")
627
628 1
            if event.name == "kytos/core.shutdown":
629 1
                self.log.debug("App Event handler stopped")
630 1
                break
631
632 1
    def get_interface_by_id(self, interface_id):
633
        """Find a Interface  with interface_id.
634
635
        Args:
636
            interface_id(str): Interface Identifier.
637
638
        Returns:
639
            Interface: Instance of Interface with the id given.
640
641
        """
642 1
        if interface_id is None:
643 1
            return None
644
645 1
        switch_id = ":".join(interface_id.split(":")[:-1])
646 1
        interface_number = int(interface_id.split(":")[-1])
647
648 1
        switch = self.switches.get(switch_id)
649
650 1
        if not switch:
651 1
            return None
652
653 1
        return switch.interfaces.get(interface_number, None)
654
655 1
    def get_switch_by_dpid(self, dpid):
656
        """Return a specific switch by dpid.
657
658
        Args:
659
            dpid (|DPID|): dpid object used to identify a switch.
660
661
        Returns:
662
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
663
664
        """
665 1
        return self.switches.get(dpid)
666
667 1
    def get_switch_or_create(self, dpid, connection=None):
        """Return switch or create it if necessary.

        Runs under the switches lock. Puts a ``kytos/core.switch.new`` event
        on the app buffer when the switch had to be created and a
        ``kytos/core.switch.reconnected`` event when it already existed.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. When given, it is stored and set
                on the switch; a different previous connection of the switch
                is removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.

        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                suffix = 'new'
            else:
                suffix = 'reconnected'

            self.set_switch_options(dpid=dpid)
            event = KytosEvent(name='kytos/core.switch.' + suffix,
                               content={'switch': switch})

            if connection:
                previous = switch.connection
                switch.update_connection(connection)
                if previous is not connection:
                    self.remove_connection(previous)

            self.buffers.app.put(event)

        return switch
707
708 1
    def set_switch_options(self, dpid):
        """Update the switch settings based on kytos.conf options.

        Reads ``self.options.vlan_pool`` and, for every interface configured
        for ``dpid`` that the switch does not have yet, creates the interface
        and sets its available tags to the configured VLAN ranges.

        Nothing is done when the switch is unknown, when no vlan_pool is
        configured, or when the pool has no entry for ``dpid``.

        Args:
            dpid (str): dpid used to identify a switch.

        """
        switch = self.switches.get(dpid)
        if not switch:
            return

        vlan_pool = self.options.vlan_pool
        if not vlan_pool:
            return

        dpid_pool = vlan_pool.get(dpid)
        if not dpid_pool:
            return

        self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
        for intf_key, port_list in dpid_pool.items():
            # The interface is created below with an integer port number, so
            # the existence check must use the same integer form; looking up
            # the raw configuration key (possibly a string) would never match.
            # NOTE(review): presumably switch.interfaces is indexed by
            # port_number - confirm against Switch.update_interface.
            intf_num = int(intf_key)
            if switch.interfaces.get(intf_num):
                continue

            vlan_ids = set()
            for vlan_range in port_list:
                # Each range contributes range(begin, end): end is excluded.
                vlan_begin, vlan_end = vlan_range[0:2]
                vlan_ids.update(range(vlan_begin, vlan_end))

            intf = Interface(name=intf_num, port_number=intf_num,
                             switch=switch)
            intf.set_available_tags(vlan_ids)
            switch.update_interface(intf)
738
739 1
    def create_or_update_connection(self, connection):
        """Store ``connection`` on the controller, keyed by its id.

        Any connection previously stored under the same id is replaced.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be stored.
        """
        connection_id = connection.id
        self.connections[connection_id] = connection
747
748 1
    def get_connection_by_id(self, conn_id):
        """Look up a stored connection by its id.

        Args:
            conn_id: id of the connection, as used when it was stored.

        Returns:
            :class:`~kytos.core.connection.Connection`: the stored
                connection, or None when no connection has that id.

        """
        found = self.connections.get(conn_id)
        return found
760
761 1
    def remove_connection(self, connection):
        """Close an existing connection and drop it from the controller.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be removed.

        Returns:
            bool: True when the connection was closed and removed; False
                when ``connection`` is None or a KeyError was raised while
                closing/removing it (e.g. its id is not stored).
        """
        removed = connection is not None
        if removed:
            try:
                # The connection is closed before being unregistered.
                connection.close()
                del self.connections[connection.id]
            except KeyError:
                removed = False
        return removed
777
778 1
    def remove_switch(self, switch):
        """Drop a switch from the controller's switches.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Instance of switch that will be removed.

        Returns:
            bool: True when the switch was removed, False when a KeyError
                was raised (its dpid is not registered).
        """
        try:
            del self.switches[switch.dpid]
        except KeyError:
            return False
        else:
            return True
790
791 1
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method will read the new connection event and store the
        connection (``event.source``) into the connections attribute on the
        controller.

        It also clears any previously stored connection with the same id,
        since the event announces a new connection with that id.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() expects the connection object itself (it calls
        # .close() and reads .id on it), so hand over the stored object and
        # not just its id. A stored entry that is this very object must not
        # be closed, otherwise the connection being registered would be
        # closed as well.
        old_connection = self.get_connection_by_id(connection.id)
        if old_connection is not None and old_connection is not connection:
            self.remove_connection(old_connection)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
816
817 1
    def add_new_switch(self, switch):
        """Register a switch on the controller, keyed by its dpid.

        Args:
            switch (Switch): A Switch object
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
824
825 1
    def _import_napp(self, username, napp_name):
        """Import the ``main.py`` module of a NApp.

        The module is loaded from
        ``<options.napps>/<username>/<napp_name>/main.py`` under the name
        ``napps.<username>.<napp_name>.main`` and is registered in
        ``sys.modules`` before being executed.

        Returns:
            module: the executed NApp module.

        Raises:
            FileNotFoundError: if NApp's main.py is not found.
            ModuleNotFoundError: if any NApp requirement is not installed.

        """
        main_path = os.path.join(self.options.napps, username, napp_name,
                                 'main.py')
        module_name = '.'.join(('napps', username, napp_name, 'main'))

        spec = spec_from_file_location(module_name, main_path)
        module = module_from_spec(spec)
        sys.modules[spec.name] = module
        spec.loader.exec_module(module)
        return module
841
842 1
    def load_napp(self, username, napp_name):
        """Load a single NApp.

        Imports the NApp module, instantiates its ``Main`` class with this
        controller, starts it, registers its REST endpoints and adds its
        listeners to ``self.events_listeners``.

        Loading is abandoned (after logging) when the NApp is already
        loaded, when importing it raises ModuleNotFoundError or
        FileNotFoundError, or when ``Main`` fails to initialize.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            message = 'NApp %s/%s was already loaded'
            self.log.warning(message, username, napp_name)
            return

        try:
            napp_module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return
        except FileNotFoundError as err:
            msg = "NApp module not found, assuming it's a meta-napp: %s"
            self.log.warning(msg, err.filename)
            return

        try:
            napp = napp_module.Main(controller=self)
        # A failing NApp must not take the controller down, so any ordinary
        # error is logged and the NApp skipped. KeyboardInterrupt and
        # SystemExit are deliberately not caught so that a shutdown request
        # is never swallowed here.
        except Exception:  # pylint: disable=broad-except
            self.log.critical("NApp initialization failed: %s/%s",
                              username, napp_name, exc_info=True)
            return

        self.napps[napp_key] = napp

        napp.start()
        self.api_server.authenticate_endpoints(napp)
        self.api_server.register_napp_endpoints(napp)

        # pylint: disable=protected-access
        for event, listeners in napp._listeners.items():
            self.events_listeners.setdefault(event, []).extend(listeners)
        # pylint: enable=protected-access
882
883 1
    def pre_install_napps(self, napps, enable=True):
        """Install the requested NApps that are not installed yet.

        The installed NApps are obtained from the NApps manager and compared
        by their string form against the requested names.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): Forwarded to the NApps manager ``install`` call.
        """
        installed = list(map(str, self.napps_manager.get_installed_napps()))
        missing = []
        for napp in napps:
            if napp in installed:
                continue
            missing.append(napp)

        for napp in missing:
            self.napps_manager.install(napp, enable=enable)
896
897 1
    def load_napps(self):
        """Load every NApp the NApps manager reports as enabled.

        A FileNotFoundError raised while loading one NApp is logged and the
        remaining NApps are still processed.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for enabled in enabled_napps:
            try:
                self.log.info("Loading NApp %s", enabled.id)
                self.load_napp(enabled.username, enabled.name)
            except FileNotFoundError as not_found:
                self.log.error("Could not load NApp %s: %s",
                               enabled.id, not_found)
906
907 1
    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        The NApp is removed from ``self.napps``; its shutdown listener is
        invoked, its REST endpoints are removed and its listeners are taken
        out of ``self.events_listeners``. A warning is logged when the NApp
        was not loaded.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)
        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        napp_id = NApp(username, napp_name).id
        shutdown_event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
        # The first listener registered for the shutdown event is called
        # here, while it is still present in events_listeners.
        shutdown_listener = self.events_listeners[shutdown_event.name][0]
        shutdown_listener(shutdown_event)

        # Drop the REST endpoints exposed by this NApp.
        self.api_server.remove_napp_endpoints(napp)

        # Unregister the NApp's listeners, discarding event entries that
        # end up with no listener at all.
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            registered = self.events_listeners[event_type]
            for listener in napp_listeners:
                registered.remove(listener)
            if not registered:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access
938
939 1
    def unload_napps(self):
        """Unload every NApp currently stored in ``self.napps``."""
        # Iterate over a snapshot of the keys: unloading removes entries
        # from self.napps, and looping over the dictionary itself would fail
        # with 'RuntimeError: dictionary changed size during iteration'.
        loaded_keys = list(self.napps)
        for username, napp_name in loaded_keys:
            self.unload_napp(username, napp_name)
947
948 1
    def reload_napp_module(self, username, napp_name, napp_file):
        """Import and then reload one module of a NApp.

        The module name is ``napps.<username>.<napp_name>.<napp_file>``.

        Raises:
            ModuleNotFoundError: if the module cannot be imported.
            ImportError: if reloading the module fails.

        Both errors are logged before being re-raised.
        """
        mod_name = '.'.join(('napps', username, napp_name, napp_file))

        try:
            module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise

        try:
            reload_module(module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise
962
963 1
    def reload_napp(self, username, napp_name):
        """Unload a NApp, reload its modules and load it again.

        The ``settings`` and ``main`` modules are reloaded, in that order.

        Returns:
            int: 200 when the NApp was reloaded and loaded again, 400 when
                reloading one of its modules raised an import error (the
                NApp is then left unloaded).
        """
        self.unload_napp(username, napp_name)
        try:
            for napp_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, napp_file)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError, so both
            # failure modes of reload_napp_module end up here.
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
975
976 1
    def rest_reload_napp(self, username, napp_name):
        """Reload a NApp and answer with ``('reloaded', <status code>)``.

        The status code is whatever ``reload_napp`` returns.
        """
        return 'reloaded', self.reload_napp(username, napp_name)
980
981 1
    def rest_reload_all_napps(self):
        """Request reload all NApps.

        Every NApp currently in ``self.napps`` is reloaded once. The result
        of each individual reload is not inspected.

        Returns:
            tuple: always ``('reloaded', 200)``.
        """
        # Iterate over a snapshot of the keys: reload_napp() unloads and
        # loads the NApp again, which removes and re-adds entries of
        # self.napps. Looping over the live dictionary could visit the
        # re-added NApps again or fail with
        # 'RuntimeError: dictionary changed size during iteration'.
        for username, napp_name in list(self.napps):
            self.reload_napp(username, napp_name)
        return 'reloaded', 200
986