Passed
Pull Request — master (#258)
by Vinicius
04:43
created

kytos.core.controller   F

Complexity

Total Complexity 115

Size/Duplication

Total Lines 957
Duplicated Lines 0 %

Test Coverage

Coverage 87.47%

Importance

Changes 0
Metric Value
eloc 477
dl 0
loc 957
ccs 384
cts 439
cp 0.8747
rs 2
c 0
b 0
f 0
wmc 115

42 Methods

Rating   Name   Duplication   Size   Complexity  
A Controller.publish_connection_error() 0 11 1
A Controller.start() 0 10 4
B Controller.create_pidfile() 0 47 5
A Controller.toggle_debug() 0 36 5
A Controller.enable_logs() 0 5 1
A Controller.loggers() 0 9 1
A Controller.restart() 0 11 2
A Controller.init_apm_or_core_shutdown() 0 9 3
A Controller.configuration_endpoint() 0 8 1
B Controller.start_controller() 0 66 1
A Controller.uptime() 0 12 2
A Controller.register_rest_endpoint() 0 3 1
A Controller.__init__() 0 76 2
A Controller.metadata_endpoint() 0 12 1
A Controller.db_conn_or_core_shutdown() 0 6 2
B Controller.stop_controller() 0 62 6
A Controller.stop() 0 10 2
A Controller.status() 0 13 2
A Controller._register_endpoints() 0 20 1
A Controller.rest_reload_napp() 0 4 1
B Controller.notify_listeners() 0 28 7
A Controller.create_or_update_connection() 0 8 1
A Controller.get_switch_by_dpid() 0 11 1
A Controller.add_new_switch() 0 7 1
A Controller.load_napps() 0 9 3
A Controller.event_handler() 0 10 3
A Controller._import_napp() 0 16 1
A Controller.get_connection_by_id() 0 12 1
A Controller.unload_napps() 0 8 2
B Controller.get_switch_or_create() 0 40 6
A Controller.get_interface_by_id() 0 22 3
A Controller.rest_reload_all_napps() 0 5 2
A Controller.new_connection() 0 25 2
B Controller.set_switch_options() 0 30 8
A Controller.remove_switch() 0 12 2
B Controller.msg_out_event_handler() 0 36 6
A Controller.reload_napp_module() 0 14 3
A Controller.remove_connection() 0 16 3
A Controller.pre_install_napps() 0 13 2
A Controller.reload_napp() 0 12 2
B Controller.load_napp() 0 39 6
A Controller.unload_napp() 0 30 5

1 Function

Rating   Name   Duplication   Size   Complexity  
A exc_handler() 0 12 1

How to fix   Complexity   

Complexity

Complex classes like kytos.core.controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29 1
from socket import error as SocketError
30
31 1
from kytos.core.api_server import APIServer
32 1
from kytos.core.apm import init_apm
33
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
34 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
35 1
from kytos.core.auth import Auth
36 1
from kytos.core.buffers import KytosBuffers
37 1
from kytos.core.config import KytosConfig
38 1
from kytos.core.connection import ConnectionState
39 1
from kytos.core.db import db_conn_wait
40 1
from kytos.core.dead_letter import DeadLetter
41 1
from kytos.core.events import KytosEvent
42 1
from kytos.core.exceptions import KytosAPMInitException, KytosDBInitException
43 1
from kytos.core.helpers import executors, now
44 1
from kytos.core.interface import Interface
45 1
from kytos.core.logs import LogManager
46 1
from kytos.core.napps.base import NApp
47 1
from kytos.core.napps.manager import NAppsManager
48 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
49 1
from kytos.core.switch import Switch
50
51 1
__all__ = ('Controller',)
52
53
54 1
def exc_handler(_, exc, __):
55
    """Log uncaught exceptions.
56
57
    Args:
58
        exc_type (ignored): exception type
59
        exc: exception instance
60
        tb (ignored): traceback
61
    """
62
    logging.basicConfig(filename='errlog.log',
63
                        format='%(asctime)s:%(pathname)'
64
                        's:%(levelname)s:%(message)s')
65
    logging.exception('Uncaught Exception: %s', exc)
66
67
68 1
class Controller:
69
    """Main class of Kytos.
70
71
    The main responsibilities of this class are:
72
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
73
        - manage KytosNApps (install, load and unload);
74
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
75
        - manage which event should be sent to NApps methods;
76
        - manage the buffers handlers, considering one thread per handler.
77
    """
78
79
    # Created issue #568 for the disabled checks.
80
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
81 1
    def __init__(self, options=None, loop=None):
82
        """Init method of Controller class takes the parameters below.
83
84
        Args:
85
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
86
                instance of :class:`~kytos.core.config.KytosConfig` class.
87
        """
88 1
        if options is None:
89
            options = KytosConfig().options['daemon']
90
91 1
        self._loop = loop or asyncio.get_event_loop()
92 1
        self._pool = ThreadPoolExecutor(max_workers=1)
93
94
        # asyncio tasks
95 1
        self._tasks = []
96
97
        #: dict: keep the main threads of the controller (buffers and handler)
98 1
        self._threads = {}
99
        #: KytosBuffers: KytosBuffer object with Controller buffers
100 1
        self.buffers = KytosBuffers(loop=self._loop)
101
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
102
        #:
103
        #: This dict stores all connections between the controller and the
104
        #: switches. The key for this dict is a tuple (ip, port). The content
105
        #: is a Connection
106 1
        self.connections = {}
107
        #: dict: mapping of events and event listeners.
108
        #:
109
        #: The key of the dict is a KytosEvent (or a string that represent a
110
        #: regex to match against KytosEvents) and the value is a list of
111
        #: methods that will receive the referenced event
112 1
        self.events_listeners = {'kytos/core.connection.new':
113
                                 [self.new_connection]}
114
115
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
116
        #:
117
        #: The key is the napp name (string), while the value is the napp
118
        #: instance itself.
119 1
        self.napps = {}
120
        #: Object generated by ParseArgs on config.py file
121 1
        self.options = options
122
        #: KytosServer: Instance of KytosServer that will be listening to TCP
123
        #: connections.
124 1
        self.server = None
125
        #: dict: Current existing switches.
126
        #:
127
        #: The key is the switch dpid, while the value is a Switch object.
128 1
        self.switches = {}  # dpid: Switch()
129 1
        self._switches_lock = threading.Lock()
130
131
        #: datetime.datetime: Time when the controller finished starting.
132 1
        self.started_at = None
133
134
        #: logging.Logger: Logger instance used by Kytos.
135 1
        self.log = None
136
137
        #: Observer that handle NApps when they are enabled or disabled.
138 1
        self.napp_dir_listener = NAppDirListener(self)
139
140 1
        self.napps_manager = NAppsManager(self)
141
142
        #: API Server used to expose rest endpoints.
143 1
        self.api_server = APIServer(__name__, self.options.listen,
144
                                    self.options.api_port,
145
                                    self.napps_manager, self.options.napps)
146
147 1
        self.auth = Auth(self)
148 1
        self.dead_letter = DeadLetter(self)
149 1
        self._alisten_tasks = set()
150
151 1
        self._register_endpoints()
152
        #: Adding the napps 'enabled' directory into the PATH
153
        #: Now you can access the enabled napps with:
154
        #: from napps.<username>.<napp_name> import ?....
155 1
        sys.path.append(os.path.join(self.options.napps, os.pardir))
156 1
        sys.excepthook = exc_handler
157
158 1
    def enable_logs(self):
159
        """Register kytos log and enable the logs."""
160 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
161 1
        LogManager.enable_websocket(self.api_server.server)
162 1
        self.log = logging.getLogger(__name__)
163
164 1
    @staticmethod
165 1
    def loggers():
166
        """List all logging Loggers.
167
168
        Return a list of Logger objects, with name and logging level.
169
        """
170 1
        return [logging.getLogger(name)
171
                for name in logging.root.manager.loggerDict
172
                if "kytos" in name]
173
174 1
    def toggle_debug(self, name=None):
175
        """Enable/disable logging debug messages to a given logger name.
176
177
        If the name parameter is not specified the debug will be
178
        enabled/disabled following the initial config file. It will decide
179
        to enable/disable using the 'kytos' name to find the current
180
        logger level.
181
        Obs: To disable the debug the logging will be set to NOTSET
182
183
        Args:
184
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
185
        """
186 1
        if name and name not in logging.root.manager.loggerDict:
187
            # A Logger name that is not declared in logging will raise an error
188
            # otherwise logging would create a new Logger.
189 1
            raise ValueError(f"Invalid logger name: {name}")
190
191 1
        if not name:
192
            # Logger name not specified.
193 1
            level = logging.getLogger('kytos').getEffectiveLevel()
194 1
            enable_debug = level != logging.DEBUG
195
196
            # Enable/disable default Loggers
197 1
            LogManager.load_config_file(self.options.logging, enable_debug)
198 1
            return
199
200
        # Get effective logger level for the name
201 1
        level = logging.getLogger(name).getEffectiveLevel()
202 1
        logger = logging.getLogger(name)
203
204 1
        if level == logging.DEBUG:
205
            # disable debug
206 1
            logger.setLevel(logging.NOTSET)
207
        else:
208
            # enable debug
209 1
            logger.setLevel(logging.DEBUG)
210
211 1
    def start(self, restart=False):
212
        """Create pidfile and call start_controller method."""
213 1
        self.enable_logs()
214 1
        if self.options.database:
215 1
            self.db_conn_or_core_shutdown()
216 1
        if self.options.apm:
217 1
            self.init_apm_or_core_shutdown()
218 1
        if not restart:
219 1
            self.create_pidfile()
220 1
        self.start_controller()
221
222 1
    def create_pidfile(self):
223
        """Create a pidfile."""
224 1
        pid = os.getpid()
225
226
        # Creates directory if it doesn't exist
227
        # System can erase /var/run's content
228 1
        pid_folder = Path(self.options.pidfile).parent
229 1
        self.log.info(pid_folder)
230
231
        # Pylint incorrectly infers Path objects
232
        # https://github.com/PyCQA/pylint/issues/224
233
        # pylint: disable=no-member
234 1
        if not pid_folder.exists():
235
            pid_folder.mkdir()
236
            pid_folder.chmod(0o1777)
237
        # pylint: enable=no-member
238
239
        # Make sure the file is deleted when controller stops
240 1
        atexit.register(Path(self.options.pidfile).unlink)
241
242
        # Checks if a pidfile exists. Creates a new file.
243 1
        try:
244 1
            pidfile = open(self.options.pidfile, mode='x')
245 1
        except OSError:
246
            # This happens if there is a pidfile already.
247
            # We shall check if the process that created the pidfile is still
248
            # running.
249 1
            try:
250 1
                existing_file = open(self.options.pidfile, mode='r')
251 1
                old_pid = int(existing_file.read())
252 1
                os.kill(old_pid, 0)
253
                # If kill() doesn't return an error, there's a process running
254
                # with the same PID. We assume it is Kytos and quit.
255
                # Otherwise, overwrite the file and proceed.
256
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
257
                             "running. Aborting.")
