Passed
Push — master ( 3b7d73...e388a5 )
by Humberto
02:56
created

kytos.core.controller   F

Complexity

Total Complexity 102

Size/Duplication

Total Lines 894
Duplicated Lines 0 %

Test Coverage

Coverage 39.63%

Importance

Changes 0
Metric Value
wmc 102
eloc 425
dl 0
loc 894
ccs 155
cts 391
cp 0.3963
rs 2
c 0
b 0
f 0

41 Methods

Rating   Name   Duplication   Size   Complexity  
A Controller.__init__() 0 69 2
A Controller.enable_logs() 0 5 1
A Controller.rest_reload_napp() 0 4 1
A Controller.start() 0 6 2
B Controller.notify_listeners() 0 22 6
A Controller.restart() 0 11 2
A Controller.create_or_update_connection() 0 8 1
B Controller.create_pidfile() 0 47 5
A Controller.get_switch_by_dpid() 0 11 1
A Controller.add_new_switch() 0 7 1
A Controller.load_napps() 0 9 3
A Controller._import_napp() 0 16 1
A Controller.get_connection_by_id() 0 12 1
A Controller.unload_napps() 0 8 2
A Controller.get_switch_or_create() 0 36 3
A Controller.raw_event_handler() 0 15 3
A Controller.configuration_endpoint() 0 8 1
A Controller.msg_in_event_handler() 0 15 3
A Controller.loggers() 0 9 1
A Controller.start_controller() 0 59 1
A Controller.get_interface_by_id() 0 22 3
A Controller.uptime() 0 12 2
A Controller.register_rest_endpoint() 0 3 1
A Controller.rest_reload_all_napps() 0 5 2
A Controller.new_connection() 0 25 2
A Controller.toggle_debug() 0 36 5
C Controller.set_switch_options() 0 33 9
A Controller.metadata_endpoint() 0 12 1
A Controller.remove_switch() 0 12 2
B Controller.msg_out_event_handler() 0 31 5
A Controller.stop_controller() 0 47 1
A Controller.reload_napp_module() 0 14 3
A Controller.remove_connection() 0 16 3
A Controller.stop() 0 10 2
A Controller.pre_install_napps() 0 13 2
A Controller.status() 0 13 2
A Controller.app_event_handler() 0 15 3
A Controller._register_endpoints() 0 19 1
A Controller.reload_napp() 0 12 2
B Controller.load_napp() 0 33 5
A Controller.unload_napp() 0 30 5

How to fix   Complexity   

Complexity

Complex classes like kytos.core.controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29
30 1
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 1
from kytos.core.auth import Auth
34 1
from kytos.core.buffers import KytosBuffers
35 1
from kytos.core.config import KytosConfig
36 1
from kytos.core.connection import ConnectionState
37 1
from kytos.core.events import KytosEvent
38 1
from kytos.core.helpers import now
39 1
from kytos.core.interface import Interface
40 1
from kytos.core.logs import LogManager
41 1
from kytos.core.napps.base import NApp
42 1
from kytos.core.napps.manager import NAppsManager
43 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
44 1
from kytos.core.switch import Switch
45
46 1
__all__ = ('Controller',)
47
48
49 1
class Controller:
50
    """Main class of Kytos.
51
52
    The main responsibilities of this class are:
53
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
54
        - manage KytosNApps (install, load and unload);
55
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
56
        - manage which event should be sent to NApps methods;
57
        - manage the buffers handlers, considering one thread per handler.
58
    """
59
60
    # Created issue #568 for the disabled checks.
61
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
62 1
    def __init__(self, options=None, loop=None):
63
        """Init method of Controller class takes the parameters below.
64
65
        Args:
66
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
67
                instance of :class:`~kytos.core.config.KytosConfig` class.
68
        """
69 1
        if options is None:
70
            options = KytosConfig().options['daemon']
71
72 1
        self._loop = loop or asyncio.get_event_loop()
73 1
        self._pool = ThreadPoolExecutor(max_workers=1)
74
75
        #: dict: keep the main threads of the controller (buffers and handler)
