Test Failed
Pull Request — master (#149)
by Vinicius
18:12 queued 11:21
created

Controller.configuration_endpoint()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 1
dl 0
loc 8
rs 10
c 0
b 0
f 0
ccs 2
cts 2
cp 1
crap 1
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 2
import asyncio
17 2
import atexit
18 2
import json
19 2
import logging
20 2
import os
21 2
import re
22 2
import sys
23 2
import threading
24 2
from concurrent.futures import ThreadPoolExecutor
25 2
from importlib import import_module
26 2
from importlib import reload as reload_module
27 2
from importlib.util import module_from_spec, spec_from_file_location
28 2
from pathlib import Path
29
30 2
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 2
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 2
from kytos.core.auth import Auth
34 2
from kytos.core.buffers import KytosBuffers
35 2
from kytos.core.config import KytosConfig
36 2
from kytos.core.connection import ConnectionState
37 2
from kytos.core.events import KytosEvent
38 2
from kytos.core.helpers import executor as executor_pool
39 2
from kytos.core.helpers import now
40 2
from kytos.core.interface import Interface
41 2
from kytos.core.logs import LogManager
42 2
from kytos.core.napps.base import NApp
43 2
from kytos.core.napps.manager import NAppsManager
44 2
from kytos.core.napps.napp_dir_listener import NAppDirListener
45 2
from kytos.core.switch import Switch
46
47 2
__all__ = ('Controller',)
48
49
50 2
def exc_handler(_, exc, __):
51
    """Log uncaught exceptions.
52
53
    Args:
54
        exc_type (ignored): exception type
55
        exc: exception instance
56
        tb (ignored): traceback
57
    """
58
    logging.basicConfig(filename='errlog.log',
59
                        format='%(asctime)s:%(pathname)'
60
                        's:%(levelname)s:%(message)s')
61
    logging.exception('Uncaught Exception: %s', exc)
62
63
64 2
class Controller:
65
    """Main class of Kytos.
66
67
    The main responsibilities of this class are:
68
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
69
        - manage KytosNApps (install, load and unload);
70
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
71
        - manage which event should be sent to NApps methods;
72
        - manage the buffers handlers, considering one thread per handler.
73
    """
74
75
    # Created issue #568 for the disabled checks.
76
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
77 2
    def __init__(self, options=None, loop=None):
78
        """Init method of Controller class takes the parameters below.
79
80
        Args:
81
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
82
                instance of :class:`~kytos.core.config.KytosConfig` class.
83
        """
84 2
        if options is None:
85
            options = KytosConfig().options['daemon']
86
87 2
        self._loop = loop or asyncio.get_event_loop()
88 2
        self._pool = ThreadPoolExecutor(max_workers=1)
89
90
        # asyncio tasks
91 2
        self._tasks = []
92
93
        #: dict: keep the main threads of the controller (buffers and handler)
94 2
        self._threads = {}
95
        #: KytosBuffers: KytosBuffer object with Controller buffers
96 2
        self.buffers = KytosBuffers(loop=self._loop)
97
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
98
        #:
99
        #: This dict stores all connections between the controller and the
100
        #: switches. The key for this dict is a tuple (ip, port). The content
101
        #: is another dict with the connection information.
102 2
        self.connections = {}
103
        #: dict: mapping of events and event listeners.
104
        #:
105
        #: The key of the dict is a KytosEvent (or a string that represent a
106
        #: regex to match against KytosEvents) and the value is a list of
107
        #: methods that will receive the referenced event
108 2
        self.events_listeners = {'kytos/core.connection.new':
109
                                 [self.new_connection]}
110
111
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
112
        #:
113
        #: The key is the napp name (string), while the value is the napp
114
        #: instance itself.
115 2
        self.napps = {}
116
        #: Object generated by ParseArgs on config.py file
117 2
        self.options = options
118
        #: KytosServer: Instance of KytosServer that will be listening to TCP
119
        #: connections.
120 2
        self.server = None
121
        #: dict: Current existing switches.
122
        #:
123
        #: The key is the switch dpid, while the value is a Switch object.
124 2
        self.switches = {}  # dpid: Switch()
125 2
        self._switches_lock = threading.Lock()
126
127
        #: datetime.datetime: Time when the controller finished starting.
128 2
        self.started_at = None
129
130
        #: logging.Logger: Logger instance used by Kytos.
131 2
        self.log = None
132
133
        #: Observer that handle NApps when they are enabled or disabled.
134 2
        self.napp_dir_listener = NAppDirListener(self)
135
136 2
        self.napps_manager = NAppsManager(self)
137
138
        #: API Server used to expose rest endpoints.
139 2
        self.api_server = APIServer(__name__, self.options.listen,
140
                                    self.options.api_port,
141
                                    self.napps_manager, self.options.napps)
142
143 2
        self.auth = Auth(self)
144
145 2
        self._register_endpoints()
146
        #: Adding the napps 'enabled' directory into the PATH
147
        #: Now you can access the enabled napps with:
148
        #: from napps.<username>.<napp_name> import ?....
149 2
        sys.path.append(os.path.join(self.options.napps, os.pardir))
150 2
        sys.excepthook = exc_handler
151
152 2
    def enable_logs(self):
153
        """Register kytos log and enable the logs."""
154 2
        LogManager.load_config_file(self.options.logging, self.options.debug)
155 2
        LogManager.enable_websocket(self.api_server.server)
156 2
        self.log = logging.getLogger(__name__)
157
158 2
    @staticmethod
159
    def loggers():
160
        """List all logging Loggers.
161
162
        Return a list of Logger objects, with name and logging level.
163
        """
164 2
        return [logging.getLogger(name)
165
                for name in logging.root.manager.loggerDict
166
                if "kytos" in name]
167
168 2
    def toggle_debug(self, name=None):
169
        """Enable/disable logging debug messages to a given logger name.
170
171
        If the name parameter is not specified the debug will be
172
        enabled/disabled following the initial config file. It will decide
173
        to enable/disable using the 'kytos' name to find the current
174
        logger level.
175
        Obs: To disable the debug the logging will be set to NOTSET
176
177
        Args:
178
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
179
        """
180 2
        if name and name not in logging.root.manager.loggerDict:
181
            # A Logger name that is not declared in logging will raise an error
182
            # otherwise logging would create a new Logger.
183 2
            raise ValueError(f"Invalid logger name: {name}")
184
185 2
        if not name:
186
            # Logger name not specified.
187 2
            level = logging.getLogger('kytos').getEffectiveLevel()
188 2
            enable_debug = level != logging.DEBUG
189
190
            # Enable/disable default Loggers
191 2
            LogManager.load_config_file(self.options.logging, enable_debug)
192 2
            return
193
194
        # Get effective logger level for the name
195 2
        level = logging.getLogger(name).getEffectiveLevel()
196 2
        logger = logging.getLogger(name)
197
198 2
        if level == logging.DEBUG:
199
            # disable debug
200 2
            logger.setLevel(logging.NOTSET)
201
        else:
202
            # enable debug
203 2
            logger.setLevel(logging.DEBUG)
204
205 2
    def start(self, restart=False):
206
        """Create pidfile and call start_controller method."""
207 2
        self.enable_logs()
208 2
        if not restart:
209 2
            self.create_pidfile()
210 2
        self.start_controller()
211
212 2
    def create_pidfile(self):
213
        """Create a pidfile."""
214 2
        pid = os.getpid()
215
216
        # Creates directory if it doesn't exist
217
        # System can erase /var/run's content
218 2
        pid_folder = Path(self.options.pidfile).parent
219 2
        self.log.info(pid_folder)
220
221
        # Pylint incorrectly infers Path objects
222
        # https://github.com/PyCQA/pylint/issues/224
223
        # pylint: disable=no-member
224 2
        if not pid_folder.exists():
225
            pid_folder.mkdir()
226
            pid_folder.chmod(0o1777)
227
        # pylint: enable=no-member
228
229
        # Make sure the file is deleted when controller stops
230 2
        atexit.register(Path(self.options.pidfile).unlink)
231
232
        # Checks if a pidfile exists. Creates a new file.
233 2
        try:
234 2
            pidfile = open(self.options.pidfile, mode='x')
235 2
        except OSError:
236
            # This happens if there is a pidfile already.
237
            # We shall check if the process that created the pidfile is still
238
            # running.
239 2
            try:
240 2
                existing_file = open(self.options.pidfile, mode='r')
241 2
                old_pid = int(existing_file.read())
242 2
                os.kill(old_pid, 0)
243
                # If kill() doesn't return an error, there's a process running
244
                # with the same PID. We assume it is Kytos and quit.
245
                # Otherwise, overwrite the file and proceed.
246
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
247
                             "running. Aborting.")