258
                sys.exit(error_msg.format(self.options.pidfile))
259 1
            except OSError:
260 1
                try:
261 1
                    pidfile = open(self.options.pidfile, mode='w')
262
                except OSError as exception:
263
                    error_msg = "Failed to create pidfile {}: {}."
264
                    sys.exit(error_msg.format(self.options.pidfile, exception))
265
266
        # Identifies the process that created the pidfile.
267 1
        pidfile.write(str(pid))
268 1
        pidfile.close()
269
270 1
    def start_controller(self):
271
        """Start the controller.
272
273
        Starts the KytosServer (TCP Server) coroutine.
274
        Starts a thread for each buffer handler.
275
        Load the installed apps.
276
        """
277 1
        self.log.info("Starting Kytos - Kytos Controller")
278 1
        self.server = KytosServer((self.options.listen,
279
                                   int(self.options.port)),
280
                                  KytosServerProtocol,
281
                                  self,
282
                                  self.options.protocol_name)
283
284 1
        self.log.info("Starting TCP server: %s", self.server)
285 1
        self.server.serve_forever()
286
287 1
        def _stop_loop(_):
288
            loop = asyncio.get_event_loop()
289
            # print(_.result())
290
            threads = threading.enumerate()
291
            self.log.debug("%s threads before loop.stop: %s",
292
                           len(threads), threads)
