kytos.core.controller   F
last analyzed

Complexity

Total Complexity 105

Size/Duplication

Total Lines 929
Duplicated Lines 0 %

Test Coverage

Coverage 90.93%

Importance

Changes 0
Metric Value
eloc 444
dl 0
loc 929
ccs 371
cts 408
cp 0.9093
rs 2
c 0
b 0
f 0
wmc 105

41 Methods

Rating   Name   Duplication   Size   Complexity  
A Controller.configuration_endpoint() 0 8 1
A Controller.register_rest_endpoint() 0 3 1
A Controller._register_endpoints() 0 19 1
A Controller.restart() 0 11 2
A Controller.metadata_endpoint() 0 12 1
A Controller.stop() 0 10 2
A Controller.start() 0 6 2
B Controller.create_pidfile() 0 47 5
A Controller.loggers() 0 9 1
B Controller.start_controller() 0 64 1
A Controller.__init__() 0 72 2
A Controller.toggle_debug() 0 36 5
A Controller.enable_logs() 0 5 1
A Controller.rest_reload_napp() 0 4 1
B Controller.notify_listeners() 0 22 6
A Controller.create_or_update_connection() 0 8 1
A Controller.get_switch_by_dpid() 0 11 1
A Controller.add_new_switch() 0 7 1
A Controller.load_napps() 0 9 3
A Controller._import_napp() 0 16 1
A Controller.get_connection_by_id() 0 12 1
A Controller.unload_napps() 0 8 2
A Controller.get_switch_or_create() 0 36 3
A Controller.raw_event_handler() 0 15 3
A Controller.msg_in_event_handler() 0 15 3
A Controller.get_interface_by_id() 0 22 3
A Controller.uptime() 0 12 2
A Controller.rest_reload_all_napps() 0 5 2
A Controller.new_connection() 0 25 2
B Controller.set_switch_options() 0 30 8
A Controller.remove_switch() 0 12 2
B Controller.msg_out_event_handler() 0 31 5
A Controller.stop_controller() 0 57 3
A Controller.reload_napp_module() 0 14 3
A Controller.remove_connection() 0 16 3
A Controller.pre_install_napps() 0 13 2
A Controller.status() 0 13 2
A Controller.app_event_handler() 0 15 3
A Controller.reload_napp() 0 12 2
B Controller.load_napp() 0 39 6
A Controller.unload_napp() 0 30 5

1 Function

Rating   Name   Duplication   Size   Complexity  
A exc_handler() 0 12 1

How to fix   Complexity   

Complexity

Complex classes like kytos.core.controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29
30 1
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 1
from kytos.core.auth import Auth
34 1
from kytos.core.buffers import KytosBuffers
35 1
from kytos.core.config import KytosConfig
36 1
from kytos.core.connection import ConnectionState
37 1
from kytos.core.events import KytosEvent
38 1
from kytos.core.helpers import now
39 1
from kytos.core.interface import Interface
40 1
from kytos.core.logs import LogManager
41 1
from kytos.core.napps.base import NApp
42 1
from kytos.core.napps.manager import NAppsManager
43 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
44 1
from kytos.core.switch import Switch
45
46 1
__all__ = ('Controller',)
47
48
49 1
def exc_handler(_, exc, __):
50
    """Log uncaught exceptions.
51
52
    Args:
53
        exc_type (ignored): exception type
54
        exc: exception instance
55
        tb (ignored): traceback
56
    """
57
    logging.basicConfig(filename='errlog.log',
58
                        format='%(asctime)s:%(pathname)'
59
                        's:%(levelname)s:%(message)s')
60
    logging.exception('Uncaught Exception: %s', exc)
61
62
63 1
class Controller:
64
    """Main class of Kytos.
65
66
    The main responsibilities of this class are:
67
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
68
        - manage KytosNApps (install, load and unload);
69
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
70
        - manage which event should be sent to NApps methods;
71
        - manage the buffers handlers, considering one thread per handler.
72
    """
73
74
    # Created issue #568 for the disabled checks.
75
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
76 1
    def __init__(self, options=None, loop=None):
77
        """Init method of Controller class takes the parameters below.
78
79
        Args:
80
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
81
                instance of :class:`~kytos.core.config.KytosConfig` class.
82
        """
83 1
        if options is None:
84
            options = KytosConfig().options['daemon']
85
86 1
        self._loop = loop or asyncio.get_event_loop()