248
                sys.exit(error_msg.format(self.options.pidfile))
249 2
            except OSError:
250 2
                try:
251 2
                    pidfile = open(self.options.pidfile, mode='w')
252
                except OSError as exception:
253
                    error_msg = "Failed to create pidfile {}: {}."
254
                    sys.exit(error_msg.format(self.options.pidfile, exception))
255
256
        # Identifies the process that created the pidfile.
257 2
        pidfile.write(str(pid))
258 2
        pidfile.close()
259
260 2
    def start_controller(self):
261
        """Start the controller.
262
263
        Starts the KytosServer (TCP Server) coroutine.
264
        Starts a thread for each buffer handler.
265
        Load the installed apps.
266
        """
267 2
        self.log.info("Starting Kytos - Kytos Controller")
268 2
        self.server = KytosServer((self.options.listen,
269
                                   int(self.options.port)),
270
                                  KytosServerProtocol,
271
                                  self,
272
                                  self.options.protocol_name)
273
274 2
        self.log.info("Starting TCP server: %s", self.server)
275 2
        self.server.serve_forever()
276
277 2
        def _stop_loop(_):
278
            loop = asyncio.get_event_loop()
279
            # print(_.result())
280
            threads = threading.enumerate()
281
            self.log.debug("%s threads before loop.stop: %s",
282
                           len(threads), threads)