293
            loop.stop()
294
295 1
        async def _run_api_server_thread(executor):
296
            log = logging.getLogger('kytos.core.controller.api_server_thread')
297
            log.debug('starting')
298
            # log.debug('creating tasks')
299
            loop = asyncio.get_event_loop()
300
            blocking_tasks = [
301
                loop.run_in_executor(executor, self.api_server.run)
302
            ]
303
            # log.debug('waiting for tasks')
304
            completed, pending = await asyncio.wait(blocking_tasks)
305
            # results = [t.result() for t in completed]
306
            # log.debug('results: {!r}'.format(results))
307
            log.debug('completed: %d, pending: %d',
308
                      len(completed), len(pending))
309
310 1
        task = self._loop.create_task(self.event_handler("conn"))
311 1
        self._tasks.append(task)
312 1
        task = self._loop.create_task(self.event_handler("raw"))
313 1
        self._tasks.append(task)
314 1
        task = self._loop.create_task(self.event_handler("msg_in"))
315 1
        self._tasks.append(task)
316 1
        task = self._loop.create_task(self.msg_out_event_handler())
317 1
        self._tasks.append(task)
318 1
        task = self._loop.create_task(self.event_handler("app"))
319 1
        self._tasks.append(task)
320 1
        task = self._loop.create_task(_run_api_server_thread(self._pool))