87 1
        self._pool = ThreadPoolExecutor(max_workers=1)
88
        # asyncio tasks
89 1
        self._tasks = []
90
91
        #: dict: keep the main threads of the controller (buffers and handler)
92 1
        self._threads = {}
93
        #: KytosBuffers: KytosBuffer object with Controller buffers
94 1
        self.buffers = KytosBuffers(loop=self._loop)
95
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
96
        #:
97
        #: This dict stores all connections between the controller and the
98
        #: switches. The key for this dict is a tuple (ip, port). The content
99
        #: is another dict with the connection information.
100 1
        self.connections = {}
101
        #: dict: mapping of events and event listeners.
102
        #:
103
        #: The key of the dict is a KytosEvent (or a string that represent a
104
        #: regex to match against KytosEvents) and the value is a list of
105
        #: methods that will receive the referenced event
106 1
        self.events_listeners = {'kytos/core.connection.new':
107
                                 [self.new_connection]}
108
109
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
110
        #:
111
        #: The key is the napp name (string), while the value is the napp
112
        #: instance itself.
113 1
        self.napps = {}
114
        #: Object generated by ParseArgs on config.py file
115 1
        self.options = options
116
        #: KytosServer: Instance of KytosServer that will be listening to TCP
117
        #: connections.
118 1
        self.server = None
119
        #: dict: Current existing switches.
120
        #:
121
        #: The key is the switch dpid, while the value is a Switch object.
122 1
        self.switches = {}  # dpid: Switch()
123
124
        #: datetime.datetime: Time when the controller finished starting.
125 1
        self.started_at = None
126
127
        #: logging.Logger: Logger instance used by Kytos.
128 1
        self.log = None
129
130
        #: Observer that handle NApps when they are enabled or disabled.
131 1
        self.napp_dir_listener = NAppDirListener(self)
132
133 1
        self.napps_manager = NAppsManager(self)
134
135
        #: API Server used to expose rest endpoints.
136 1
        self.api_server = APIServer(__name__, self.options.listen,
137
                                    self.options.api_port,
138
                                    self.napps_manager, self.options.napps)
139
140 1
        self.auth = Auth(self)
141
142 1
        self._register_endpoints()
143
        #: Adding the napps 'enabled' directory into the PATH
144
        #: Now you can access the enabled napps with:
145
        #: from napps.<username>.<napp_name> import ?....
146 1
        sys.path.append(os.path.join(self.options.napps, os.pardir))
147 1
        sys.excepthook = exc_handler
148
149 1
    def enable_logs(self):
150
        """Register kytos log and enable the logs."""
151 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
152 1
        LogManager.enable_websocket(self.api_server.server)
153 1
        self.log = logging.getLogger(__name__)
154
155 1
    @staticmethod
156
    def loggers():
157
        """List all logging Loggers.
158
159
        Return a list of Logger objects, with name and logging level.
160
        """
161 1
        return [logging.getLogger(name)
162
                for name in logging.root.manager.loggerDict
163
                if "kytos" in name]
164
165 1
    def toggle_debug(self, name=None):
166
        """Enable/disable logging debug messages to a given logger name.
167
168
        If the name parameter is not specified the debug will be
169
        enabled/disabled following the initial config file. It will decide
170
        to enable/disable using the 'kytos' name to find the current
171
        logger level.
172
        Obs: To disable the debug the logging will be set to NOTSET
173
174
        Args:
175
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
176
        """
177 1
        if name and name not in logging.root.manager.loggerDict:
178
            # A Logger name that is not declared in logging will raise an error
179
            # otherwise logging would create a new Logger.
180 1
            raise ValueError(f"Invalid logger name: {name}")
181
182 1
        if not name:
183
            # Logger name not specified.
184 1
            level = logging.getLogger('kytos').getEffectiveLevel()
185 1
            enable_debug = level != logging.DEBUG
186
187
            # Enable/disable default Loggers
188 1
            LogManager.load_config_file(self.options.logging, enable_debug)
189 1
            return
190
191
        # Get effective logger level for the name
192 1
        level = logging.getLogger(name).getEffectiveLevel()
193 1
        logger = logging.getLogger(name)
194
195 1
        if level == logging.DEBUG:
196
            # disable debug
197 1
            logger.setLevel(logging.NOTSET)
198
        else:
199
            # enable debug