76 1
        self._threads = {}
77
        #: KytosBuffers: KytosBuffer object with Controller buffers
78 1
        self.buffers = KytosBuffers(loop=self._loop)
79
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
80
        #:
81
        #: This dict stores all connections between the controller and the
82
        #: switches. The key for this dict is a tuple (ip, port). The content
83
        #: is another dict with the connection information.
84 1
        self.connections = {}
85
        #: dict: mapping of events and event listeners.
86
        #:
87
        #: The key of the dict is a KytosEvent (or a string that represent a
88
        #: regex to match agains KytosEvents) and the value is a list of
89
        #: methods that will receive the referenced event
90 1
        self.events_listeners = {'kytos/core.connection.new':
91
                                 [self.new_connection]}
92
93
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
94
        #:
95
        #: The key is the napp name (string), while the value is the napp
96
        #: instance itself.
97 1
        self.napps = {}
98
        #: Object generated by ParseArgs on config.py file
99 1
        self.options = options
100
        #: KytosServer: Instance of KytosServer that will be listening to TCP
101
        #: connections.
102 1
        self.server = None
103
        #: dict: Current existing switches.
104
        #:
105
        #: The key is the switch dpid, while the value is a Switch object.
106 1
        self.switches = {}  # dpid: Switch()
107
108
        #: datetime.datetime: Time when the controller finished starting.
109 1
        self.started_at = None
110
111
        #: logging.Logger: Logger instance used by Kytos.
112 1
        self.log = None
113
114
        #: Observer that handle NApps when they are enabled or disabled.
115 1
        self.napp_dir_listener = NAppDirListener(self)
116
117 1
        self.napps_manager = NAppsManager(self)
118
119
        #: API Server used to expose rest endpoints.
120 1
        self.api_server = APIServer(__name__, self.options.listen,
121
                                    self.options.api_port,
122
                                    self.napps_manager, self.options.napps)
123
124 1
        self.auth = Auth(self)
125
126 1
        self._register_endpoints()
127
        #: Adding the napps 'enabled' directory into the PATH
128
        #: Now you can access the enabled napps with:
129
        #: from napps.<username>.<napp_name> import ?....
130 1
        sys.path.append(os.path.join(self.options.napps, os.pardir))
131
132 1
    def enable_logs(self):
133
        """Register kytos log and enable the logs."""
134 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
135 1
        LogManager.enable_websocket(self.api_server.server)
136 1
        self.log = logging.getLogger(__name__)
137
138 1
    @staticmethod
139
    def loggers():
140
        """List all logging Loggers.
141
142
        Return a list of Logger objects, with name and logging level.
143
        """
144 1
        return [logging.getLogger(name)
145
                for name in logging.root.manager.loggerDict
146
                if "kytos" in name]
147
148 1
    def toggle_debug(self, name=None):
149
        """Enable/disable logging debug messages to a given logger name.
150
151
        If the name parameter is not specified the debug will be
152
        enabled/disabled following the initial config file. It will decide
153
        to enable/disable using the 'kytos' name to find the current
154
        logger level.
155
        Obs: To disable the debug the logging will be set to NOTSET
156
157
        Args:
158
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
159
        """
160 1
        if name and name not in logging.root.manager.loggerDict:
161
            # A Logger name that is not declared in logging will raise an error
162
            # otherwise logging would create a new Logger.
163 1
            raise ValueError(f"Invalid logger name: {name}")
164
165 1
        if not name:
166
            # Logger name not specified.
167 1
            level = logging.getLogger('kytos').getEffectiveLevel()
168 1
            enable_debug = level != logging.DEBUG
169
170
            # Enable/disable default Loggers
171 1
            LogManager.load_config_file(self.options.logging, enable_debug)
172 1
            return
173
174
        # Get effective logger level for the name
175 1
        level = logging.getLogger(name).getEffectiveLevel()
176 1
        logger = logging.getLogger(name)
177
178 1
        if level == logging.DEBUG:
179
            # disable debug
180 1
            logger.setLevel(logging.NOTSET)