321 1
        task.add_done_callback(_stop_loop)
322 1
        self._tasks.append(task)
323
324 1
        self.log.info("ThreadPool started: %s", self._pool)
325
326
        # ASYNC TODO: ensure all threads started correctly
327
        # This is critical, if any of them failed starting we should exit.
328
        # sys.exit(error_msg.format(thread, exception))
329
330 1
        self.log.info("Loading Kytos NApps...")
331 1
        self.napp_dir_listener.start()
332 1
        self.pre_install_napps(self.options.napps_pre_installed)
333 1
        self.load_napps()
334
335 1
        self.started_at = now()
336
337 1
    def db_conn_or_core_shutdown(self):
338
        """Ensure db connection or core shutdown."""
339 1
        try:
340 1
            db_conn_wait(db_backend=self.options.database)
341 1
        except KytosDBInitException as exc:
342 1
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
343
344 1
    def init_apm_or_core_shutdown(self, **kwargs):
345
        """Init APM instrumentation or core shutdown."""
346
        if not self.options.apm:
347
            return
348
        try:
349
            init_apm(self.options.apm, app=self.api_server.app,
350
                     **kwargs)
351
        except KytosAPMInitException as exc:
352
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
353
354 1
    def _register_endpoints(self):
355
        """Register all rest endpoint served by kytos.
356
357
        -   Register APIServer endpoints
358
        -   Register WebUI endpoints
359
        -   Register ``/api/kytos/core/config`` endpoint
360
        """
361 1
        self.api_server.start_api()
362
        # Register controller endpoints as /api/kytos/core/...
363 1
        self.api_server.register_core_endpoint('config/',
364
                                               self.configuration_endpoint)
365 1
        self.api_server.register_core_endpoint('metadata/',
366
                                               Controller.metadata_endpoint)
367 1
        self.api_server.register_core_endpoint(
368
            'reload/<username>/<napp_name>/',
369
            self.rest_reload_napp)
370 1
        self.api_server.register_core_endpoint('reload/all',
371
                                               self.rest_reload_all_napps)
372 1
        self.auth.register_core_auth_services()
373 1
        self.dead_letter.register_endpoints()
374
375 1
    def register_rest_endpoint(self, url, function, methods):
376
        """Deprecate in favor of @rest decorator."""
377 1
        self.api_server.register_rest_endpoint(url, function, methods)
378
379 1
    @classmethod
380 1
    def metadata_endpoint(cls):
381
        """Return the Kytos metadata.
382
383
        Returns:
384
            string: Json with current kytos metadata.
385
386
        """
387 1
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
388 1
        meta_file = open(meta_path).read()
389 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
390 1
        return json.dumps(metadata)
391
392 1
    def configuration_endpoint(self):
393
        """Return the configuration options used by Kytos.
394
395
        Returns:
396
            string: Json with current configurations used by kytos.
397
398
        """
399 1
        return json.dumps(self.options.__dict__)
400
401 1
    def restart(self, graceful=True):
402
        """Restart Kytos SDN Controller.
403
404
        Args:
405
            graceful(bool): Represents the way that Kytos will restart.
406
        """
407 1
        if self.started_at is not None:
408 1
            self.stop(graceful)
409 1
            self.__init__(self.options)
410
411 1
        self.start(restart=True)
412
413 1
    def stop(self, graceful=True):
414
        """Shutdown all services used by kytos.
415
416
        This method should:
417
            - stop all Websockets
418
            - stop the API Server
419
            - stop the Controller
420
        """
421 1
        if self.started_at:
422 1
            self.stop_controller(graceful)
423
424 1
    def stop_controller(self, graceful=True):
425
        """Stop the controller.
426
427
        This method should:
428
            - announce on the network that the controller will shutdown;
429
            - stop receiving incoming packages;
430
            - call the 'shutdown' method of each KytosNApp that is running;
431
            - finish reading the events on all buffers;
432
            - stop each running handler;
433
            - stop all running threads;
434
            - stop the KytosServer;
435
        """
436 1
        self.log.info("Stopping Kytos")
437
438 1
        self.buffers.send_stop_signal()
439 1
        self.api_server.stop_api_server()