200 1
            logger.setLevel(logging.DEBUG)
201
202 1
    def start(self, restart=False):
203
        """Create pidfile and call start_controller method."""
204 1
        self.enable_logs()
205 1
        if not restart:
206 1
            self.create_pidfile()
207 1
        self.start_controller()
208
209 1
    def create_pidfile(self):
210
        """Create a pidfile."""
211 1
        pid = os.getpid()
212
213
        # Creates directory if it doesn't exist
214
        # System can erase /var/run's content
215 1
        pid_folder = Path(self.options.pidfile).parent
216 1
        self.log.info(pid_folder)
217
218
        # Pylint incorrectly infers Path objects
219
        # https://github.com/PyCQA/pylint/issues/224
220
        # pylint: disable=no-member
221 1
        if not pid_folder.exists():
222
            pid_folder.mkdir()
223
            pid_folder.chmod(0o1777)
224
        # pylint: enable=no-member
225
226
        # Make sure the file is deleted when controller stops
227 1
        atexit.register(Path(self.options.pidfile).unlink)
228
229
        # Checks if a pidfile exists. Creates a new file.
230 1
        try:
231 1
            pidfile = open(self.options.pidfile, mode='x')
232 1
        except OSError:
233
            # This happens if there is a pidfile already.
234
            # We shall check if the process that created the pidfile is still
235
            # running.
236 1
            try:
237 1
                existing_file = open(self.options.pidfile, mode='r')
238 1
                old_pid = int(existing_file.read())
239 1
                os.kill(old_pid, 0)
240
                # If kill() doesn't return an error, there's a process running
241
                # with the same PID. We assume it is Kytos and quit.
242
                # Otherwise, overwrite the file and proceed.
243
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
244
                             "running. Aborting.")
245
                sys.exit(error_msg.format(self.options.pidfile))
246 1
            except OSError:
247 1
                try:
248 1
                    pidfile = open(self.options.pidfile, mode='w')
249
                except OSError as exception:
250
                    error_msg = "Failed to create pidfile {}: {}."
251
                    sys.exit(error_msg.format(self.options.pidfile, exception))
252
253
        # Identifies the process that created the pidfile.
254 1
        pidfile.write(str(pid))
255 1
        pidfile.close()
256
257 1
    def start_controller(self):
258
        """Start the controller.
259
260
        Starts the KytosServer (TCP Server) coroutine.
261
        Starts a thread for each buffer handler.
262
        Load the installed apps.
263
        """
264 1
        self.log.info("Starting Kytos - Kytos Controller")
265 1
        self.server = KytosServer((self.options.listen,
266
                                   int(self.options.port)),
267
                                  KytosServerProtocol,
268
                                  self,
269
                                  self.options.protocol_name)
270
271 1
        self.log.info("Starting TCP server: %s", self.server)
272 1
        self.server.serve_forever()
273
274 1
        def _stop_loop(_):
275
            loop = asyncio.get_event_loop()
276
            # print(_.result())
277
            threads = threading.enumerate()
278
            self.log.debug("%s threads before loop.stop: %s",
279
                           len(threads), threads)
280
            loop.stop()
281
282 1
        async def _run_api_server_thread(executor):
283
            log = logging.getLogger('kytos.core.controller.api_server_thread')
284
            log.debug('starting')
285
            # log.debug('creating tasks')
286
            loop = asyncio.get_event_loop()
287
            blocking_tasks = [
288
                loop.run_in_executor(executor, self.api_server.run)
289
            ]
290
            # log.debug('waiting for tasks')
291
            completed, pending = await asyncio.wait(blocking_tasks)
292
            # results = [t.result() for t in completed]
293
            # log.debug('results: {!r}'.format(results))
294
            log.debug('completed: %d, pending: %d',
295
                      len(completed), len(pending))
296
297 1
        task = self._loop.create_task(self.raw_event_handler())
298 1
        self._tasks.append(task)
299 1
        task = self._loop.create_task(self.msg_in_event_handler())
300 1
        self._tasks.append(task)
301 1
        task = self._loop.create_task(self.msg_out_event_handler())
302 1
        self._tasks.append(task)
303 1
        task = self._loop.create_task(self.app_event_handler())
304 1
        self._tasks.append(task)
305 1
        task = self._loop.create_task(_run_api_server_thread(self._pool))
306 1
        task.add_done_callback(_stop_loop)