181
        else:
182
            # enable debug
183 1
            logger.setLevel(logging.DEBUG)
184
185 1
    def start(self, restart=False):
186
        """Create pidfile and call start_controller method."""
187
        self.enable_logs()
188
        if not restart:
189
            self.create_pidfile()
190
        self.start_controller()
191
192 1
    def create_pidfile(self):
193
        """Create a pidfile."""
194
        pid = os.getpid()
195
196
        # Creates directory if it doesn't exist
197
        # System can erase /var/run's content
198
        pid_folder = Path(self.options.pidfile).parent
199
        self.log.info(pid_folder)
200
201
        # Pylint incorrectly infers Path objects
202
        # https://github.com/PyCQA/pylint/issues/224
203
        # pylint: disable=no-member
204
        if not pid_folder.exists():
205
            pid_folder.mkdir()
206
            pid_folder.chmod(0o1777)
207
        # pylint: enable=no-member
208
209
        # Make sure the file is deleted when controller stops
210
        atexit.register(Path(self.options.pidfile).unlink)
211
212
        # Checks if a pidfile exists. Creates a new file.
213
        try:
214
            pidfile = open(self.options.pidfile, mode='x')
215
        except OSError:
216
            # This happens if there is a pidfile already.
217
            # We shall check if the process that created the pidfile is still
218
            # running.
219
            try:
220
                existing_file = open(self.options.pidfile, mode='r')
221
                old_pid = int(existing_file.read())
222
                os.kill(old_pid, 0)
223
                # If kill() doesn't return an error, there's a process running
224
                # with the same PID. We assume it is Kytos and quit.
225
                # Otherwise, overwrite the file and proceed.
226
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
227
                             "running. Aborting.")
228
                sys.exit(error_msg.format(self.options.pidfile))
229
            except OSError:
230
                try:
231
                    pidfile = open(self.options.pidfile, mode='w')
232
                except OSError as exception:
233
                    error_msg = "Failed to create pidfile {}: {}."
234
                    sys.exit(error_msg.format(self.options.pidfile, exception))
235
236
        # Identifies the process that created the pidfile.
237
        pidfile.write(str(pid))
238
        pidfile.close()
239
240 1
    def start_controller(self):
241
        """Start the controller.
242
243
        Starts the KytosServer (TCP Server) coroutine.
244
        Starts a thread for each buffer handler.
245
        Load the installed apps.
246
        """
247
        self.log.info("Starting Kytos - Kytos Controller")
248
        self.server = KytosServer((self.options.listen,
249
                                   int(self.options.port)),
250
                                  KytosServerProtocol,
251
                                  self,
252
                                  self.options.protocol_name)
253
254
        self.log.info("Starting TCP server: %s", self.server)
255
        self.server.serve_forever()
256
257
        def _stop_loop(_):
258
            loop = asyncio.get_event_loop()
259
            # print(_.result())
260
            threads = threading.enumerate()
261
            self.log.debug("%s threads before loop.stop: %s",
262
                           len(threads), threads)
263
            loop.stop()
264
265
        async def _run_api_server_thread(executor):
266
            log = logging.getLogger('kytos.core.controller.api_server_thread')
267
            log.debug('starting')
268
            # log.debug('creating tasks')
269
            loop = asyncio.get_event_loop()
270
            blocking_tasks = [
271
                loop.run_in_executor(executor, self.api_server.run)
272
            ]
273
            # log.debug('waiting for tasks')
274
            completed, pending = await asyncio.wait(blocking_tasks)
275
            # results = [t.result() for t in completed]
276
            # log.debug('results: {!r}'.format(results))
277
            log.debug('completed: %d, pending: %d',
278
                      len(completed), len(pending))
279
280
        task = self._loop.create_task(self.raw_event_handler())
281
        task = self._loop.create_task(self.msg_in_event_handler())
282
        task = self._loop.create_task(self.msg_out_event_handler())
283
        task = self._loop.create_task(self.app_event_handler())
284
        task = self._loop.create_task(_run_api_server_thread(self._pool))