440 1
        self.napp_dir_listener.stop()
441
442 1
        self.log.info("Stopping threadpool: %s", self._pool)
443 1
        for pool_name in executors:
444 1
            self.log.info("Stopping threadpool: %s", pool_name)
445
446 1
        threads = threading.enumerate()
447 1
        self.log.debug("%s threads before threadpool shutdown: %s",
448
                       len(threads), threads)
449
450 1
        try:
451
            # Python >= 3.9
452
            # pylint: disable=unexpected-keyword-arg
453 1
            self._pool.shutdown(wait=graceful, cancel_futures=True)
454 1
            for executor_pool in executors.values():
455 1
                executor_pool.shutdown(wait=graceful, cancel_futures=True)
456
        except TypeError:
457
            self._pool.shutdown(wait=graceful)
458
            for executor_pool in executors.values():
459
                executor_pool.shutdown(wait=graceful)
460
461
        # self.server.socket.shutdown()
462
        # self.server.socket.close()
463
464
        # for thread in self._threads.values():
465
        #     self.log.info("Stopping thread: %s", thread.name)
466
        #     thread.join()
467
468
        # for thread in self._threads.values():
469
        #     while thread.is_alive():
470
        #         self.log.info("Thread is alive: %s", thread.name)
471
        #         pass
472
473 1
        self.started_at = None
474 1
        self.unload_napps()
475 1
        self.buffers = KytosBuffers()
476
477
        # Cancel all async tasks (event handlers and servers)
478 1
        for task in self._tasks:
479
            task.cancel()
480
481
        # ASYNC TODO: close connections
482
        # self.server.server_close()
483
484
        # Shutdown the TCP server and the main asyncio loop
485 1
        self.server.shutdown()
486
487 1
    def status(self):
488
        """Return status of Kytos Server.
489
490
        If the controller kytos is running this method will be returned
491
        "Running since 'Started_At'", otherwise "Stopped".
492
493
        Returns:
494
            string: String with kytos status.
495
496
        """
497 1
        if self.started_at:
498 1
            return "Running since %s" % self.started_at
499 1
        return "Stopped"
500
501 1
    def uptime(self):
502
        """Return the uptime of kytos server.
503
504
        This method should return:
505
            - 0 if Kytos Server is stopped.
506
            - (kytos.start_at - datetime.now) if Kytos Server is running.
507
508
        Returns:
509
           datetime.timedelta: The uptime interval.
510
511
        """
512 1
        return now() - self.started_at if self.started_at else 0
513
514 1
    def notify_listeners(self, event):
515
        """Send the event to the specified listeners.
516
517
        Loops over self.events_listeners matching (by regexp) the attribute
518
        name of the event with the keys of events_listeners. If a match occurs,
519
        then send the event to each registered listener.
520
521
        Args:
522
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
523
        """
524
525 1
        self.log.debug("looking for listeners for %s", event)
526 1
        for event_regex, listeners in dict(self.events_listeners).items():
527
            # self.log.debug("listeners found for %s: %r => %s", event,
528
            #                event_regex, [l.__qualname__ for l in listeners])
529
            # Do not match if the event has more characters
530
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
531 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
532 1
                event_regex += '$'
533 1
            if re.match(event_regex, event.name):
534 1
                self.log.debug('Calling listeners for %s', event)
535 1
                for listener in listeners:
536 1
                    if asyncio.iscoroutinefunction(listener):
537
                        task = asyncio.create_task(listener(event))
538
                        self._alisten_tasks.add(task)
539
                        task.add_done_callback(self._alisten_tasks.discard)
540
                    else:
541 1
                        listener(event)
542
543 1
    async def event_handler(self, buffer_name: str):
544
        """Default event handler that gets from an event buffer."""
545 1
        event_buffer = getattr(self.buffers, buffer_name)
546
        while True:
547 1
            event = await event_buffer.aget()
548 1
            self.notify_listeners(event)
549
550 1
            if event.name == "kytos/core.shutdown":
551 1
                self.log.debug("Raw Event handler stopped")
552 1
                break
553
554 1
    async def publish_connection_error(self, event):
555
        """Publish connection error event.
556
557
        Args:
558
            event (KytosEvent): Event that triggered for error propagation.
559
        """
560
        event.name = \
561
            f"kytos/core.{event.destination.protocol.name}.connection.error"
562
        error_msg = f"Connection state: {event.destination.state}"
563
        event.content["exception"] = error_msg
564
        await self.buffers.app.aput(event)