307 1
        self._tasks.append(task)
308
309 1
        self.log.info("ThreadPool started: %s", self._pool)
310
311
        # ASYNC TODO: ensure all threads started correctly
312
        # This is critical, if any of them failed starting we should exit.
313
        # sys.exit(error_msg.format(thread, exception))
314
315 1
        self.log.info("Loading Kytos NApps...")
316 1
        self.napp_dir_listener.start()
317 1
        self.pre_install_napps(self.options.napps_pre_installed)
318 1
        self.load_napps()
319
320 1
        self.started_at = now()
321
322 1
    def _register_endpoints(self):
323
        """Register all rest endpoint served by kytos.
324
325
        -   Register APIServer endpoints
326
        -   Register WebUI endpoints
327
        -   Register ``/api/kytos/core/config`` endpoint
328
        """
329 1
        self.api_server.start_api()
330
        # Register controller endpoints as /api/kytos/core/...
331 1
        self.api_server.register_core_endpoint('config/',
332
                                               self.configuration_endpoint)
333 1
        self.api_server.register_core_endpoint('metadata/',
334
                                               Controller.metadata_endpoint)
335 1
        self.api_server.register_core_endpoint(
336
            'reload/<username>/<napp_name>/',
337
            self.rest_reload_napp)
338 1
        self.api_server.register_core_endpoint('reload/all',
339
                                               self.rest_reload_all_napps)
340 1
        self.auth.register_core_auth_services()
341
342 1
    def register_rest_endpoint(self, url, function, methods):
343
        """Deprecate in favor of @rest decorator."""
344 1
        self.api_server.register_rest_endpoint(url, function, methods)
345
346 1
    @classmethod
347
    def metadata_endpoint(cls):
348
        """Return the Kytos metadata.
349
350
        Returns:
351
            string: Json with current kytos metadata.
352
353
        """
354 1
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
355 1
        meta_file = open(meta_path).read()
356 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
357 1
        return json.dumps(metadata)
358
359 1
    def configuration_endpoint(self):
360
        """Return the configuration options used by Kytos.
361
362
        Returns:
363
            string: Json with current configurations used by kytos.
364
365
        """
366 1
        return json.dumps(self.options.__dict__)
367
368 1
    def restart(self, graceful=True):
369
        """Restart Kytos SDN Controller.
370
371
        Args:
372
            graceful(bool): Represents the way that Kytos will restart.
373
        """
374 1
        if self.started_at is not None:
375 1
            self.stop(graceful)
376 1
            self.__init__(self.options)
377
378 1
        self.start(restart=True)
379
380 1
    def stop(self, graceful=True):
381
        """Shutdown all services used by kytos.
382
383
        This method should:
384
            - stop all Websockets
385
            - stop the API Server
386
            - stop the Controller
387
        """
388 1
        if self.started_at:
389 1
            self.stop_controller(graceful)
390
391 1
    def stop_controller(self, graceful=True):
392
        """Stop the controller.
393
394
        This method should:
395
            - announce on the network that the controller will shutdown;
396
            - stop receiving incoming packages;
397
            - call the 'shutdown' method of each KytosNApp that is running;
398
            - finish reading the events on all buffers;
399
            - stop each running handler;
400
            - stop all running threads;
401
            - stop the KytosServer;
402
        """
403 1
        self.log.info("Stopping Kytos")
404
405 1
        self.buffers.send_stop_signal()
406 1
        self.api_server.stop_api_server()
407 1
        self.napp_dir_listener.stop()
408
409 1
        self.log.info("Stopping threadpool: %s", self._pool)
410
411 1
        threads = threading.enumerate()
412 1
        self.log.debug("%s threads before threadpool shutdown: %s",
413
                       len(threads), threads)
414
415 1
        try:
416
            # Python >= 3.9
417
            # pylint: disable=unexpected-keyword-arg
418 1
            self._pool.shutdown(wait=graceful, cancel_futures=True)
419
            # pylint: enable=unexpected-keyword-arg
420
        except TypeError:
421
            self._pool.shutdown(wait=graceful)
422
423
        # self.server.socket.shutdown()
424
        # self.server.socket.close()
425
426
        # for thread in self._threads.values():
427
        #     self.log.info("Stopping thread: %s", thread.name)
428
        #     thread.join()
429
430
        # for thread in self._threads.values():
431
        #     while thread.is_alive():