285
        task.add_done_callback(_stop_loop)
286
287
        self.log.info("ThreadPool started: %s", self._pool)
288
289
        # ASYNC TODO: ensure all threads started correctly
290
        # This is critical, if any of them failed starting we should exit.
291
        # sys.exit(error_msg.format(thread, exception))
292
293
        self.log.info("Loading Kytos NApps...")
294
        self.napp_dir_listener.start()
295
        self.pre_install_napps(self.options.napps_pre_installed)
296
        self.load_napps()
297
298
        self.started_at = now()
299
300 1
    def _register_endpoints(self):
301
        """Register all rest endpoint served by kytos.
302
303
        -   Register APIServer endpoints
304
        -   Register WebUI endpoints
305
        -   Register ``/api/kytos/core/config`` endpoint
306
        """
307 1
        self.api_server.start_api()
308
        # Register controller endpoints as /api/kytos/core/...
309 1
        self.api_server.register_core_endpoint('config/',
310
                                               self.configuration_endpoint)
311 1
        self.api_server.register_core_endpoint('metadata/',
312
                                               Controller.metadata_endpoint)
313 1
        self.api_server.register_core_endpoint(
314
            'reload/<username>/<napp_name>/',
315
            self.rest_reload_napp)
316 1
        self.api_server.register_core_endpoint('reload/all',
317
                                               self.rest_reload_all_napps)
318 1
        self.auth.register_core_auth_services()
319
320 1
    def register_rest_endpoint(self, url, function, methods):
321
        """Deprecate in favor of @rest decorator."""
322 1
        self.api_server.register_rest_endpoint(url, function, methods)
323
324 1
    @classmethod
325
    def metadata_endpoint(cls):
326
        """Return the Kytos metadata.
327
328
        Returns:
329
            string: Json with current kytos metadata.
330
331
        """
332
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
333
        meta_file = open(meta_path).read()
334
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
335
        return json.dumps(metadata)
336
337 1
    def configuration_endpoint(self):
338
        """Return the configuration options used by Kytos.
339
340
        Returns:
341
            string: Json with current configurations used by kytos.
342
343
        """
344 1
        return json.dumps(self.options.__dict__)
345
346 1
    def restart(self, graceful=True):
347
        """Restart Kytos SDN Controller.
348
349
        Args:
350
            graceful(bool): Represents the way that Kytos will restart.
351
        """
352
        if self.started_at is not None:
353
            self.stop(graceful)
354
            self.__init__(self.options)
355
356
        self.start(restart=True)
357
358 1
    def stop(self, graceful=True):
359
        """Shutdown all services used by kytos.
360
361
        This method should:
362
            - stop all Websockets
363
            - stop the API Server
364
            - stop the Controller
365
        """
366
        if self.started_at:
367
            self.stop_controller(graceful)
368
369 1
    def stop_controller(self, graceful=True):
370
        """Stop the controller.
371
372
        This method should:
373
            - announce on the network that the controller will shutdown;
374
            - stop receiving incoming packages;
375
            - call the 'shutdown' method of each KytosNApp that is running;
376
            - finish reading the events on all buffers;
377
            - stop each running handler;
378
            - stop all running threads;
379
            - stop the KytosServer;
380
        """
381
        self.log.info("Stopping Kytos")
382
383
        self.buffers.send_stop_signal()
384
        self.api_server.stop_api_server()
385
        self.napp_dir_listener.stop()
386
387
        self.log.info("Stopping threadpool: %s", self._pool)
388
389
        threads = threading.enumerate()
390
        self.log.debug("%s threads before threadpool shutdown: %s",
391
                       len(threads), threads)
392
393
        self._pool.shutdown(wait=graceful)
394
395
        # self.server.socket.shutdown()
396
        # self.server.socket.close()
397
398
        # for thread in self._threads.values():
399
        #     self.log.info("Stopping thread: %s", thread.name)
400
        #     thread.join()
401
402
        # for thread in self._threads.values():
403
        #     while thread.is_alive():
404
        #         self.log.info("Thread is alive: %s", thread.name)