283
            loop.stop()
284
285 2
        async def _run_api_server_thread(executor):
286
            log = logging.getLogger('kytos.core.controller.api_server_thread')
287
            log.debug('starting')
288
            # log.debug('creating tasks')
289
            loop = asyncio.get_event_loop()
290
            blocking_tasks = [
291
                loop.run_in_executor(executor, self.api_server.run)
292
            ]
293
            # log.debug('waiting for tasks')
294
            completed, pending = await asyncio.wait(blocking_tasks)
295
            # results = [t.result() for t in completed]
296
            # log.debug('results: {!r}'.format(results))
297
            log.debug('completed: %d, pending: %d',
298
                      len(completed), len(pending))
299
300 2
        task = self._loop.create_task(self.raw_event_handler())
301 2
        self._tasks.append(task)
302 2
        task = self._loop.create_task(self.msg_in_event_handler())
303 2
        self._tasks.append(task)
304 2
        task = self._loop.create_task(self.msg_out_event_handler())
305 2
        self._tasks.append(task)
306 2
        task = self._loop.create_task(self.app_event_handler())
307 2
        self._tasks.append(task)
308 2
        task = self._loop.create_task(_run_api_server_thread(self._pool))
309 2
        task.add_done_callback(_stop_loop)
310 2
        self._tasks.append(task)
311
312 2
        self.log.info("ThreadPool started: %s", self._pool)
313
314
        # ASYNC TODO: ensure all threads started correctly
315
        # This is critical, if any of them failed starting we should exit.
316
        # sys.exit(error_msg.format(thread, exception))
317
318 2
        self.log.info("Loading Kytos NApps...")
319 2
        self.napp_dir_listener.start()
320 2
        self.pre_install_napps(self.options.napps_pre_installed)
321 2
        self.load_napps()
322
323 2
        self.started_at = now()
324
325 2
    def _register_endpoints(self):
326
        """Register all rest endpoint served by kytos.
327
328
        -   Register APIServer endpoints
329
        -   Register WebUI endpoints
330
        -   Register ``/api/kytos/core/config`` endpoint
331
        """
332 2
        self.api_server.start_api()
333
        # Register controller endpoints as /api/kytos/core/...
334 2
        self.api_server.register_core_endpoint('config/',
335
                                               self.configuration_endpoint)