432
        #         self.log.info("Thread is alive: %s", thread.name)
433
        #         pass
434
435 1
        self.started_at = None
436 1
        self.unload_napps()
437 1
        self.buffers = KytosBuffers()
438
439
        # Cancel all async tasks (event handlers and servers)
440 1
        for task in self._tasks:
441
            task.cancel()
442
443
        # ASYNC TODO: close connections
444
        # self.server.server_close()
445
446
        # Shutdown the TCP server and the main asyncio loop
447 1
        self.server.shutdown()
448
449 1
    def status(self):
450
        """Return status of Kytos Server.
451
452
        If the controller kytos is running this method will be returned
453
        "Running since 'Started_At'", otherwise "Stopped".
454
455
        Returns:
456
            string: String with kytos status.
457
458
        """
459 1
        if self.started_at:
460 1
            return "Running since %s" % self.started_at
461 1
        return "Stopped"
462
463 1
    def uptime(self):
464
        """Return the uptime of kytos server.
465
466
        This method should return:
467
            - 0 if Kytos Server is stopped.
468
            - (kytos.start_at - datetime.now) if Kytos Server is running.
469
470
        Returns:
471
           datetime.timedelta: The uptime interval.
472
473
        """
474 1
        return now() - self.started_at if self.started_at else 0
475
476 1
    def notify_listeners(self, event):
477
        """Send the event to the specified listeners.
478
479
        Loops over self.events_listeners matching (by regexp) the attribute
480
        name of the event with the keys of events_listeners. If a match occurs,
481
        then send the event to each registered listener.
482
483
        Args:
484
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
485
        """
486 1
        self.log.debug("looking for listeners for %s", event)
487 1
        for event_regex, listeners in dict(self.events_listeners).items():
488
            # self.log.debug("listeners found for %s: %r => %s", event,
489
            #                event_regex, [l.__qualname__ for l in listeners])
490
            # Do not match if the event has more characters
491
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
492 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
493 1
                event_regex += '$'
494 1
            if re.match(event_regex, event.name):
495
                # self.log.debug('Calling listeners for %s', event)
496 1
                for listener in listeners:
497 1
                    listener(event)
498
499 1
    async def raw_event_handler(self):
500
        """Handle raw events.
501
502
        Listen to the raw_buffer and send all its events to the
503
        corresponding listeners.
504
        """
505 1
        self.log.info("Raw Event Handler started")
506 1
        while True:
507 1
            event = await self.buffers.raw.aget()
508 1
            self.notify_listeners(event)
509 1
            self.log.debug("Raw Event handler called")
510
511 1
            if event.name == "kytos/core.shutdown":
512 1
                self.log.debug("Raw Event handler stopped")
513 1
                break
514
515 1
    async def msg_in_event_handler(self):
516
        """Handle msg_in events.
517
518
        Listen to the msg_in buffer and send all its events to the
519
        corresponding listeners.
520
        """
521 1
        self.log.info("Message In Event Handler started")
522 1
        while True:
523 1
            event = await self.buffers.msg_in.aget()
524 1
            self.notify_listeners(event)
525 1
            self.log.debug("Message In Event handler called")
526
527 1
            if event.name == "kytos/core.shutdown":
528 1
                self.log.debug("Message In Event handler stopped")
529 1
                break
530
531 1
    async def msg_out_event_handler(self):
532
        """Handle msg_out events.
533
534
        Listen to the msg_out buffer and send all its events to the
535
        corresponding listeners.
536
        """
537 1
        self.log.info("Message Out Event Handler started")
538 1
        while True:
539 1
            triggered_event = await self.buffers.msg_out.aget()
540
541 1
            if triggered_event.name == "kytos/core.shutdown":
542 1
                self.log.debug("Message Out Event handler stopped")
543 1
                break
544
545 1
            message = triggered_event.content['message']
546 1
            destination = triggered_event.destination
547 1
            if (destination and
548
                    not destination.state == ConnectionState.FINISHED):
549 1
                packet = message.pack()
550 1
                destination.send(packet)
551 1
                self.log.debug('Connection %s: OUT OFP, '
552
                               'version: %s, type: %s, xid: %s - %s',
553
                               destination.id,
554
                               message.header.version,
555
                               message.header.message_type,
556
                               message.header.xid,
557
                               packet.hex())
558 1
                self.notify_listeners(triggered_event)
559 1
                self.log.debug("Message Out Event handler called")