405
        #         pass
406
407
        self.started_at = None
408
        self.unload_napps()
409
        self.buffers = KytosBuffers()
410
411
        # ASYNC TODO: close connections
412
        # self.server.server_close()
413
414
        # Shutdown the TCP server and the main asyncio loop
415
        self.server.shutdown()
416
417 1
    def status(self):
418
        """Return status of Kytos Server.
419
420
        If the controller kytos is running this method will be returned
421
        "Running since 'Started_At'", otherwise "Stopped".
422
423
        Returns:
424
            string: String with kytos status.
425
426
        """
427
        if self.started_at:
428
            return "Running since %s" % self.started_at
429
        return "Stopped"
430
431 1
    def uptime(self):
432
        """Return the uptime of kytos server.
433
434
        This method should return:
435
            - 0 if Kytos Server is stopped.
436
            - (kytos.start_at - datetime.now) if Kytos Server is running.
437
438
        Returns:
439
           datetime.timedelta: The uptime interval.
440
441
        """
442
        return now() - self.started_at if self.started_at else 0
443
444 1
    def notify_listeners(self, event):
445
        """Send the event to the specified listeners.
446
447
        Loops over self.events_listeners matching (by regexp) the attribute
448
        name of the event with the keys of events_listeners. If a match occurs,
449
        then send the event to each registered listener.
450
451
        Args:
452
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
453
        """
454
        self.log.debug("looking for listeners for %s", event)
455
        for event_regex, listeners in dict(self.events_listeners).items():
456
            # self.log.debug("listeners found for %s: %r => %s", event,
457
            #                event_regex, [l.__qualname__ for l in listeners])
458
            # Do not match if the event has more characters
459
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
460
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
461
                event_regex += '$'
462
            if re.match(event_regex, event.name):
463
                # self.log.debug('Calling listeners for %s', event)
464
                for listener in listeners:
465
                    listener(event)
466
467 1
    async def raw_event_handler(self):
468
        """Handle raw events.
469
470
        Listen to the raw_buffer and send all its events to the
471
        corresponding listeners.
472
        """
473
        self.log.info("Raw Event Handler started")
474
        while True:
475
            event = await self.buffers.raw.aget()
476
            self.notify_listeners(event)
477
            self.log.debug("Raw Event handler called")
478
479
            if event.name == "kytos/core.shutdown":
480
                self.log.debug("Raw Event handler stopped")
481
                break
482
483 1
    async def msg_in_event_handler(self):
484
        """Handle msg_in events.
485
486
        Listen to the msg_in buffer and send all its events to the
487
        corresponding listeners.
488
        """
489
        self.log.info("Message In Event Handler started")
490
        while True:
491
            event = await self.buffers.msg_in.aget()
492
            self.notify_listeners(event)
493
            self.log.debug("Message In Event handler called")
494
495
            if event.name == "kytos/core.shutdown":
496
                self.log.debug("Message In Event handler stopped")
497
                break
498
499 1
    async def msg_out_event_handler(self):
500
        """Handle msg_out events.
501
502
        Listen to the msg_out buffer and send all its events to the
503
        corresponding listeners.
504
        """
505
        self.log.info("Message Out Event Handler started")
506
        while True:
507
            triggered_event = await self.buffers.msg_out.aget()
508
509
            if triggered_event.name == "kytos/core.shutdown":
510
                self.log.debug("Message Out Event handler stopped")
511
                break
512
513
            message = triggered_event.content['message']
514
            destination = triggered_event.destination
515
            if (destination and
516
                    not destination.state == ConnectionState.FINISHED):
517
                packet = message.pack()
518
                destination.send(packet)
519
                self.log.debug('Connection %s: OUT OFP, '
520
                               'version: %s, type: %s, xid: %s - %s',
521
                               destination.id,
522
                               message.header.version,
523
                               message.header.message_type,
524
                               message.header.xid,
525
                               packet.hex())
526
                self.notify_listeners(triggered_event)
527
                self.log.debug("Message Out Event handler called")
528
            else:
529
                self.log.info("connection closed. Cannot send message")
530
531 1
    async def app_event_handler(self):
532
        """Handle app events.
533
534
        Listen to the app buffer and send all its events to the
535
        corresponding listeners.
536
        """
537
        self.log.info("App Event Handler started")
538
        while True:
539
            event = await self.buffers.app.aget()
540
            self.notify_listeners(event)
541
            self.log.debug("App Event handler called")
542
543
            if event.name == "kytos/core.shutdown":
544
                self.log.debug("App Event handler stopped")
545
                break
546
547 1
    def get_interface_by_id(self, interface_id):
548
        """Find a Interface  with interface_id.
549
550
        Args:
551
            interface_id(str): Interface Identifier.
552
553
        Returns:
554
            Interface: Instance of Interface with the id given.
555
556
        """
557
        if interface_id is None:
558
            return None
559
560
        switch_id = ":".join(interface_id.split(":")[:-1])
561
        interface_number = int(interface_id.split(":")[-1])
562
563
        switch = self.switches.get(switch_id)
564
565
        if not switch:
566
            return None
567
568
        return switch.interfaces.get(interface_number, None)
569
570 1
    def get_switch_by_dpid(self, dpid):
571
        """Return a specific switch by dpid.
572
573
        Args:
574
            dpid (|DPID|): dpid object used to identify a switch.
575
576
        Returns:
577
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
578
579
        """
580 1
        return self.switches.get(dpid)
581
582 1
    def get_switch_or_create(self, dpid, connection):
583
        """Return switch or create it if necessary.
584
585
        Args:
586
            dpid (|DPID|): dpid object used to identify a switch.
587
            connection (:class:`~kytos.core.connection.Connection`):
588
                connection used by switch. If a switch has a connection that
589
                will be updated.
590
591
        Returns:
592
            :class:`~kytos.core.switch.Switch`: new or existent switch.
593
594
        """
595 1
        self.create_or_update_connection(connection)
596 1
        switch = self.get_switch_by_dpid(dpid)
597 1
        event_name = 'kytos/core.switch.'
598
599 1
        if switch is None:
600
            switch = Switch(dpid=dpid)
601
            self.add_new_switch(switch)
602
            event_name += 'new'
603
        else:
604 1
            event_name += 'reconnected'
605
606 1
        self.set_switch_options(dpid=dpid)
607 1
        event = KytosEvent(name=event_name, content={'switch': switch})
608
609 1
        old_connection = switch.connection
610 1
        switch.update_connection(connection)
611
612 1
        if old_connection is not connection:
613
            self.remove_connection(old_connection)
614
615 1
        self.buffers.app.put(event)
616
617 1
        return switch
618
619 1
    def set_switch_options(self, dpid):
620
        """Update the switch settings based on kytos.conf options.
621
622
        Args:
623
            dpid (str): dpid used to identify a switch.
624
625
        """
626 1
        switch = self.switches.get(dpid)
627 1
        if not switch:
628
            return
629
630 1
        vlan_pool = {}
631 1
        try:
632 1
            vlan_pool = json.loads(self.options.vlan_pool)
633 1
            if not vlan_pool:
634
                return
635
        except (TypeError, json.JSONDecodeError) as err:
636
            self.log.error("Invalid vlan_pool settings: %s", err)
637
638 1
        if vlan_pool.get(dpid):
639 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
640 1
            for intf_num, port_list in vlan_pool[dpid].items():
641 1
                if not switch.interfaces.get((intf_num)):
642 1
                    vlan_ids = set()
643 1
                    for vlan_range in port_list:
644 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
645 1
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
646 1
                            vlan_ids.add(vlan_id)
647 1
                    intf_num = int(intf_num)
648 1
                    intf = Interface(name=intf_num, port_number=intf_num,
649
                                     switch=switch)
650 1
                    intf.set_available_tags(vlan_ids)
651 1
                    switch.update_interface(intf)
652
653 1
    def create_or_update_connection(self, connection):