565
566 1
    async def msg_out_event_handler(self):
567
        """Handle msg_out events.
568
569
        Listen to the msg_out buffer and send all its events to the
570
        corresponding listeners.
571
        """
572 1
        self.log.info("Message Out Event Handler started")
573
        while True:
574 1
            triggered_event = await self.buffers.msg_out.aget()
575
576 1
            if triggered_event.name == "kytos/core.shutdown":
577 1
                self.log.debug("Message Out Event handler stopped")
578 1
                break
579
580 1
            message = triggered_event.content['message']
581 1
            destination = triggered_event.destination
582 1
            try:
583 1
                if (destination and
584
                        not destination.state == ConnectionState.FINISHED):
585 1
                    packet = message.pack()
586 1
                    destination.send(packet)
587 1
                    self.log.debug('Connection %s: OUT OFP, '
588
                                   'version: %s, type: %s, xid: %s - %s',
589
                                   destination.id,
590
                                   message.header.version,
591
                                   message.header.message_type,
592
                                   message.header.xid,
593
                                   packet.hex())
594 1
                    self.notify_listeners(triggered_event)
595 1
                    continue
596
597
            except (OSError, SocketError):
598
                pass
599
600
            await self.publish_connection_error(triggered_event)
601
            self.log.info("connection closed. Cannot send message")
602
603 1
    def get_interface_by_id(self, interface_id):
604
        """Find a Interface  with interface_id.
605
606
        Args:
607
            interface_id(str): Interface Identifier.
608
609
        Returns:
610
            Interface: Instance of Interface with the id given.
611
612
        """
613 1
        if interface_id is None:
614 1
            return None
615
616 1
        switch_id = ":".join(interface_id.split(":")[:-1])
617 1
        interface_number = int(interface_id.split(":")[-1])
618
619 1
        switch = self.switches.get(switch_id)
620
621 1
        if not switch:
622 1
            return None
623
624 1
        return switch.interfaces.get(interface_number, None)
625
626 1
    def get_switch_by_dpid(self, dpid):
627
        """Return a specific switch by dpid.
628
629
        Args:
630
            dpid (|DPID|): dpid object used to identify a switch.
631
632
        Returns:
633
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
634
635
        """
636 1
        return self.switches.get(dpid)
637
638 1
    def get_switch_or_create(self, dpid, connection=None):
639
        """Return switch or create it if necessary.
640
641
        Args:
642
            dpid (|DPID|): dpid object used to identify a switch.
643
            connection (:class:`~kytos.core.connection.Connection`):
644
                connection used by switch. If a switch has a connection that
645
                will be updated.
646
647
        Returns:
648
            :class:`~kytos.core.switch.Switch`: new or existent switch.
649
650
        """
651 1
        with self._switches_lock:
652 1
            if connection:
653 1
                self.create_or_update_connection(connection)
654
655 1
            switch = self.get_switch_by_dpid(dpid)
656 1
            event_name = 'kytos/core.switch.'
657
658 1
            if switch is None:
659 1
                switch = Switch(dpid=dpid)
660 1
                self.add_new_switch(switch)
661 1
                event_name += 'new'
662
            else:
663 1
                event_name += 'reconnected'
664
665 1
            self.set_switch_options(dpid=dpid)
666 1
            event = KytosEvent(name=event_name, content={'switch': switch})
667
668 1
            if connection:
669 1
                old_connection = switch.connection
670 1
                switch.update_connection(connection)
671
672 1
                if old_connection is not connection:
673 1
                    self.remove_connection(old_connection)
674
675 1
            self.buffers.app.put(event)
676
677 1
            return switch
678
679 1
    def set_switch_options(self, dpid):
680
        """Update the switch settings based on kytos.conf options.
681
682
        Args:
683
            dpid (str): dpid used to identify a switch.
684
685
        """
686 1
        switch = self.switches.get(dpid)
687 1
        if not switch:
688
            return
689
690 1
        vlan_pool = {}
691 1
        vlan_pool = self.options.vlan_pool
692 1
        if not vlan_pool:
693 1
            return
694
695 1
        if vlan_pool.get(dpid):
696 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
697 1
            for intf_num, port_list in vlan_pool[dpid].items():
698 1
                if not switch.interfaces.get((intf_num)):
699 1
                    vlan_ids = set()
700 1
                    for vlan_range in port_list:
701 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
702 1
                        for vlan_id in range(vlan_begin, vlan_end):
703 1
                            vlan_ids.add(vlan_id)
704 1
                    intf_num = int(intf_num)