560
            else:
561
                self.log.info("connection closed. Cannot send message")
562
563 1
    async def app_event_handler(self):
564
        """Handle app events.
565
566
        Listen to the app buffer and send all its events to the
567
        corresponding listeners.
568
        """
569 1
        self.log.info("App Event Handler started")
570 1
        while True:
571 1
            event = await self.buffers.app.aget()
572 1
            self.notify_listeners(event)
573 1
            self.log.debug("App Event handler called")
574
575 1
            if event.name == "kytos/core.shutdown":
576 1
                self.log.debug("App Event handler stopped")
577 1
                break
578
579 1
    def get_interface_by_id(self, interface_id):
580
        """Find a Interface  with interface_id.
581
582
        Args:
583
            interface_id(str): Interface Identifier.
584
585
        Returns:
586
            Interface: Instance of Interface with the id given.
587
588
        """
589 1
        if interface_id is None:
590 1
            return None
591
592 1
        switch_id = ":".join(interface_id.split(":")[:-1])
593 1
        interface_number = int(interface_id.split(":")[-1])
594
595 1
        switch = self.switches.get(switch_id)
596
597 1
        if not switch:
598 1
            return None
599
600 1
        return switch.interfaces.get(interface_number, None)
601
602 1
    def get_switch_by_dpid(self, dpid):
603
        """Return a specific switch by dpid.
604
605
        Args:
606
            dpid (|DPID|): dpid object used to identify a switch.
607
608
        Returns:
609
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
610
611
        """
612 1
        return self.switches.get(dpid)
613
614 1
    def get_switch_or_create(self, dpid, connection):
615
        """Return switch or create it if necessary.
616
617
        Args:
618
            dpid (|DPID|): dpid object used to identify a switch.
619
            connection (:class:`~kytos.core.connection.Connection`):
620
                connection used by switch. If a switch has a connection that
621
                will be updated.
622
623
        Returns:
624
            :class:`~kytos.core.switch.Switch`: new or existent switch.
625
626
        """
627 1
        self.create_or_update_connection(connection)
628 1
        switch = self.get_switch_by_dpid(dpid)
629 1
        event_name = 'kytos/core.switch.'
630
631 1
        if switch is None:
632 1
            switch = Switch(dpid=dpid)
633 1
            self.add_new_switch(switch)
634 1
            event_name += 'new'
635
        else:
636 1
            event_name += 'reconnected'
637
638 1
        self.set_switch_options(dpid=dpid)
639 1
        event = KytosEvent(name=event_name, content={'switch': switch})
640
641 1
        old_connection = switch.connection
642 1
        switch.update_connection(connection)
643
644 1
        if old_connection is not connection:
645 1
            self.remove_connection(old_connection)
646
647 1
        self.buffers.app.put(event)
648
649 1
        return switch
650
651 1
    def set_switch_options(self, dpid):
652
        """Update the switch settings based on kytos.conf options.
653
654
        Args:
655
            dpid (str): dpid used to identify a switch.
656
657
        """
658 1
        switch = self.switches.get(dpid)
659 1
        if not switch:
660
            return
661
662 1
        vlan_pool = {}
663 1
        vlan_pool = self.options.vlan_pool
664 1
        if not vlan_pool:
665 1
            return
666
667 1
        if vlan_pool.get(dpid):
668 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
669 1
            for intf_num, port_list in vlan_pool[dpid].items():
670 1
                if not switch.interfaces.get((intf_num)):
671 1
                    vlan_ids = set()
672 1
                    for vlan_range in port_list:
673 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
674 1
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by Vinicius Arcanjo
The variable range does not seem to be defined.
Loading history...
675 1
                            vlan_ids.add(vlan_id)
676 1
                    intf_num = int(intf_num)
677 1
                    intf = Interface(name=intf_num, port_number=intf_num,
678
                                     switch=switch)
679 1
                    intf.set_available_tags(vlan_ids)
680 1
                    switch.update_interface(intf)
681
682 1
    def create_or_update_connection(self, connection):
683
        """Update a connection.
684
685
        Args:
686
            connection (:class:`~kytos.core.connection.Connection`):
687
                Instance of connection that will be updated.
688
        """
689 1
        self.connections[connection.id] = connection
690
691 1
    def get_connection_by_id(self, conn_id):