654
        """Update a connection.
655
656
        Args:
657
            connection (:class:`~kytos.core.connection.Connection`):
658
                Instance of connection that will be updated.
659
        """
660 1
        self.connections[connection.id] = connection
661
662 1
    def get_connection_by_id(self, conn_id):
663
        """Return a existent connection by id.
664
665
        Args:
666
            id (int): id from a connection.
667
668
        Returns:
669
            :class:`~kytos.core.connection.Connection`: Instance of connection
670
                or None Type.
671
672
        """
673
        return self.connections.get(conn_id)
674
675 1
    def remove_connection(self, connection):
676
        """Close a existent connection and remove it.
677
678
        Args:
679
            connection (:class:`~kytos.core.connection.Connection`):
680
                Instance of connection that will be removed.
681
        """
682
        if connection is None:
683
            return False
684
685
        try:
686
            connection.close()
687
            del self.connections[connection.id]
688
        except KeyError:
689
            return False
690
        return True
691
692 1
    def remove_switch(self, switch):
693
        """Remove an existent switch.
694
695
        Args:
696
            switch (:class:`~kytos.core.switch.Switch`):
697
                Instance of switch that will be removed.
698
        """
699
        try:
700
            del self.switches[switch.dpid]
701
        except KeyError:
702
            return False
703
        return True
704
705 1
    def new_connection(self, event):
706
        """Handle a kytos/core.connection.new event.
707
708
        This method will read new connection event and store the connection
709
        (socket) into the connections attribute on the controller.
710
711
        It also clear all references to the connection since it is a new
712
        connection on the same ip:port.
713
714
        Args:
715
            event (~kytos.core.KytosEvent):
716
                The received event (``kytos/core.connection.new``) with the
717
                needed info.
718
        """
719
        self.log.info("Handling %s...", event)
720
721
        connection = event.source
722
        self.log.debug("Event source: %s", event.source)
723
724
        # Remove old connection (aka cleanup) if it exists
725
        if self.get_connection_by_id(connection.id):
726
            self.remove_connection(connection.id)
727
728
        # Update connections with the new connection
729
        self.create_or_update_connection(connection)
730
731 1
    def add_new_switch(self, switch):
732
        """Add a new switch on the controller.
733
734
        Args:
735
            switch (Switch): A Switch object
736
        """
737
        self.switches[switch.dpid] = switch
738
739 1
    def _import_napp(self, username, napp_name):
740
        """Import a NApp module.
741
742
        Raises:
743
            FileNotFoundError: if NApp's main.py is not found.
744
            ModuleNotFoundError: if any NApp requirement is not installed.
745
746
        """
747
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
748
        path = os.path.join(self.options.napps, username, napp_name,
749
                            'main.py')
750
        napp_spec = spec_from_file_location(mod_name, path)
751
        napp_module = module_from_spec(napp_spec)
752
        sys.modules[napp_spec.name] = napp_module
753
        napp_spec.loader.exec_module(napp_module)
754
        return napp_module
755
756 1
    def load_napp(self, username, napp_name):
757
        """Load a single NApp.
758
759
        Args:
760
            username (str): NApp username (makes up NApp's path).
761
            napp_name (str): Name of the NApp to be loaded.
762
        """
763
        if (username, napp_name) in self.napps:
764
            message = 'NApp %s/%s was already loaded'
765
            self.log.warning(message, username, napp_name)
766
            return
767
768
        try:
769
            napp_module = self._import_napp(username, napp_name)
770
        except ModuleNotFoundError as err:
771
            self.log.error("Error loading NApp '%s/%s': %s",
772
                           username, napp_name, err)
773
            return
774
        except FileNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
775
            msg = "NApp module not found, assuming it's a meta napp: %s"
776
            self.log.warning(msg, err.filename)
777
            return
778
779
        napp = napp_module.Main(controller=self)
780
781
        self.napps[(username, napp_name)] = napp
782
783
        napp.start()
784
        self.api_server.register_napp_endpoints(napp)
785
786
        # pylint: disable=protected-access
787
        for event, listeners in napp._listeners.items():
788
            self.events_listeners.setdefault(event, []).extend(listeners)