705 1
                    intf = Interface(name=intf_num, port_number=intf_num,
706
                                     switch=switch)
707 1
                    intf.set_available_tags(vlan_ids)
708 1
                    switch.update_interface(intf)
709
710 1
    def create_or_update_connection(self, connection):
711
        """Update a connection.
712
713
        Args:
714
            connection (:class:`~kytos.core.connection.Connection`):
715
                Instance of connection that will be updated.
716
        """
717 1
        self.connections[connection.id] = connection
718
719 1
    def get_connection_by_id(self, conn_id):
720
        """Return a existent connection by id.
721
722
        Args:
723
            id (int): id from a connection.
724
725
        Returns:
726
            :class:`~kytos.core.connection.Connection`: Instance of connection
727
                or None Type.
728
729
        """
730 1
        return self.connections.get(conn_id)
731
732 1
    def remove_connection(self, connection):
733
        """Close a existent connection and remove it.
734
735
        Args:
736
            connection (:class:`~kytos.core.connection.Connection`):
737
                Instance of connection that will be removed.
738
        """
739 1
        if connection is None:
740 1
            return False
741
742 1
        try:
743 1
            connection.close()
744 1
            del self.connections[connection.id]
745 1
        except KeyError:
746 1
            return False
747 1
        return True
748
749 1
    def remove_switch(self, switch):
750
        """Remove an existent switch.
751
752
        Args:
753
            switch (:class:`~kytos.core.switch.Switch`):
754
                Instance of switch that will be removed.
755
        """
756 1
        try:
757 1
            del self.switches[switch.dpid]
758 1
        except KeyError:
759 1
            return False
760 1
        return True
761
762 1
    def new_connection(self, event):
763
        """Handle a kytos/core.connection.new event.
764
765
        This method will read new connection event and store the connection
766
        (socket) into the connections attribute on the controller.
767
768
        It also clear all references to the connection since it is a new
769
        connection on the same ip:port.
770
771
        Args:
772
            event (~kytos.core.KytosEvent):
773
                The received event (``kytos/core.connection.new``) with the
774
                needed info.
775
        """
776 1
        self.log.info("Handling %s...", event)
777
778 1
        connection = event.source
779 1
        self.log.debug("Event source: %s", event.source)
780
781
        # Remove old connection (aka cleanup) if it exists
782 1
        if self.get_connection_by_id(connection.id):
783
            self.remove_connection(connection.id)
784
785
        # Update connections with the new connection
786 1
        self.create_or_update_connection(connection)
787
788 1
    def add_new_switch(self, switch):
789
        """Add a new switch on the controller.
790
791
        Args:
792
            switch (Switch): A Switch object
793
        """
794 1
        self.switches[switch.dpid] = switch
795
796 1
    def _import_napp(self, username, napp_name):
797
        """Import a NApp module.
798
799
        Raises:
800
            FileNotFoundError: if NApp's main.py is not found.
801
            ModuleNotFoundError: if any NApp requirement is not installed.
802
803
        """
804 1
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
805 1
        path = os.path.join(self.options.napps, username, napp_name,
806
                            'main.py')
807 1
        napp_spec = spec_from_file_location(mod_name, path)
808 1
        napp_module = module_from_spec(napp_spec)
809 1
        sys.modules[napp_spec.name] = napp_module
810 1
        napp_spec.loader.exec_module(napp_module)
811 1
        return napp_module
812
813 1
    def load_napp(self, username, napp_name):
814
        """Load a single NApp.
815
816
        Args:
817
            username (str): NApp username (makes up NApp's path).
818
            napp_name (str): Name of the NApp to be loaded.
819
        """
820 1
        if (username, napp_name) in self.napps:
821 1
            message = 'NApp %s/%s was already loaded'
822 1
            self.log.warning(message, username, napp_name)
823 1
            return
824
825 1
        try:
826 1
            napp_module = self._import_napp(username, napp_name)
827 1
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
828 1
            self.log.error("Error loading NApp '%s/%s': %s",
829
                           username, napp_name, err)
830 1
            return
831 1
        except FileNotFoundError as err:
832 1
            msg = "NApp module not found, assuming it's a meta-napp: %s"
833 1
            self.log.warning(msg, err.filename)
834 1
            return
835
836 1
        try:
837 1
            napp = napp_module.Main(controller=self)
838 1
        except:  # noqa pylint: disable=bare-except
839 1
            self.log.critical("NApp initialization failed: %s/%s",
840
                              username, napp_name, exc_info=True)
841 1
            return
842
843 1
        self.napps[(username, napp_name)] = napp