692
        """Return a existent connection by id.
693
694
        Args:
695
            id (int): id from a connection.
696
697
        Returns:
698
            :class:`~kytos.core.connection.Connection`: Instance of connection
699
                or None Type.
700
701
        """
702 1
        return self.connections.get(conn_id)
703
704 1
    def remove_connection(self, connection):
705
        """Close a existent connection and remove it.
706
707
        Args:
708
            connection (:class:`~kytos.core.connection.Connection`):
709
                Instance of connection that will be removed.
710
        """
711 1
        if connection is None:
712 1
            return False
713
714 1
        try:
715 1
            connection.close()
716 1
            del self.connections[connection.id]
717 1
        except KeyError:
718 1
            return False
719 1
        return True
720
721 1
    def remove_switch(self, switch):
722
        """Remove an existent switch.
723
724
        Args:
725
            switch (:class:`~kytos.core.switch.Switch`):
726
                Instance of switch that will be removed.
727
        """
728 1
        try:
729 1
            del self.switches[switch.dpid]
730 1
        except KeyError:
731 1
            return False
732 1
        return True
733
734 1
    def new_connection(self, event):
735
        """Handle a kytos/core.connection.new event.
736
737
        This method will read new connection event and store the connection
738
        (socket) into the connections attribute on the controller.
739
740
        It also clear all references to the connection since it is a new
741
        connection on the same ip:port.
742
743
        Args:
744
            event (~kytos.core.KytosEvent):
745
                The received event (``kytos/core.connection.new``) with the
746
                needed info.
747
        """
748 1
        self.log.info("Handling %s...", event)
749
750 1
        connection = event.source
751 1
        self.log.debug("Event source: %s", event.source)
752
753
        # Remove old connection (aka cleanup) if it exists
754 1
        if self.get_connection_by_id(connection.id):
755
            self.remove_connection(connection.id)
756
757
        # Update connections with the new connection
758 1
        self.create_or_update_connection(connection)
759
760 1
    def add_new_switch(self, switch):
761
        """Add a new switch on the controller.
762
763
        Args:
764
            switch (Switch): A Switch object
765
        """
766 1
        self.switches[switch.dpid] = switch
767
768 1
    def _import_napp(self, username, napp_name):
769
        """Import a NApp module.
770
771
        Raises:
772
            FileNotFoundError: if NApp's main.py is not found.
773
            ModuleNotFoundError: if any NApp requirement is not installed.
774
775
        """
776 1
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
777 1
        path = os.path.join(self.options.napps, username, napp_name,
778
                            'main.py')
779 1
        napp_spec = spec_from_file_location(mod_name, path)
780 1
        napp_module = module_from_spec(napp_spec)
781 1
        sys.modules[napp_spec.name] = napp_module
782 1
        napp_spec.loader.exec_module(napp_module)
783 1
        return napp_module
784
785 1
    def load_napp(self, username, napp_name):
786
        """Load a single NApp.
787
788
        Args:
789
            username (str): NApp username (makes up NApp's path).
790
            napp_name (str): Name of the NApp to be loaded.
791
        """
792 1
        if (username, napp_name) in self.napps:
793 1
            message = 'NApp %s/%s was already loaded'
794 1
            self.log.warning(message, username, napp_name)
795 1
            return
796
797 1
        try:
798 1
            napp_module = self._import_napp(username, napp_name)
799 1
        except ModuleNotFoundError as err:
800 1
            self.log.error("Error loading NApp '%s/%s': %s",
801
                           username, napp_name, err)
802 1
            return
803 1
        except FileNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by Humberto Diógenes
The variable FileNotFoundError does not seem to be defined.
Loading history...
804 1
            msg = "NApp module not found, assuming it's a meta-napp: %s"
805 1
            self.log.warning(msg, err.filename)
806 1
            return
807
808 1
        try:
809 1
            napp = napp_module.Main(controller=self)
810 1
        except:  # noqa pylint: disable=bare-except
811 1
            self.log.critical("NApp initialization failed: %s/%s",
812
                              username, napp_name, exc_info=True)
813 1
            return
814
815 1
        self.napps[(username, napp_name)] = napp
816
817 1
        napp.start()
818 1
        self.api_server.authenticate_endpoints(napp)
819 1
        self.api_server.register_napp_endpoints(napp)
820
821
        # pylint: disable=protected-access
822 1
        for event, listeners in napp._listeners.items():