789
        # pylint: enable=protected-access
790
791 1
    def pre_install_napps(self, napps, enable=True):
792
        """Pre install and enable NApps.
793
794
        Before installing, it'll check if it's installed yet.
795
796
        Args:
797
            napps ([str]): List of NApps to be pre-installed and enabled.
798
        """
799
        all_napps = self.napps_manager.get_installed_napps()
800
        installed = [str(napp) for napp in all_napps]
801
        napps_diff = [napp for napp in napps if napp not in installed]
802
        for napp in napps_diff:
803
            self.napps_manager.install(napp, enable=enable)
804
805 1
    def load_napps(self):
806
        """Load all NApps enabled on the NApps dir."""
807
        for napp in self.napps_manager.get_enabled_napps():
808
            try:
809
                self.log.info("Loading NApp %s", napp.id)
810
                self.load_napp(napp.username, napp.name)
811
            except FileNotFoundError as exception:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
812
                self.log.error("Could not load NApp %s: %s",
813
                               napp.id, exception)
814
815 1
    def unload_napp(self, username, napp_name):
816
        """Unload a specific NApp.
817
818
        Args:
819
            username (str): NApp username.
820
            napp_name (str): Name of the NApp to be unloaded.
821
        """
822 1
        napp = self.napps.pop((username, napp_name), None)
823
824 1
        if napp is None:
825
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
826
        else:
827 1
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
828 1
            napp_id = NApp(username, napp_name).id
829 1
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
830 1
            napp_shutdown_fn = self.events_listeners[event.name][0]
831
            # Call listener before removing it from events_listeners
832 1
            napp_shutdown_fn(event)
833
834
            # Remove rest endpoints from that napp
835 1
            self.api_server.remove_napp_endpoints(napp)
836
837
            # Removing listeners from that napp
838
            # pylint: disable=protected-access
839 1
            for event_type, napp_listeners in napp._listeners.items():
840
                event_listeners = self.events_listeners[event_type]
841
                for listener in napp_listeners:
842
                    event_listeners.remove(listener)
843
                if not event_listeners:
844
                    del self.events_listeners[event_type]
845
            # pylint: enable=protected-access
846
847 1
    def unload_napps(self):
848
        """Unload all loaded NApps that are not core NApps."""
849
        # list() is used here to avoid the error:
850
        # 'RuntimeError: dictionary changed size during iteration'
851
        # This is caused by looping over an dictionary while removing
852
        # items from it.
853
        for (username, napp_name) in list(self.napps.keys()):  # noqa
854
            self.unload_napp(username, napp_name)
855
856 1
    def reload_napp_module(self, username, napp_name, napp_file):
857
        """Reload a NApp Module."""
858
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
859
        try:
860
            napp_module = import_module(mod_name)
861
        except ModuleNotFoundError as err:
862
            self.log.error("Module '%s' not found", mod_name)
863
            raise
864
        try:
865
            napp_module = reload_module(napp_module)
866
        except ImportError as err:
867
            self.log.error("Error reloading NApp '%s/%s': %s",
868
                           username, napp_name, err)
869
            raise
870
871 1
    def reload_napp(self, username, napp_name):
872
        """Reload a NApp."""
873
        self.unload_napp(username, napp_name)
874
        try:
875
            self.reload_napp_module(username, napp_name, 'settings')
876
            self.reload_napp_module(username, napp_name, 'main')
877
        except (ModuleNotFoundError, ImportError):
878
            return 400
879
        self.log.info("NApp '%s/%s' successfully reloaded",
880
                      username, napp_name)
881
        self.load_napp(username, napp_name)
882
        return 200
883
884 1
    def rest_reload_napp(self, username, napp_name):
885
        """Request reload a NApp."""
886
        res = self.reload_napp(username, napp_name)
887
        return 'reloaded', res
888
889 1
    def rest_reload_all_napps(self):
890
        """Request reload all NApps."""
891
        for napp in self.napps:
892
            self.reload_napp(*napp)
893
        return 'reloaded', 200
894