336 2
        self.api_server.register_core_endpoint('metadata/',
337
                                               Controller.metadata_endpoint)
338 2
        self.api_server.register_core_endpoint(
339
            'reload/<username>/<napp_name>/',
340
            self.rest_reload_napp)
341 2
        self.api_server.register_core_endpoint('reload/all',
342
                                               self.rest_reload_all_napps)
343 2
        self.auth.register_core_auth_services()
344
345 2
    def register_rest_endpoint(self, url, function, methods):
346
        """Deprecate in favor of @rest decorator."""
347 2
        self.api_server.register_rest_endpoint(url, function, methods)
348
349 2
    @classmethod
350
    def metadata_endpoint(cls):
351
        """Return the Kytos metadata.
352
353
        Returns:
354
            string: Json with current kytos metadata.
355
356
        """
357 2
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
358 2
        meta_file = open(meta_path).read()
359 2
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
360 2
        return json.dumps(metadata)
361
362 2
    def configuration_endpoint(self):
363
        """Return the configuration options used by Kytos.
364
365
        Returns:
366
            string: Json with current configurations used by kytos.
367
368
        """
369 2
        return json.dumps(self.options.__dict__)
370
371 2
    def restart(self, graceful=True):
372
        """Restart Kytos SDN Controller.
373
374
        Args:
375
            graceful(bool): Represents the way that Kytos will restart.
376
        """
377 2
        if self.started_at is not None:
378 2
            self.stop(graceful)
379 2
            self.__init__(self.options)
380
381 2
        self.start(restart=True)
382
383 2
    def stop(self, graceful=True):
384
        """Shutdown all services used by kytos.
385
386
        This method should:
387
            - stop all Websockets
388
            - stop the API Server
389
            - stop the Controller
390
        """
391 2
        if self.started_at:
392 2
            self.stop_controller(graceful)
393
394 2
    def stop_controller(self, graceful=True):
395
        """Stop the controller.
396
397
        This method should:
398
            - announce on the network that the controller will shutdown;
399
            - stop receiving incoming packages;
400
            - call the 'shutdown' method of each KytosNApp that is running;
401
            - finish reading the events on all buffers;
402
            - stop each running handler;
403
            - stop all running threads;
404
            - stop the KytosServer;
405
        """
406 2
        self.log.info("Stopping Kytos")
407
408 2
        self.buffers.send_stop_signal()
409 2
        self.api_server.stop_api_server()
410 2
        self.napp_dir_listener.stop()
411
412 2
        self.log.info("Stopping threadpool: %s", self._pool)
413 2
        if executor_pool:
414 2
            self.log.info(f"Stopping threadpool: {executor_pool}")
415
416 2
        threads = threading.enumerate()
417 2
        self.log.debug("%s threads before threadpool shutdown: %s",
418
                       len(threads), threads)
419
420 2
        try:
421
            # Python >= 3.9
422
            # pylint: disable=unexpected-keyword-arg
423 2
            self._pool.shutdown(wait=graceful, cancel_futures=True)
424 2
            if executor_pool:
425 2
                executor_pool.shutdown(wait=graceful, cancel_futures=True)
426 2
        except TypeError:
427 2
            self._pool.shutdown(wait=graceful)
428 2
            if executor_pool:
429 2
                executor_pool.shutdown(wait=graceful)
430
431
        # self.server.socket.shutdown()
432
        # self.server.socket.close()
433
434
        # for thread in self._threads.values():
435
        #     self.log.info("Stopping thread: %s", thread.name)
436
        #     thread.join()
437
438
        # for thread in self._threads.values():
439
        #     while thread.is_alive():
440
        #         self.log.info("Thread is alive: %s", thread.name)
441
        #         pass
442
443 2
        self.started_at = None
444 2
        self.unload_napps()
445 2
        self.buffers = KytosBuffers()
446
447
        # Cancel all async tasks (event handlers and servers)
448 2
        for task in self._tasks:
449
            task.cancel()
450
451
        # ASYNC TODO: close connections
452
        # self.server.server_close()
453
454
        # Shutdown the TCP server and the main asyncio loop
455 2
        self.server.shutdown()
456
457 2
    def status(self):