823
            self.events_listeners.setdefault(event, []).extend(listeners)
824
        # pylint: enable=protected-access
825
826 1
    def pre_install_napps(self, napps, enable=True):
827
        """Pre install and enable NApps.
828
829
        Before installing, it'll check if it's installed yet.
830
831
        Args:
832
            napps ([str]): List of NApps to be pre-installed and enabled.
833
        """
834 1
        all_napps = self.napps_manager.get_installed_napps()
835 1
        installed = [str(napp) for napp in all_napps]
836 1
        napps_diff = [napp for napp in napps if napp not in installed]
837 1
        for napp in napps_diff:
838 1
            self.napps_manager.install(napp, enable=enable)
839
840 1
    def load_napps(self):
841
        """Load all NApps enabled on the NApps dir."""
842 1
        for napp in self.napps_manager.get_enabled_napps():
843 1
            try:
844 1
                self.log.info("Loading NApp %s", napp.id)
845 1
                self.load_napp(napp.username, napp.name)
846
            except FileNotFoundError as exception:
0 ignored issues
show
Comprehensibility Best Practice introduced by Macartur Sousa
The variable FileNotFoundError does not seem to be defined.
Loading history...
847
                self.log.error("Could not load NApp %s: %s",
848
                               napp.id, exception)
849
850 1
    def unload_napp(self, username, napp_name):
851
        """Unload a specific NApp.
852
853
        Args:
854
            username (str): NApp username.
855
            napp_name (str): Name of the NApp to be unloaded.
856
        """
857 1
        napp = self.napps.pop((username, napp_name), None)
858
859 1
        if napp is None:
860
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
861
        else:
862 1
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
863 1
            napp_id = NApp(username, napp_name).id
864 1
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
865 1
            napp_shutdown_fn = self.events_listeners[event.name][0]
866
            # Call listener before removing it from events_listeners
867 1
            napp_shutdown_fn(event)
868
869
            # Remove rest endpoints from that napp
870 1
            self.api_server.remove_napp_endpoints(napp)
871
872
            # Removing listeners from that napp
873
            # pylint: disable=protected-access
874 1
            for event_type, napp_listeners in napp._listeners.items():
875
                event_listeners = self.events_listeners[event_type]
876
                for listener in napp_listeners:
877
                    event_listeners.remove(listener)
878
                if not event_listeners:
879
                    del self.events_listeners[event_type]
880
            # pylint: enable=protected-access
881
882 1
    def unload_napps(self):
883
        """Unload all loaded NApps that are not core NApps."""
884
        # list() is used here to avoid the error:
885
        # 'RuntimeError: dictionary changed size during iteration'
886
        # This is caused by looping over an dictionary while removing
887
        # items from it.
888
        for (username, napp_name) in list(self.napps.keys()):  # noqa
889
            self.unload_napp(username, napp_name)
890
891 1
    def reload_napp_module(self, username, napp_name, napp_file):
892
        """Reload a NApp Module."""
893 1
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
894 1
        try:
895 1
            napp_module = import_module(mod_name)
896 1
        except ModuleNotFoundError as err:
897 1
            self.log.error("Module '%s' not found", mod_name)
898 1
            raise
899 1
        try:
900 1
            napp_module = reload_module(napp_module)
901 1
        except ImportError as err:
902 1
            self.log.error("Error reloading NApp '%s/%s': %s",
903
                           username, napp_name, err)
904 1
            raise
905
906 1
    def reload_napp(self, username, napp_name):
907
        """Reload a NApp."""
908 1
        self.unload_napp(username, napp_name)
909 1
        try:
910 1
            self.reload_napp_module(username, napp_name, 'settings')
911 1
            self.reload_napp_module(username, napp_name, 'main')
912 1
        except (ModuleNotFoundError, ImportError):
913 1
            return 400
914 1
        self.log.info("NApp '%s/%s' successfully reloaded",
915
                      username, napp_name)
916 1
        self.load_napp(username, napp_name)
917 1
        return 200
918
919 1
    def rest_reload_napp(self, username, napp_name):
920
        """Request reload a NApp."""
921 1
        res = self.reload_napp(username, napp_name)
922 1
        return 'reloaded', res
923
924 1
    def rest_reload_all_napps(self):
925
        """Request reload all NApps."""
926 1
        for napp in self.napps:
927 1
            self.reload_napp(*napp)
928
        return 'reloaded', 200
929