844
845 1
        napp.start()
846 1
        self.api_server.authenticate_endpoints(napp)
847 1
        self.api_server.register_napp_endpoints(napp)
848
849
        # pylint: disable=protected-access
850 1
        for event, listeners in napp._listeners.items():
851
            self.events_listeners.setdefault(event, []).extend(listeners)
852
        # pylint: enable=protected-access
853
854 1
    def pre_install_napps(self, napps, enable=True):
855
        """Pre install and enable NApps.
856
857
        Before installing, it'll check if it's installed yet.
858
859
        Args:
860
            napps ([str]): List of NApps to be pre-installed and enabled.
861
        """
862 1
        all_napps = self.napps_manager.get_installed_napps()
863 1
        installed = [str(napp) for napp in all_napps]
864 1
        napps_diff = [napp for napp in napps if napp not in installed]
865 1
        for napp in napps_diff:
866 1
            self.napps_manager.install(napp, enable=enable)
867
868 1
    def load_napps(self):
869
        """Load all NApps enabled on the NApps dir."""
870 1
        for napp in self.napps_manager.get_enabled_napps():
871 1
            try:
872 1
                self.log.info("Loading NApp %s", napp.id)
873 1
                self.load_napp(napp.username, napp.name)
874
            except FileNotFoundError as exception:
875
                self.log.error("Could not load NApp %s: %s",
876
                               napp.id, exception)
877
878 1
    def unload_napp(self, username, napp_name):
879
        """Unload a specific NApp.
880
881
        Args:
882
            username (str): NApp username.
883
            napp_name (str): Name of the NApp to be unloaded.
884
        """
885 1
        napp = self.napps.pop((username, napp_name), None)
886
887 1
        if napp is None:
888
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
889
        else:
890 1
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
891 1
            napp_id = NApp(username, napp_name).id
892 1
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
893 1
            napp_shutdown_fn = self.events_listeners[event.name][0]
894
            # Call listener before removing it from events_listeners
895 1
            napp_shutdown_fn(event)
896
897
            # Remove rest endpoints from that napp
898 1
            self.api_server.remove_napp_endpoints(napp)
899
900
            # Removing listeners from that napp
901
            # pylint: disable=protected-access
902 1
            for event_type, napp_listeners in napp._listeners.items():
903
                event_listeners = self.events_listeners[event_type]
904
                for listener in napp_listeners:
905
                    event_listeners.remove(listener)
906
                if not event_listeners:
907
                    del self.events_listeners[event_type]
908
            # pylint: enable=protected-access
909
910 1
    def unload_napps(self):
911
        """Unload all loaded NApps that are not core NApps."""
912
        # list() is used here to avoid the error:
913
        # 'RuntimeError: dictionary changed size during iteration'
914
        # This is caused by looping over an dictionary while removing
915
        # items from it.
916
        for (username, napp_name) in list(self.napps.keys()):  # noqa
917
            self.unload_napp(username, napp_name)
918
919 1
    def reload_napp_module(self, username, napp_name, napp_file):
920
        """Reload a NApp Module."""
921 1
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
922 1
        try:
923 1
            napp_module = import_module(mod_name)
924 1
        except ModuleNotFoundError:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
925 1
            self.log.error("Module '%s' not found", mod_name)
926 1
            raise
927 1
        try:
928 1
            napp_module = reload_module(napp_module)
929 1
        except ImportError as err:
930 1
            self.log.error("Error reloading NApp '%s/%s': %s",
931
                           username, napp_name, err)
932 1
            raise
933
934 1
    def reload_napp(self, username, napp_name):
935
        """Reload a NApp."""
936 1
        self.unload_napp(username, napp_name)
937 1
        try:
938 1
            self.reload_napp_module(username, napp_name, 'settings')
939 1
            self.reload_napp_module(username, napp_name, 'main')
940 1
        except (ModuleNotFoundError, ImportError):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
941 1
            return 400
942 1
        self.log.info("NApp '%s/%s' successfully reloaded",
943
                      username, napp_name)
944 1
        self.load_napp(username, napp_name)
945 1
        return 200
946
947 1
    def rest_reload_napp(self, username, napp_name):
948
        """Request reload a NApp."""
949 1
        res = self.reload_napp(username, napp_name)
950 1
        return 'reloaded', res
951
952 1
    def rest_reload_all_napps(self):
953
        """Request reload all NApps."""
954 1
        for napp in self.napps:
955 1
            self.reload_napp(*napp)
956
        return 'reloaded', 200
957