458
        """Return status of Kytos Server.
459
460
        If the controller kytos is running this method will be returned
461
        "Running since 'Started_At'", otherwise "Stopped".
462
463
        Returns:
464
            string: String with kytos status.
465
466
        """
467 2
        if self.started_at:
468 2
            return "Running since %s" % self.started_at
469 2
        return "Stopped"
470
471 2
    def uptime(self):
472
        """Return the uptime of kytos server.
473
474
        This method should return:
475
            - 0 if Kytos Server is stopped.
476
            - (kytos.start_at - datetime.now) if Kytos Server is running.
477
478
        Returns:
479
           datetime.timedelta: The uptime interval.
480
481
        """
482 2
        return now() - self.started_at if self.started_at else 0
483
484 2
    def notify_listeners(self, event):
485
        """Send the event to the specified listeners.
486
487
        Loops over self.events_listeners matching (by regexp) the attribute
488
        name of the event with the keys of events_listeners. If a match occurs,
489
        then send the event to each registered listener.
490
491
        Args:
492
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
493
        """
494 2
        self.log.debug("looking for listeners for %s", event)
495 2
        for event_regex, listeners in dict(self.events_listeners).items():
496
            # self.log.debug("listeners found for %s: %r => %s", event,
497
            #                event_regex, [l.__qualname__ for l in listeners])
498
            # Do not match if the event has more characters
499
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
500 2
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
501 2
                event_regex += '$'
502 2
            if re.match(event_regex, event.name):
503
                # self.log.debug('Calling listeners for %s', event)
504 2
                for listener in listeners:
505 2
                    listener(event)
506
507 2
    async def raw_event_handler(self):
508
        """Handle raw events.
509
510
        Listen to the raw_buffer and send all its events to the
511
        corresponding listeners.
512
        """
513 2
        self.log.info("Raw Event Handler started")
514 2
        while True:
515 2
            event = await self.buffers.raw.aget()
516 2
            self.notify_listeners(event)
517 2
            self.log.debug("Raw Event handler called")
518
519 2
            if event.name == "kytos/core.shutdown":
520 2
                self.log.debug("Raw Event handler stopped")
521 2
                break
522
523 2
    async def msg_in_event_handler(self):
524
        """Handle msg_in events.
525
526
        Listen to the msg_in buffer and send all its events to the
527
        corresponding listeners.
528
        """
529 2
        self.log.info("Message In Event Handler started")
530 2
        while True:
531 2
            event = await self.buffers.msg_in.aget()
532 2
            self.notify_listeners(event)
533 2
            self.log.debug("Message In Event handler called")
534
535 2
            if event.name == "kytos/core.shutdown":
536 2
                self.log.debug("Message In Event handler stopped")
537 2
                break
538
539 2
    async def msg_out_event_handler(self):
540
        """Handle msg_out events.
541
542
        Listen to the msg_out buffer and send all its events to the
543
        corresponding listeners.
544
        """
545 2
        self.log.info("Message Out Event Handler started")
546 2
        while True:
547 2
            triggered_event = await self.buffers.msg_out.aget()
548
549 2
            if triggered_event.name == "kytos/core.shutdown":
550 2
                self.log.debug("Message Out Event handler stopped")
551 2
                break
552
553 2
            message = triggered_event.content['message']
554 2
            destination = triggered_event.destination
555 2
            if (destination and
556
                    not destination.state == ConnectionState.FINISHED):
557 2
                packet = message.pack()
558 2
                destination.send(packet)
559 2
                self.log.debug('Connection %s: OUT OFP, '
560
                               'version: %s, type: %s, xid: %s - %s',
561
                               destination.id,
562
                               message.header.version,
563
                               message.header.message_type,
564
                               message.header.xid,
565
                               packet.hex())
566 2
                self.notify_listeners(triggered_event)
567 2
                self.log.debug("Message Out Event handler called")
568
            else:
569
                self.log.info("connection closed. Cannot send message")
570
571 2
    async def app_event_handler(self):
572
        """Handle app events.
573
574
        Listen to the app buffer and send all its events to the
575
        corresponding listeners.
576
        """
577 2
        self.log.info("App Event Handler started")
578 2
        while True:
579 2
            event = await self.buffers.app.aget()
580 2
            self.notify_listeners(event)
581 2
            self.log.debug("App Event handler called")
582
583 2
            if event.name == "kytos/core.shutdown":
584 2
                self.log.debug("App Event handler stopped")
585 2
                break
586
587 2
    def get_interface_by_id(self, interface_id):
588
        """Find a Interface  with interface_id.
589
590
        Args:
591
            interface_id(str): Interface Identifier.
592
593
        Returns:
594
            Interface: Instance of Interface with the id given.
595
596
        """
597 2
        if interface_id is None:
598 2
            return None
599
600 2
        switch_id = ":".join(interface_id.split(":")[:-1])
601 2
        interface_number = int(interface_id.split(":")[-1])
602
603 2
        switch = self.switches.get(switch_id)
604
605 2
        if not switch:
606 2
            return None
607
608 2
        return switch.interfaces.get(interface_number, None)
609
610 2
    def get_switch_by_dpid(self, dpid):
611
        """Return a specific switch by dpid.
612
613
        Args:
614
            dpid (|DPID|): dpid object used to identify a switch.
615
616
        Returns:
617
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
618
619
        """
620 2
        return self.switches.get(dpid)
621
622 2
    def get_switch_or_create(self, dpid, connection=None):
623
        """Return switch or create it if necessary.
624
625
        Args:
626
            dpid (|DPID|): dpid object used to identify a switch.
627
            connection (:class:`~kytos.core.connection.Connection`):
628
                connection used by switch. If a switch has a connection that
629
                will be updated.
630
631
        Returns:
632
            :class:`~kytos.core.switch.Switch`: new or existent switch.
633
634
        """
635 2
        with self._switches_lock:
636 2
            if connection:
637 2
                self.create_or_update_connection(connection)
638
639 2
            switch = self.get_switch_by_dpid(dpid)
640 2
            event_name = 'kytos/core.switch.'
641
642 2
            if switch is None:
643 2
                switch = Switch(dpid=dpid)
644 2
                self.add_new_switch(switch)
645 2
                event_name += 'new'
646
            else:
647 2
                event_name += 'reconnected'
648
649 2
            self.set_switch_options(dpid=dpid)
650 2
            event = KytosEvent(name=event_name, content={'switch': switch})
651
652 2
            if connection:
653 2
                old_connection = switch.connection
654 2
                switch.update_connection(connection)
655
656 2
                if old_connection is not connection:
657 2
                    self.remove_connection(old_connection)
658
659 2
            self.buffers.app.put(event)
660
661 2
            return switch
662
663 2
    def set_switch_options(self, dpid):
664
        """Update the switch settings based on kytos.conf options.
665
666
        Args:
667
            dpid (str): dpid used to identify a switch.
668
669
        """
670 2
        switch = self.switches.get(dpid)
671 2
        if not switch:
672
            return
673
674 2
        vlan_pool = {}
675 2
        vlan_pool = self.options.vlan_pool
676 2
        if not vlan_pool:
677 2
            return
678
679 2
        if vlan_pool.get(dpid):
680 2
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
681 2
            for intf_num, port_list in vlan_pool[dpid].items():
682 2
                if not switch.interfaces.get((intf_num)):
683 2
                    vlan_ids = set()
684 2
                    for vlan_range in port_list:
685 2
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
686 2
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
687 2
                            vlan_ids.add(vlan_id)
688 2
                    intf_num = int(intf_num)
689 2
                    intf = Interface(name=intf_num, port_number=intf_num,
690
                                     switch=switch)
691 2
                    intf.set_available_tags(vlan_ids)
692 2
                    switch.update_interface(intf)
693
694 2
    def create_or_update_connection(self, connection):
695
        """Update a connection.
696
697
        Args:
698
            connection (:class:`~kytos.core.connection.Connection`):
699
                Instance of connection that will be updated.
700
        """
701 2
        self.connections[connection.id] = connection
702
703 2
    def get_connection_by_id(self, conn_id):
704
        """Return a existent connection by id.
705
706
        Args:
707
            id (int): id from a connection.
708
709
        Returns:
710
            :class:`~kytos.core.connection.Connection`: Instance of connection
711
                or None Type.
712
713
        """
