kytos.core.controller   F

Complexity

Total Complexity 103

Size/Duplication

Total Lines 899
Duplicated Lines 0 %

Test Coverage

Coverage 91.39%

Importance

Changes 0
Metric Value
eloc 430
dl 0
loc 899
rs 2
c 0
b 0
f 0
ccs 361
cts 395
cp 0.9139
wmc 103

41 Methods

Rating   Name   Duplication   Size   Complexity  
A Controller.unload_napps() 0 8 2
A Controller.unload_napp() 0 30 5
A Controller.__init__() 0 69 2
A Controller.enable_logs() 0 5 1
A Controller.create_or_update_connection() 0 8 1
A Controller.get_switch_by_dpid() 0 11 1
A Controller.configuration_endpoint() 0 8 1
A Controller.loggers() 0 9 1
A Controller.register_rest_endpoint() 0 3 1
A Controller.toggle_debug() 0 36 5
A Controller._register_endpoints() 0 19 1
A Controller.rest_reload_napp() 0 4 1
A Controller.add_new_switch() 0 7 1
A Controller.load_napps() 0 9 3
A Controller.get_connection_by_id() 0 12 1
A Controller.get_switch_or_create() 0 36 3
A Controller.get_interface_by_id() 0 22 3
A Controller.rest_reload_all_napps() 0 5 2
A Controller.new_connection() 0 25 2
C Controller.set_switch_options() 0 33 9
A Controller.remove_connection() 0 16 3
A Controller.pre_install_napps() 0 13 2
A Controller.start() 0 6 2
B Controller.notify_listeners() 0 22 6
A Controller.restart() 0 11 2
B Controller.create_pidfile() 0 47 5
A Controller.start_controller() 0 59 1
A Controller.uptime() 0 12 2
A Controller.metadata_endpoint() 0 12 1
A Controller.stop_controller() 0 47 1
A Controller.stop() 0 10 2
A Controller.status() 0 13 2
A Controller._import_napp() 0 16 1
A Controller.reload_napp_module() 0 14 3
B Controller.load_napp() 0 38 6
A Controller.raw_event_handler() 0 15 3
A Controller.msg_in_event_handler() 0 15 3
A Controller.remove_switch() 0 12 2
B Controller.msg_out_event_handler() 0 31 5
A Controller.app_event_handler() 0 15 3
A Controller.reload_napp() 0 12 2

How to fix: Complexity

Complex classes like the Controller class in kytos.core.controller often do many different things. To break such a class down, first identify a cohesive component within it. A common way to find one is to look for fields and methods that share the same prefix or suffix.
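As a rough illustration of the prefix/suffix heuristic, the sketch below groups a few of the Controller method names from the table above by their first and last underscore-separated word. The helper `group_by_affix` and its `min_group_size` parameter are hypothetical names invented for this example; they are not part of Kytos or of the analysis tool.

```python
from collections import defaultdict


def group_by_affix(method_names, min_group_size=2):
    """Group names by their first and last underscore-separated word.

    Hypothetical helper: a shared first word counts as a prefix and a
    shared last word counts as a suffix.
    """
    groups = defaultdict(set)
    for name in method_names:
        words = name.strip('_').split('_')
        if len(words) < 2:
            continue  # a single word has no distinct prefix and suffix
        groups['prefix:' + words[0]].add(name)
        groups['suffix:' + words[-1]].add(name)
    # Keep only the affixes shared by enough methods to suggest a component.
    return {key: sorted(names) for key, names in groups.items()
            if len(names) >= min_group_size}


# A subset of the Controller method names listed in the table above.
METHODS = [
    'load_napp', 'load_napps', 'unload_napp', 'unload_napps',
    'reload_napp', 'reload_napp_module', 'rest_reload_napp',
    'raw_event_handler', 'msg_in_event_handler',
    'msg_out_event_handler', 'app_event_handler',
    'get_switch_by_dpid', 'remove_switch', 'add_new_switch',
]

for key, names in sorted(group_by_affix(METHODS).items()):
    print(key, names)
```

With this input, the `handler`, `napp` and `switch` suffix groups stand out, which hints at event dispatching, NApp management and switch bookkeeping as candidate components. The grouping is only a hint; whether a group is really cohesive still needs a look at the fields each method touches.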

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
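A minimal sketch of what Extract Class could look like for the switch bookkeeping methods (add_new_switch, get_switch_by_dpid, remove_switch). `SwitchRegistry`, `SlimController` and `FakeSwitch` are hypothetical names used only for illustration. The real Controller has more collaborators than shown here, so treat this as the shape of the refactoring rather than a drop-in change.

```python
from collections import namedtuple

# Hypothetical stand-in for kytos.core.switch.Switch; only ``dpid`` is used.
FakeSwitch = namedtuple('FakeSwitch', ['dpid'])


class SwitchRegistry:
    """Extracted component that owns only the switch bookkeeping."""

    def __init__(self):
        self._switches = {}  # dpid -> switch object

    def add(self, switch):
        self._switches[switch.dpid] = switch

    def get(self, dpid):
        return self._switches.get(dpid)

    def remove(self, switch):
        """Return True if the switch was registered and is now removed."""
        return self._switches.pop(switch.dpid, None) is not None


class SlimController:
    """Stand-in for the large class: it delegates to the registry."""

    def __init__(self):
        self.switch_registry = SwitchRegistry()

    def add_new_switch(self, switch):
        self.switch_registry.add(switch)

    def get_switch_by_dpid(self, dpid):
        return self.switch_registry.get(dpid)

    def remove_switch(self, switch):
        return self.switch_registry.remove(switch)


controller = SlimController()
switch = FakeSwitch(dpid='00:00:00:00:00:00:00:01')
controller.add_new_switch(switch)
print(controller.get_switch_by_dpid(switch.dpid) is switch)
print(controller.remove_switch(switch))
```

Keeping the public method names on the outer class means existing callers keep working while the dictionary and its invariants move into one small class that can be tested on its own.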

"""Kytos SDN Platform main class.

This module contains the main class of Kytos, which is
:class:`~.core.Controller`.

Basic usage:

.. code-block:: python3

    from kytos.core.config import KytosConfig
    from kytos.core import Controller
    config = KytosConfig()
    controller = Controller(config.options['daemon'])
    controller.start()
"""
import asyncio
import atexit
import json
import logging
import os
import re
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from importlib import import_module
from importlib import reload as reload_module
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path

from kytos.core.api_server import APIServer
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
from kytos.core.auth import Auth
from kytos.core.buffers import KytosBuffers
from kytos.core.config import KytosConfig
from kytos.core.connection import ConnectionState
from kytos.core.events import KytosEvent
from kytos.core.helpers import now
from kytos.core.interface import Interface
from kytos.core.logs import LogManager
from kytos.core.napps.base import NApp
from kytos.core.napps.manager import NAppsManager
from kytos.core.napps.napp_dir_listener import NAppDirListener
from kytos.core.switch import Switch

__all__ = ('Controller',)


class Controller:
    """Main class of Kytos.

    The main responsibilities of this class are:
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
        - manage KytosNApps (install, load and unload);
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
        - manage which event should be sent to NApps methods;
        - manage the buffers handlers, considering one thread per handler.
    """

    # Created issue #568 for the disabled checks.
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
    def __init__(self, options=None, loop=None):
        """Init method of Controller class takes the parameters below.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
            loop (asyncio.AbstractEventLoop): Event loop used by the
                controller. Defaults to ``asyncio.get_event_loop()``.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self._loop = loop or asyncio.get_event_loop()
        self._pool = ThreadPoolExecutor(max_workers=1)

        #: dict: keep the main threads of the controller (buffers and handler)
        self._threads = {}
        #: KytosBuffers: KytosBuffer object with Controller buffers
        self.buffers = KytosBuffers(loop=self._loop)
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
        #:
        #: This dict stores all connections between the controller and the
        #: switches. The key for this dict is a tuple (ip, port). The content
        #: is another dict with the connection information.
        self.connections = {}
        #: dict: mapping of events and event listeners.
        #:
        #: The key of the dict is a KytosEvent (or a string that represents a
        #: regex to match against KytosEvents) and the value is a list of
        #: methods that will receive the referenced event
        self.events_listeners = {'kytos/core.connection.new':
                                 [self.new_connection]}

        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
        #:
        #: The key is the napp name (string), while the value is the napp
        #: instance itself.
        self.napps = {}
        #: Object generated by ParseArgs on config.py file
        self.options = options
        #: KytosServer: Instance of KytosServer that will be listening to TCP
        #: connections.
        self.server = None
        #: dict: Current existing switches.
        #:
        #: The key is the switch dpid, while the value is a Switch object.
        self.switches = {}  # dpid: Switch()

        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: Logger instance used by Kytos.
        self.log = None

        #: Observer that handles NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(__name__, self.options.listen,
                                    self.options.api_port,
                                    self.napps_manager, self.options.napps)

        self.auth = Auth(self)

        self._register_endpoints()
        #: Adding the napps 'enabled' directory into the PATH
        #: Now you can access the enabled napps with:
        #: from napps.<username>.<napp_name> import ?....
        sys.path.append(os.path.join(self.options.napps, os.pardir))

    def enable_logs(self):
        """Register kytos log and enable the logs."""
        LogManager.load_config_file(self.options.logging, self.options.debug)
        LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)

    @staticmethod
    def loggers():
        """List all logging Loggers.

        Return a list of Logger objects, with name and logging level.
        """
        return [logging.getLogger(name)
                for name in logging.root.manager.loggerDict
                if "kytos" in name]

    def toggle_debug(self, name=None):
        """Enable/disable logging debug messages to a given logger name.

        If the name parameter is not specified the debug will be
        enabled/disabled following the initial config file. It will decide
        to enable/disable using the 'kytos' name to find the current
        logger level.
        Obs: To disable the debug the logging will be set to NOTSET

        Args:
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
        """
        if name and name not in logging.root.manager.loggerDict:
            # A Logger name that is not declared in logging will raise an error
            # otherwise logging would create a new Logger.
            raise ValueError(f"Invalid logger name: {name}")

        if not name:
            # Logger name not specified.
            level = logging.getLogger('kytos').getEffectiveLevel()
            enable_debug = level != logging.DEBUG

            # Enable/disable default Loggers
            LogManager.load_config_file(self.options.logging, enable_debug)
            return

        # Get effective logger level for the name
        level = logging.getLogger(name).getEffectiveLevel()
        logger = logging.getLogger(name)

        if level == logging.DEBUG:
            # disable debug
            logger.setLevel(logging.NOTSET)
        else:
            # enable debug
            logger.setLevel(logging.DEBUG)

    def start(self, restart=False):
        """Create pidfile and call start_controller method."""
        self.enable_logs()
        if not restart:
            self.create_pidfile()
        self.start_controller()

    def create_pidfile(self):
        """Create a pidfile."""
        pid = os.getpid()

        # Creates directory if it doesn't exist
        # System can erase /var/run's content
        pid_folder = Path(self.options.pidfile).parent
        self.log.info(pid_folder)

        # Pylint incorrectly infers Path objects
        # https://github.com/PyCQA/pylint/issues/224
        # pylint: disable=no-member
        if not pid_folder.exists():
            pid_folder.mkdir()
            pid_folder.chmod(0o1777)
        # pylint: enable=no-member

        # Checks if a pidfile exists. Creates a new file.
        try:
            pidfile = open(self.options.pidfile, mode='x')
        except OSError:
            # This happens if there is a pidfile already.
            # We shall check if the process that created the pidfile is still
            # running.
            try:
                with open(self.options.pidfile, mode='r') as existing_file:
                    old_pid = int(existing_file.read())
                os.kill(old_pid, 0)
                # If kill() doesn't return an error, there's a process running
                # with the same PID. We assume it is Kytos and quit.
                # Otherwise, overwrite the file and proceed.
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
                             "running. Aborting.")
                sys.exit(error_msg.format(self.options.pidfile))
            except OSError:
                try:
                    pidfile = open(self.options.pidfile, mode='w')
                except OSError as exception:
                    error_msg = "Failed to create pidfile {}: {}."
                    sys.exit(error_msg.format(self.options.pidfile, exception))

        # Identifies the process that created the pidfile.
        pidfile.write(str(pid))
        pidfile.close()

        # Make sure the file is deleted when controller stops. Registered only
        # after this process owns the pidfile, so aborting because another
        # Kytos is running does not delete that instance's pidfile.
        atexit.register(Path(self.options.pidfile).unlink)

    def start_controller(self):
        """Start the controller.

        Starts the KytosServer (TCP Server) coroutine.
        Schedules an asyncio task for each buffer handler.
        Loads the installed NApps.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        self.server = KytosServer((self.options.listen,
                                   int(self.options.port)),
                                  KytosServerProtocol,
                                  self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            loop = asyncio.get_event_loop()
            # print(_.result())
            threads = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(threads), threads)
            loop.stop()

        async def _run_api_server_thread(executor):
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            # log.debug('creating tasks')
            loop = asyncio.get_event_loop()
            blocking_tasks = [
                loop.run_in_executor(executor, self.api_server.run)
            ]
            # log.debug('waiting for tasks')
            completed, pending = await asyncio.wait(blocking_tasks)
            # results = [t.result() for t in completed]
            # log.debug('results: {!r}'.format(results))
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        task = self._loop.create_task(self.raw_event_handler())
        task = self._loop.create_task(self.msg_in_event_handler())
        task = self._loop.create_task(self.msg_out_event_handler())
        task = self._loop.create_task(self.app_event_handler())
        task = self._loop.create_task(_run_api_server_thread(self._pool))
        task.add_done_callback(_stop_loop)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.
        # sys.exit(error_msg.format(thread, exception))

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()

    def _register_endpoints(self):
        """Register all rest endpoints served by kytos.

        -   Register APIServer endpoints
        -   Register WebUI endpoints
        -   Register ``/api/kytos/core/config`` endpoint
        """
        self.api_server.start_api()
        # Register controller endpoints as /api/kytos/core/...
        self.api_server.register_core_endpoint('config/',
                                               self.configuration_endpoint)
        self.api_server.register_core_endpoint('metadata/',
                                               Controller.metadata_endpoint)
        self.api_server.register_core_endpoint(
            'reload/<username>/<napp_name>/',
            self.rest_reload_napp)
        self.api_server.register_core_endpoint('reload/all',
                                               self.rest_reload_all_napps)
        self.auth.register_core_auth_services()

    def register_rest_endpoint(self, url, function, methods):
        """Deprecated in favor of the @rest decorator."""
        self.api_server.register_rest_endpoint(url, function, methods)

    @classmethod
    def metadata_endpoint(cls):
        """Return the Kytos metadata.

        Returns:
            string: Json with current kytos metadata.

        """
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
        # Read through a context manager so the file handle is always closed.
        with open(meta_path) as file_obj:
            meta_file = file_obj.read()
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
        return json.dumps(metadata)

    def configuration_endpoint(self):
        """Return the configuration options used by Kytos.

        Returns:
            string: Json with current configurations used by kytos.

        """
        return json.dumps(self.options.__dict__)

    def restart(self, graceful=True):
        """Restart Kytos SDN Controller.

        Args:
            graceful(bool): Represents the way that Kytos will restart.
        """
        if self.started_at is not None:
            self.stop(graceful)
            self.__init__(self.options)

        self.start(restart=True)

    def stop(self, graceful=True):
        """Shutdown all services used by kytos.

        This method should:
            - stop all Websockets
            - stop the API Server
            - stop the Controller
        """
        if self.started_at:
            self.stop_controller(graceful)

    def stop_controller(self, graceful=True):
        """Stop the controller.

        This method should:
            - announce on the network that the controller will shut down;
            - stop receiving incoming packets;
            - call the 'shutdown' method of each KytosNApp that is running;
            - finish reading the events on all buffers;
            - stop each running handler;
            - stop all running threads;
            - stop the KytosServer;
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.api_server.stop_api_server()
        self.napp_dir_listener.stop()

        self.log.info("Stopping threadpool: %s", self._pool)

        threads = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(threads), threads)

        self._pool.shutdown(wait=graceful)

        # self.server.socket.shutdown()
        # self.server.socket.close()

        # for thread in self._threads.values():
        #     self.log.info("Stopping thread: %s", thread.name)
        #     thread.join()

        # for thread in self._threads.values():
        #     while thread.is_alive():
        #         self.log.info("Thread is alive: %s", thread.name)
        #         pass

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # ASYNC TODO: close connections
        # self.server.server_close()

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()

    def status(self):
        """Return status of Kytos Server.

        If the Kytos controller is running, this method returns
        "Running since 'Started_At'", otherwise "Stopped".

        Returns:
            string: String with kytos status.

        """
        if self.started_at:
            return "Running since %s" % self.started_at
        return "Stopped"

    def uptime(self):
        """Return the uptime of kytos server.

        This method should return:
            - 0 if Kytos Server is stopped.
            - (now() - self.started_at) if Kytos Server is running.

        Returns:
            datetime.timedelta: The uptime interval.

        """
        return now() - self.started_at if self.started_at else 0

    def notify_listeners(self, event):
        """Send the event to the specified listeners.

        Loops over self.events_listeners matching (by regexp) the attribute
        name of the event with the keys of events_listeners. If a match occurs,
        then send the event to each registered listener.

        Args:
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
        """
        self.log.debug("looking for listeners for %s", event)
        for event_regex, listeners in dict(self.events_listeners).items():
            # self.log.debug("listeners found for %s: %r => %s", event,
            #                event_regex, [l.__qualname__ for l in listeners])
            # Do not match if the event has more characters
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
                event_regex += '$'
            if re.match(event_regex, event.name):
                # self.log.debug('Calling listeners for %s', event)
                for listener in listeners:
                    listener(event)

    async def raw_event_handler(self):
        """Handle raw events.

        Listen to the raw_buffer and send all its events to the
        corresponding listeners.
        """
        self.log.info("Raw Event Handler started")
        while True:
            event = await self.buffers.raw.aget()
            self.notify_listeners(event)
            self.log.debug("Raw Event handler called")

            if event.name == "kytos/core.shutdown":
                self.log.debug("Raw Event handler stopped")
                break

    async def msg_in_event_handler(self):
        """Handle msg_in events.

        Listen to the msg_in buffer and send all its events to the
        corresponding listeners.
        """
        self.log.info("Message In Event Handler started")
        while True:
            event = await self.buffers.msg_in.aget()
            self.notify_listeners(event)
            self.log.debug("Message In Event handler called")

            if event.name == "kytos/core.shutdown":
                self.log.debug("Message In Event handler stopped")
                break

    async def msg_out_event_handler(self):
        """Handle msg_out events.

        Listen to the msg_out buffer and send all its events to the
        corresponding listeners.
        """
        self.log.info("Message Out Event Handler started")
        while True:
            triggered_event = await self.buffers.msg_out.aget()

            if triggered_event.name == "kytos/core.shutdown":
                self.log.debug("Message Out Event handler stopped")
                break

            message = triggered_event.content['message']
            destination = triggered_event.destination
            if (destination and
                    not destination.state == ConnectionState.FINISHED):
                packet = message.pack()
                destination.send(packet)
                self.log.debug('Connection %s: OUT OFP, '
                               'version: %s, type: %s, xid: %s - %s',
                               destination.id,
                               message.header.version,
                               message.header.message_type,
                               message.header.xid,
                               packet.hex())
                self.notify_listeners(triggered_event)
                self.log.debug("Message Out Event handler called")
            else:
                self.log.info("connection closed. Cannot send message")

    async def app_event_handler(self):
        """Handle app events.

        Listen to the app buffer and send all its events to the
        corresponding listeners.
        """
        self.log.info("App Event Handler started")
        while True:
            event = await self.buffers.app.aget()
            self.notify_listeners(event)
            self.log.debug("App Event handler called")

            if event.name == "kytos/core.shutdown":
                self.log.debug("App Event handler stopped")
                break

    def get_interface_by_id(self, interface_id):
        """Find an Interface with interface_id.

        Args:
            interface_id(str): Interface Identifier.

        Returns:
            Interface: Instance of Interface with the id given.

        """
        if interface_id is None:
            return None

        switch_id = ":".join(interface_id.split(":")[:-1])
        interface_number = int(interface_id.split(":")[-1])

        switch = self.switches.get(switch_id)

        if not switch:
            return None

        return switch.interfaces.get(interface_number, None)

    def get_switch_by_dpid(self, dpid):
        """Return a specific switch by dpid.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.

        Returns:
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.

        """
        return self.switches.get(dpid)

    def get_switch_or_create(self, dpid, connection):
        """Return switch or create it if necessary.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. If the switch already has a
                connection, it will be updated.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existing switch.

        """
        self.create_or_update_connection(connection)
        switch = self.get_switch_by_dpid(dpid)
        event_name = 'kytos/core.switch.'

        if switch is None:
            switch = Switch(dpid=dpid)
            self.add_new_switch(switch)
            event_name += 'new'
        else:
            event_name += 'reconnected'

        self.set_switch_options(dpid=dpid)
        event = KytosEvent(name=event_name, content={'switch': switch})

        old_connection = switch.connection
        switch.update_connection(connection)

        if old_connection is not connection:
            self.remove_connection(old_connection)

        self.buffers.app.put(event)

        return switch

    def set_switch_options(self, dpid):
        """Update the switch settings based on kytos.conf options.

        Args:
            dpid (str): dpid used to identify a switch.

        """
        switch = self.switches.get(dpid)
        if not switch:
            return

        vlan_pool = {}
        try:
            vlan_pool = json.loads(self.options.vlan_pool)
            if not vlan_pool:
                return
        except (TypeError, json.JSONDecodeError) as err:
            self.log.error("Invalid vlan_pool settings: %s", err)

        if vlan_pool.get(dpid):
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
            for intf_num, port_list in vlan_pool[dpid].items():
                if not switch.interfaces.get((intf_num)):
                    vlan_ids = set()
                    for vlan_range in port_list:
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
                        for vlan_id in range(vlan_begin, vlan_end):
                            vlan_ids.add(vlan_id)
                    intf_num = int(intf_num)
                    intf = Interface(name=intf_num, port_number=intf_num,
                                     switch=switch)
                    intf.set_available_tags(vlan_ids)
                    switch.update_interface(intf)

    def create_or_update_connection(self, connection):
        """Update a connection.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be updated.
        """
        self.connections[connection.id] = connection

    def get_connection_by_id(self, conn_id):
        """Return an existing connection by id.

        Args:
            conn_id (int): id from a connection.

        Returns:
            :class:`~kytos.core.connection.Connection`: Instance of connection
                or None Type.

        """
        return self.connections.get(conn_id)

    def remove_connection(self, connection):
        """Close an existing connection and remove it.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be removed.
        """
        if connection is None:
            return False

        try:
            connection.close()
            del self.connections[connection.id]
        except KeyError:
            return False
        return True

    def remove_switch(self, switch):
        """Remove an existing switch.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Instance of switch that will be removed.
        """
        try:
            del self.switches[switch.dpid]
        except KeyError:
            return False
        return True

    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method will read new connection event and store the connection
        (socket) into the connections attribute on the controller.

        It also clears all references to the connection since it is a new
        connection on the same ip:port.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() expects a Connection object, not its id.
        old_connection = self.get_connection_by_id(connection.id)
        if old_connection is not None and old_connection is not connection:
            self.remove_connection(old_connection)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
730
    def add_new_switch(self, switch):
        """Add a new switch on the controller.

        Args:
            switch (Switch): A Switch object
        """
        self.switches[switch.dpid] = switch

    def _import_napp(self, username, napp_name):
        """Import a NApp module.

        Raises:
            FileNotFoundError: if NApp's main.py is not found.
            ModuleNotFoundError: if any NApp requirement is not installed.
        """
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
        path = os.path.join(self.options.napps, username, napp_name,
                            'main.py')
        napp_spec = spec_from_file_location(mod_name, path)
        napp_module = module_from_spec(napp_spec)
        sys.modules[napp_spec.name] = napp_module
        napp_spec.loader.exec_module(napp_module)
        return napp_module

    def load_napp(self, username, napp_name):
        """Load a single NApp.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        if (username, napp_name) in self.napps:
            message = 'NApp %s/%s was already loaded'
            self.log.warning(message, username, napp_name)
            return

        try:
            napp_module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return
        except FileNotFoundError as err:
            msg = "NApp module not found, assuming it's a meta-napp: %s"
            self.log.warning(msg, err.filename)
            return

        try:
            napp = napp_module.Main(controller=self)
        except:  # noqa pylint: disable=bare-except
            self.log.critical("NApp initialization failed: %s/%s",
                              username, napp_name, exc_info=True)
            return

        self.napps[(username, napp_name)] = napp

        napp.start()
        self.api_server.register_napp_endpoints(napp)

        # pylint: disable=protected-access
        for event, listeners in napp._listeners.items():
            self.events_listeners.setdefault(event, []).extend(listeners)
        # pylint: enable=protected-access

    def pre_install_napps(self, napps, enable=True):
        """Pre-install and enable NApps.

        Only NApps that are not already installed are installed.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): Forwarded to ``napps_manager.install``;
                defaults to True.
        """
        all_napps = self.napps_manager.get_installed_napps()
        installed = [str(napp) for napp in all_napps]
        napps_diff = [napp for napp in napps if napp not in installed]
        for napp in napps_diff:
            self.napps_manager.install(napp, enable=enable)

    def load_napps(self):
        """Load all NApps enabled on the NApps dir."""
        for napp in self.napps_manager.get_enabled_napps():
            try:
                self.log.info("Loading NApp %s", napp.id)
                self.load_napp(napp.username, napp.name)
            except FileNotFoundError as exception:
                self.log.error("Could not load NApp %s: %s",
                               napp.id, exception)

    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)

        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
        else:
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
            napp_id = NApp(username, napp_name).id
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
            napp_shutdown_fn = self.events_listeners[event.name][0]
            # Call listener before removing it from events_listeners
            napp_shutdown_fn(event)

            # Remove rest endpoints from that napp
            self.api_server.remove_napp_endpoints(napp)

            # Removing listeners from that napp
            # pylint: disable=protected-access
            for event_type, napp_listeners in napp._listeners.items():
                event_listeners = self.events_listeners[event_type]
                for listener in napp_listeners:
                    event_listeners.remove(listener)
                if not event_listeners:
                    del self.events_listeners[event_type]
            # pylint: enable=protected-access

    def unload_napps(self):
        """Unload all loaded NApps that are not core NApps."""
        # list() is used here to avoid the error:
        # 'RuntimeError: dictionary changed size during iteration'
        # This is caused by looping over a dictionary while removing
        # items from it.
        for (username, napp_name) in list(self.napps.keys()):  # noqa
            self.unload_napp(username, napp_name)

    def reload_napp_module(self, username, napp_name, napp_file):
        """Reload a NApp Module."""
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
        try:
            napp_module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise
        try:
            napp_module = reload_module(napp_module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise

    def reload_napp(self, username, napp_name):
        """Reload a NApp.

        Returns:
            int: 200 if the NApp modules were reloaded, 400 otherwise.
        """
        self.unload_napp(username, napp_name)
        try:
            self.reload_napp_module(username, napp_name, 'settings')
            self.reload_napp_module(username, napp_name, 'main')
        except (ModuleNotFoundError, ImportError):
            return 400
        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200

    def rest_reload_napp(self, username, napp_name):
        """Request reload a NApp."""
        res = self.reload_napp(username, napp_name)
        return 'reloaded', res

    def rest_reload_all_napps(self):
        """Request reload all NApps."""
        # Iterate over a copy: reload_napp() removes and re-adds entries
        # in self.napps while this loop is running.
        for napp in list(self.napps):
            self.reload_napp(*napp)
        return 'reloaded', 200
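
The path-based import used by `_import_napp` can be tried outside the controller. The following is a minimal standalone sketch, not the Kytos implementation: the helper name `import_from_path` and the `demo_napps...` module names are hypothetical, and the cleanup of `sys.modules` on failure is an added design choice that the method above does not make.

```python
import os
import sys
import tempfile
from importlib.util import module_from_spec, spec_from_file_location


def import_from_path(mod_name, path):
    """Import the file at ``path`` under the dotted name ``mod_name``.

    Hypothetical helper: same stdlib calls as ``_import_napp``, plus a
    cleanup step so a failed import leaves no entry in ``sys.modules``.
    """
    spec = spec_from_file_location(mod_name, path)
    module = module_from_spec(spec)
    # Register before executing, as the controller does.
    sys.modules[spec.name] = module
    try:
        # Raises FileNotFoundError if the file does not exist.
        spec.loader.exec_module(module)
    except BaseException:
        # Assumption of this sketch: drop the half-initialized module.
        sys.modules.pop(spec.name, None)
        raise
    return module


missing_left_behind = None
with tempfile.TemporaryDirectory() as napps_dir:
    # A stand-in for a NApp's main.py exposing a Main class.
    main_path = os.path.join(napps_dir, "main.py")
    with open(main_path, "w") as handle:
        handle.write("class Main:\n    name = 'demo'\n")

    module = import_from_path("demo_napps.user.demo.main", main_path)
    loaded_name = module.Main.name

    try:
        import_from_path("demo_napps.user.missing.main",
                         os.path.join(napps_dir, "missing.py"))
    except FileNotFoundError:
        missing_left_behind = "demo_napps.user.missing.main" in sys.modules
```

Registering the module in `sys.modules` before executing it matches the order used in `_import_napp`; whether the controller should also clean up after a failed import is left open here.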