714 2
        return self.connections.get(conn_id)
715
716 2
    def remove_connection(self, connection):
717
        """Close a existent connection and remove it.
718
719
        Args:
720
            connection (:class:`~kytos.core.connection.Connection`):
721
                Instance of connection that will be removed.
722
        """
723 2
        if connection is None:
724 2
            return False
725
726 2
        try:
727 2
            connection.close()
728 2
            del self.connections[connection.id]
729 2
        except KeyError:
730 2
            return False
731 2
        return True
732
733 2
    def remove_switch(self, switch):
734
        """Remove an existent switch.
735
736
        Args:
737
            switch (:class:`~kytos.core.switch.Switch`):
738
                Instance of switch that will be removed.
739
        """
740 2
        try:
741 2
            del self.switches[switch.dpid]
742 2
        except KeyError:
743 2
            return False
744 2
        return True
745
746 2
    def new_connection(self, event):
747
        """Handle a kytos/core.connection.new event.
748
749
        This method will read new connection event and store the connection
750
        (socket) into the connections attribute on the controller.
751
752
        It also clear all references to the connection since it is a new
753
        connection on the same ip:port.
754
755
        Args:
756
            event (~kytos.core.KytosEvent):
757
                The received event (``kytos/core.connection.new``) with the
758
                needed info.
759
        """
760 2
        self.log.info("Handling %s...", event)
761
762 2
        connection = event.source
763 2
        self.log.debug("Event source: %s", event.source)
764
765
        # Remove old connection (aka cleanup) if it exists
766 2
        if self.get_connection_by_id(connection.id):
767
            self.remove_connection(connection.id)
768
769
        # Update connections with the new connection
770 2
        self.create_or_update_connection(connection)
771
772 2
    def add_new_switch(self, switch):
773
        """Add a new switch on the controller.
774
775
        Args:
776
            switch (Switch): A Switch object
777
        """
778 2
        self.switches[switch.dpid] = switch
779
780 2
    def _import_napp(self, username, napp_name):
781
        """Import a NApp module.
782
783
        Raises:
784
            FileNotFoundError: if NApp's main.py is not found.
785
            ModuleNotFoundError: if any NApp requirement is not installed.
786
787
        """
788 2
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
789 2
        path = os.path.join(self.options.napps, username, napp_name,
790
                            'main.py')
791 2
        napp_spec = spec_from_file_location(mod_name, path)
792 2
        napp_module = module_from_spec(napp_spec)
793 2
        sys.modules[napp_spec.name] = napp_module
794 2
        napp_spec.loader.exec_module(napp_module)
795 2
        return napp_module
796
797 2
    def load_napp(self, username, napp_name):
798
        """Load a single NApp.
799
800
        Args:
801
            username (str): NApp username (makes up NApp's path).
802
            napp_name (str): Name of the NApp to be loaded.
803
        """
804 2
        if (username, napp_name) in self.napps:
805 2
            message = 'NApp %s/%s was already loaded'
806 2
            self.log.warning(message, username, napp_name)
807 2
            return
808
809 2
        try:
810 2
            napp_module = self._import_napp(username, napp_name)
811 2
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
812 2
            self.log.error("Error loading NApp '%s/%s': %s",
813
                           username, napp_name, err)
814 2
            return
815 2
        except FileNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
816 2
            msg = "NApp module not found, assuming it's a meta-napp: %s"
817 2
            self.log.warning(msg, err.filename)
818 2
            return
819
820 2
        try:
821 2
            napp = napp_module.Main(controller=self)
822 2
        except:  # noqa pylint: disable=bare-except
823 2
            self.log.critical("NApp initialization failed: %s/%s",
824
                              username, napp_name, exc_info=True)
825 2
            return
826
827 2
        self.napps[(username, napp_name)] = napp
828
829 2
        napp.start()
830 2
        self.api_server.authenticate_endpoints(napp)
831 2
        self.api_server.register_napp_endpoints(napp)
832
833
        # pylint: disable=protected-access
834 2
        for event, listeners in napp._listeners.items():
835
            self.events_listeners.setdefault(event, []).extend(listeners)
836
        # pylint: enable=protected-access
837
838 2
    def pre_install_napps(self, napps, enable=True):
839
        """Pre install and enable NApps.
840
841
        Before installing, it'll check if it's installed yet.
842
843
        Args:
844
            napps ([str]): List of NApps to be pre-installed and enabled.
845
        """
846 2
        all_napps = self.napps_manager.get_installed_napps()
847 2
        installed = [str(napp) for napp in all_napps]
848 2
        napps_diff = [napp for napp in napps if napp not in installed]
849 2
        for napp in napps_diff:
850 2
            self.napps_manager.install(napp, enable=enable)
851
852 2
    def load_napps(self):
853
        """Load all NApps enabled on the NApps dir."""
854 2
        for napp in self.napps_manager.get_enabled_napps():
855 2
            try:
856 2
                self.log.info("Loading NApp %s", napp.id)
857 2
                self.load_napp(napp.username, napp.name)
858
            except FileNotFoundError as exception:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
859
                self.log.error("Could not load NApp %s: %s",
860
                               napp.id, exception)
861
862 2
    def unload_napp(self, username, napp_name):
863
        """Unload a specific NApp.
864
865
        Args:
866
            username (str): NApp username.
867
            napp_name (str): Name of the NApp to be unloaded.
868
        """
869 2
        napp = self.napps.pop((username, napp_name), None)
870
871 2
        if napp is None:
872
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
873
        else:
874 2
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
875 2
            napp_id = NApp(username, napp_name).id
876 2
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
877 2
            napp_shutdown_fn = self.events_listeners[event.name][0]
878
            # Call listener before removing it from events_listeners
879 2
            napp_shutdown_fn(event)
880
881
            # Remove rest endpoints from that napp
882 2
            self.api_server.remove_napp_endpoints(napp)
883
884
            # Removing listeners from that napp
885
            # pylint: disable=protected-access
886 2
            for event_type, napp_listeners in napp._listeners.items():
887
                event_listeners = self.events_listeners[event_type]
888
                for listener in napp_listeners:
889
                    event_listeners.remove(listener)
890
                if not event_listeners:
891
                    del self.events_listeners[event_type]
892
            # pylint: enable=protected-access
893
894 2
    def unload_napps(self):
895
        """Unload all loaded NApps that are not core NApps."""
896
        # list() is used here to avoid the error:
897
        # 'RuntimeError: dictionary changed size during iteration'
898
        # This is caused by looping over an dictionary while removing
899
        # items from it.
900
        for (username, napp_name) in list(self.napps.keys()):  # noqa
901
            self.unload_napp(username, napp_name)
902
903 2
    def reload_napp_module(self, username, napp_name, napp_file):
904
        """Reload a NApp Module."""
905 2
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
906 2
        try:
907 2
            napp_module = import_module(mod_name)
908 2
        except ModuleNotFoundError:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
909 2
            self.log.error("Module '%s' not found", mod_name)
910 2
            raise
911 2
        try:
912 2
            napp_module = reload_module(napp_module)
913 2
        except ImportError as err:
914 2
            self.log.error("Error reloading NApp '%s/%s': %s",
915
                           username, napp_name, err)
916 2
            raise
917
918 2
    def reload_napp(self, username, napp_name):
919
        """Reload a NApp."""
920 2
        self.unload_napp(username, napp_name)
921 2
        try:
922 2
            self.reload_napp_module(username, napp_name, 'settings')
923 2
            self.reload_napp_module(username, napp_name, 'main')
924 2
        except (ModuleNotFoundError, ImportError):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
925 2
            return 400
926 2
        self.log.info("NApp '%s/%s' successfully reloaded",
927
                      username, napp_name)
928 2
        self.load_napp(username, napp_name)
929 2
        return 200
930
931 2
    def rest_reload_napp(self, username, napp_name):
932
        """Request reload a NApp."""
933 2
        res = self.reload_napp(username, napp_name)
934 2
        return 'reloaded', res
935
936 2
    def rest_reload_all_napps(self):
937
        """Request reload all NApps."""
938 2
        for napp in self.napps:
939 2
            self.reload_napp(*napp)
940
        return 'reloaded', 200
941