Test Failed
Pull Request — master (#149)
by Vinicius
06:57
created

kytos.core.controller.Controller.status()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 4
nop 1
dl 0
loc 13
rs 10
c 0
b 0
f 0
ccs 4
cts 4
cp 1
crap 2
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 2
import asyncio
17 2
import atexit
18 2
import json
19 2
import logging
20 2
import os
21 2
import re
22 2
import sys
23 2
import threading
24 2
from concurrent.futures import ThreadPoolExecutor
25 2
from importlib import import_module
26 2
from importlib import reload as reload_module
27 2
from importlib.util import module_from_spec, spec_from_file_location
28 2
from pathlib import Path
29
30 2
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 2
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 2
from kytos.core.auth import Auth
34 2
from kytos.core.buffers import KytosBuffers
35 2
from kytos.core.config import KytosConfig
36 2
from kytos.core.connection import ConnectionState
37 2
from kytos.core.events import KytosEvent
38 2
from kytos.core.helpers import executor as executor_pool
39 2
from kytos.core.helpers import now
40 2
from kytos.core.interface import Interface
41 2
from kytos.core.logs import LogManager
42 2
from kytos.core.napps.base import NApp
43 2
from kytos.core.napps.manager import NAppsManager
44 2
from kytos.core.napps.napp_dir_listener import NAppDirListener
45 2
from kytos.core.switch import Switch
46
47 2
__all__ = ('Controller',)
48
49
50 2
def exc_handler(_, exc, __):
51
    """Log uncaught exceptions.
52
53
    Args:
54
        exc_type (ignored): exception type
55
        exc: exception instance
56
        tb (ignored): traceback
57
    """
58
    logging.basicConfig(filename='errlog.log',
59
                        format='%(asctime)s:%(pathname)'
60
                        's:%(levelname)s:%(message)s')
61
    logging.exception('Uncaught Exception: %s', exc)
62
63
64 2
class Controller:
65
    """Main class of Kytos.
66
67
    The main responsibilities of this class are:
68
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
69
        - manage KytosNApps (install, load and unload);
70
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
71
        - manage which event should be sent to NApps methods;
72
        - manage the buffers handlers, considering one thread per handler.
73
    """
74
75
    # Created issue #568 for the disabled checks.
76
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
77 2
    def __init__(self, options=None, loop=None):
78
        """Init method of Controller class takes the parameters below.
79
80
        Args:
81
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
82
                instance of :class:`~kytos.core.config.KytosConfig` class.
83
        """
84 2
        if options is None:
85
            options = KytosConfig().options['daemon']
86
87 2
        self._loop = loop or asyncio.get_event_loop()
88 2
        self._pool = ThreadPoolExecutor(max_workers=1)
89
90
        # asyncio tasks
91 2
        self._tasks = []
92
93
        #: dict: keep the main threads of the controller (buffers and handler)
94 2
        self._threads = {}
95
        #: KytosBuffers: KytosBuffer object with Controller buffers
96 2
        self.buffers = KytosBuffers(loop=self._loop)
97
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
98
        #:
99
        #: This dict stores all connections between the controller and the
100
        #: switches. The key for this dict is a tuple (ip, port). The content
101
        #: is another dict with the connection information.
102 2
        self.connections = {}
103
        #: dict: mapping of events and event listeners.
104
        #:
105
        #: The key of the dict is a KytosEvent (or a string that represent a
106
        #: regex to match against KytosEvents) and the value is a list of
107
        #: methods that will receive the referenced event
108 2
        self.events_listeners = {'kytos/core.connection.new':
109
                                 [self.new_connection]}
110
111
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
112
        #:
113
        #: The key is the napp name (string), while the value is the napp
114
        #: instance itself.
115 2
        self.napps = {}
116
        #: Object generated by ParseArgs on config.py file
117 2
        self.options = options
118
        #: KytosServer: Instance of KytosServer that will be listening to TCP
119
        #: connections.
120 2
        self.server = None
121
        #: dict: Current existing switches.
122
        #:
123
        #: The key is the switch dpid, while the value is a Switch object.
124 2
        self.switches = {}  # dpid: Switch()
125 2
        self._switches_lock = threading.Lock()
126
127
        #: datetime.datetime: Time when the controller finished starting.
128 2
        self.started_at = None
129
130
        #: logging.Logger: Logger instance used by Kytos.
131 2
        self.log = None
132
133
        #: Observer that handle NApps when they are enabled or disabled.
134 2
        self.napp_dir_listener = NAppDirListener(self)
135
136 2
        self.napps_manager = NAppsManager(self)
137
138
        #: API Server used to expose rest endpoints.
139 2
        self.api_server = APIServer(__name__, self.options.listen,
140
                                    self.options.api_port,
141
                                    self.napps_manager, self.options.napps)
142
143 2
        self.auth = Auth(self)
144
145 2
        self._register_endpoints()
146
        #: Adding the napps 'enabled' directory into the PATH
147
        #: Now you can access the enabled napps with:
148
        #: from napps.<username>.<napp_name> import ?....
149 2
        sys.path.append(os.path.join(self.options.napps, os.pardir))
150 2
        sys.excepthook = exc_handler
151
152 2
    def enable_logs(self):
153
        """Register kytos log and enable the logs."""
154 2
        LogManager.load_config_file(self.options.logging, self.options.debug)
155 2
        LogManager.enable_websocket(self.api_server.server)
156 2
        self.log = logging.getLogger(__name__)
157
158 2
    @staticmethod
159
    def loggers():
160
        """List all logging Loggers.
161
162
        Return a list of Logger objects, with name and logging level.
163
        """
164 2
        return [logging.getLogger(name)
165
                for name in logging.root.manager.loggerDict
166
                if "kytos" in name]
167
168 2
    def toggle_debug(self, name=None):
169
        """Enable/disable logging debug messages to a given logger name.
170
171
        If the name parameter is not specified the debug will be
172
        enabled/disabled following the initial config file. It will decide
173
        to enable/disable using the 'kytos' name to find the current
174
        logger level.
175
        Obs: To disable the debug the logging will be set to NOTSET
176
177
        Args:
178
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
179
        """
180 2
        if name and name not in logging.root.manager.loggerDict:
181
            # A Logger name that is not declared in logging will raise an error
182
            # otherwise logging would create a new Logger.
183 2
            raise ValueError(f"Invalid logger name: {name}")
184
185 2
        if not name:
186
            # Logger name not specified.
187 2
            level = logging.getLogger('kytos').getEffectiveLevel()
188 2
            enable_debug = level != logging.DEBUG
189
190
            # Enable/disable default Loggers
191 2
            LogManager.load_config_file(self.options.logging, enable_debug)
192 2
            return
193
194
        # Get effective logger level for the name
195 2
        level = logging.getLogger(name).getEffectiveLevel()
196 2
        logger = logging.getLogger(name)
197
198 2
        if level == logging.DEBUG:
199
            # disable debug
200 2
            logger.setLevel(logging.NOTSET)
201
        else:
202
            # enable debug
203 2
            logger.setLevel(logging.DEBUG)
204
205 2
    def start(self, restart=False):
206
        """Create pidfile and call start_controller method."""
207 2
        self.enable_logs()
208 2
        if not restart:
209 2
            self.create_pidfile()
210 2
        self.start_controller()
211
212 2
    def create_pidfile(self):
213
        """Create a pidfile."""
214 2
        pid = os.getpid()
215
216
        # Creates directory if it doesn't exist
217
        # System can erase /var/run's content
218 2
        pid_folder = Path(self.options.pidfile).parent
219 2
        self.log.info(pid_folder)
220
221
        # Pylint incorrectly infers Path objects
222
        # https://github.com/PyCQA/pylint/issues/224
223
        # pylint: disable=no-member
224 2
        if not pid_folder.exists():
225
            pid_folder.mkdir()
226
            pid_folder.chmod(0o1777)
227
        # pylint: enable=no-member
228
229
        # Make sure the file is deleted when controller stops
230 2
        atexit.register(Path(self.options.pidfile).unlink)
231
232
        # Checks if a pidfile exists. Creates a new file.
233 2
        try:
234 2
            pidfile = open(self.options.pidfile, mode='x')
235 2
        except OSError:
236
            # This happens if there is a pidfile already.
237
            # We shall check if the process that created the pidfile is still
238
            # running.
239 2
            try:
240 2
                existing_file = open(self.options.pidfile, mode='r')
241 2
                old_pid = int(existing_file.read())
242 2
                os.kill(old_pid, 0)
243
                # If kill() doesn't return an error, there's a process running
244
                # with the same PID. We assume it is Kytos and quit.
245
                # Otherwise, overwrite the file and proceed.
246
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
247
                             "running. Aborting.")
248
                sys.exit(error_msg.format(self.options.pidfile))
249 2
            except OSError:
250 2
                try:
251 2
                    pidfile = open(self.options.pidfile, mode='w')
252
                except OSError as exception:
253
                    error_msg = "Failed to create pidfile {}: {}."
254
                    sys.exit(error_msg.format(self.options.pidfile, exception))
255
256
        # Identifies the process that created the pidfile.
257 2
        pidfile.write(str(pid))
258 2
        pidfile.close()
259
260 2
    def start_controller(self):
261
        """Start the controller.
262
263
        Starts the KytosServer (TCP Server) coroutine.
264
        Starts a thread for each buffer handler.
265
        Load the installed apps.
266
        """
267 2
        self.log.info("Starting Kytos - Kytos Controller")
268 2
        self.server = KytosServer((self.options.listen,
269
                                   int(self.options.port)),
270
                                  KytosServerProtocol,
271
                                  self,
272
                                  self.options.protocol_name)
273
274 2
        self.log.info("Starting TCP server: %s", self.server)
275 2
        self.server.serve_forever()
276
277 2
        def _stop_loop(_):
278
            loop = asyncio.get_event_loop()
279
            # print(_.result())
280
            threads = threading.enumerate()
281
            self.log.debug("%s threads before loop.stop: %s",
282
                           len(threads), threads)
283
            loop.stop()
284
285 2
        async def _run_api_server_thread(executor):
286
            log = logging.getLogger('kytos.core.controller.api_server_thread')
287
            log.debug('starting')
288
            # log.debug('creating tasks')
289
            loop = asyncio.get_event_loop()
290
            blocking_tasks = [
291
                loop.run_in_executor(executor, self.api_server.run)
292
            ]
293
            # log.debug('waiting for tasks')
294
            completed, pending = await asyncio.wait(blocking_tasks)
295
            # results = [t.result() for t in completed]
296
            # log.debug('results: {!r}'.format(results))
297
            log.debug('completed: %d, pending: %d',
298
                      len(completed), len(pending))
299
300 2
        task = self._loop.create_task(self.raw_event_handler())
301 2
        self._tasks.append(task)
302 2
        task = self._loop.create_task(self.msg_in_event_handler())
303 2
        self._tasks.append(task)
304 2
        task = self._loop.create_task(self.msg_out_event_handler())
305 2
        self._tasks.append(task)
306 2
        task = self._loop.create_task(self.app_event_handler())
307 2
        self._tasks.append(task)
308 2
        task = self._loop.create_task(_run_api_server_thread(self._pool))
309 2
        task.add_done_callback(_stop_loop)
310 2
        self._tasks.append(task)
311
312 2
        self.log.info("ThreadPool started: %s", self._pool)
313
314
        # ASYNC TODO: ensure all threads started correctly
315
        # This is critical, if any of them failed starting we should exit.
316
        # sys.exit(error_msg.format(thread, exception))
317
318 2
        self.log.info("Loading Kytos NApps...")
319 2
        self.napp_dir_listener.start()
320 2
        self.pre_install_napps(self.options.napps_pre_installed)
321 2
        self.load_napps()
322
323 2
        self.started_at = now()
324
325 2
    def _register_endpoints(self):
326
        """Register all rest endpoint served by kytos.
327
328
        -   Register APIServer endpoints
329
        -   Register WebUI endpoints
330
        -   Register ``/api/kytos/core/config`` endpoint
331
        """
332 2
        self.api_server.start_api()
333
        # Register controller endpoints as /api/kytos/core/...
334 2
        self.api_server.register_core_endpoint('config/',
335
                                               self.configuration_endpoint)
336 2
        self.api_server.register_core_endpoint('metadata/',
337
                                               Controller.metadata_endpoint)
338 2
        self.api_server.register_core_endpoint(
339
            'reload/<username>/<napp_name>/',
340
            self.rest_reload_napp)
341 2
        self.api_server.register_core_endpoint('reload/all',
342
                                               self.rest_reload_all_napps)
343 2
        self.auth.register_core_auth_services()
344
345 2
    def register_rest_endpoint(self, url, function, methods):
346
        """Deprecate in favor of @rest decorator."""
347 2
        self.api_server.register_rest_endpoint(url, function, methods)
348
349 2
    @classmethod
350
    def metadata_endpoint(cls):
351
        """Return the Kytos metadata.
352
353
        Returns:
354
            string: Json with current kytos metadata.
355
356
        """
357 2
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
358 2
        meta_file = open(meta_path).read()
359 2
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
360 2
        return json.dumps(metadata)
361
362 2
    def configuration_endpoint(self):
363
        """Return the configuration options used by Kytos.
364
365
        Returns:
366
            string: Json with current configurations used by kytos.
367
368
        """
369 2
        return json.dumps(self.options.__dict__)
370
371 2
    def restart(self, graceful=True):
372
        """Restart Kytos SDN Controller.
373
374
        Args:
375
            graceful(bool): Represents the way that Kytos will restart.
376
        """
377 2
        if self.started_at is not None:
378 2
            self.stop(graceful)
379 2
            self.__init__(self.options)
380
381 2
        self.start(restart=True)
382
383 2
    def stop(self, graceful=True):
384
        """Shutdown all services used by kytos.
385
386
        This method should:
387
            - stop all Websockets
388
            - stop the API Server
389
            - stop the Controller
390
        """
391 2
        if self.started_at:
392 2
            self.stop_controller(graceful)
393
394 2
    def stop_controller(self, graceful=True):
395
        """Stop the controller.
396
397
        This method should:
398
            - announce on the network that the controller will shutdown;
399
            - stop receiving incoming packages;
400
            - call the 'shutdown' method of each KytosNApp that is running;
401
            - finish reading the events on all buffers;
402
            - stop each running handler;
403
            - stop all running threads;
404
            - stop the KytosServer;
405
        """
406 2
        self.log.info("Stopping Kytos")
407
408 2
        self.buffers.send_stop_signal()
409 2
        self.api_server.stop_api_server()
410 2
        self.napp_dir_listener.stop()
411
412 2
        self.log.info("Stopping threadpool: %s", self._pool)
413 2
        if executor_pool:
414 2
            self.log.info(f"Stopping threadpool: {executor_pool}")
415
416 2
        threads = threading.enumerate()
417 2
        self.log.debug("%s threads before threadpool shutdown: %s",
418
                       len(threads), threads)
419
420 2
        try:
421
            # Python >= 3.9
422
            # pylint: disable=unexpected-keyword-arg
423 2
            self._pool.shutdown(wait=graceful, cancel_futures=True)
424
            # pylint: enable=unexpected-keyword-arg
425 2
            if executor_pool:
426 2
                executor_pool.shutdown(wait=graceful, cancel_futures=True)
427 2
        except TypeError:
428 2
            self._pool.shutdown(wait=graceful)
429 2
            if executor_pool:
430 2
                executor_pool.shutdown(wait=graceful)
431
432
        # self.server.socket.shutdown()
433
        # self.server.socket.close()
434
435
        # for thread in self._threads.values():
436
        #     self.log.info("Stopping thread: %s", thread.name)
437
        #     thread.join()
438
439
        # for thread in self._threads.values():
440
        #     while thread.is_alive():
441
        #         self.log.info("Thread is alive: %s", thread.name)
442
        #         pass
443
444 2
        self.started_at = None
445 2
        self.unload_napps()
446 2
        self.buffers = KytosBuffers()
447
448
        # Cancel all async tasks (event handlers and servers)
449 2
        for task in self._tasks:
450
            task.cancel()
451
452
        # ASYNC TODO: close connections
453
        # self.server.server_close()
454
455
        # Shutdown the TCP server and the main asyncio loop
456 2
        self.server.shutdown()
457
458 2
    def status(self):
459
        """Return status of Kytos Server.
460
461
        If the controller kytos is running this method will be returned
462
        "Running since 'Started_At'", otherwise "Stopped".
463
464
        Returns:
465
            string: String with kytos status.
466
467
        """
468 2
        if self.started_at:
469 2
            return "Running since %s" % self.started_at
470 2
        return "Stopped"
471
472 2
    def uptime(self):
473
        """Return the uptime of kytos server.
474
475
        This method should return:
476
            - 0 if Kytos Server is stopped.
477
            - (kytos.start_at - datetime.now) if Kytos Server is running.
478
479
        Returns:
480
           datetime.timedelta: The uptime interval.
481
482
        """
483 2
        return now() - self.started_at if self.started_at else 0
484
485 2
    def notify_listeners(self, event):
486
        """Send the event to the specified listeners.
487
488
        Loops over self.events_listeners matching (by regexp) the attribute
489
        name of the event with the keys of events_listeners. If a match occurs,
490
        then send the event to each registered listener.
491
492
        Args:
493
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
494
        """
495 2
        self.log.debug("looking for listeners for %s", event)
496 2
        for event_regex, listeners in dict(self.events_listeners).items():
497
            # self.log.debug("listeners found for %s: %r => %s", event,
498
            #                event_regex, [l.__qualname__ for l in listeners])
499
            # Do not match if the event has more characters
500
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
501 2
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
502 2
                event_regex += '$'
503 2
            if re.match(event_regex, event.name):
504
                # self.log.debug('Calling listeners for %s', event)
505 2
                for listener in listeners:
506 2
                    listener(event)
507
508 2
    async def raw_event_handler(self):
509
        """Handle raw events.
510
511
        Listen to the raw_buffer and send all its events to the
512
        corresponding listeners.
513
        """
514 2
        self.log.info("Raw Event Handler started")
515 2
        while True:
516 2
            event = await self.buffers.raw.aget()
517 2
            self.notify_listeners(event)
518 2
            self.log.debug("Raw Event handler called")
519
520 2
            if event.name == "kytos/core.shutdown":
521 2
                self.log.debug("Raw Event handler stopped")
522 2
                break
523
524 2
    async def msg_in_event_handler(self):
525
        """Handle msg_in events.
526
527
        Listen to the msg_in buffer and send all its events to the
528
        corresponding listeners.
529
        """
530 2
        self.log.info("Message In Event Handler started")
531 2
        while True:
532 2
            event = await self.buffers.msg_in.aget()
533 2
            self.notify_listeners(event)
534 2
            self.log.debug("Message In Event handler called")
535
536 2
            if event.name == "kytos/core.shutdown":
537 2
                self.log.debug("Message In Event handler stopped")
538 2
                break
539
540 2
    async def msg_out_event_handler(self):
541
        """Handle msg_out events.
542
543
        Listen to the msg_out buffer and send all its events to the
544
        corresponding listeners.
545
        """
546 2
        self.log.info("Message Out Event Handler started")
547 2
        while True:
548 2
            triggered_event = await self.buffers.msg_out.aget()
549
550 2
            if triggered_event.name == "kytos/core.shutdown":
551 2
                self.log.debug("Message Out Event handler stopped")
552 2
                break
553
554 2
            message = triggered_event.content['message']
555 2
            destination = triggered_event.destination
556 2
            if (destination and
557
                    not destination.state == ConnectionState.FINISHED):
558 2
                packet = message.pack()
559 2
                destination.send(packet)
560 2
                self.log.debug('Connection %s: OUT OFP, '
561
                               'version: %s, type: %s, xid: %s - %s',
562
                               destination.id,
563
                               message.header.version,
564
                               message.header.message_type,
565
                               message.header.xid,
566
                               packet.hex())
567 2
                self.notify_listeners(triggered_event)
568 2
                self.log.debug("Message Out Event handler called")
569
            else:
570
                self.log.info("connection closed. Cannot send message")
571
572 2
    async def app_event_handler(self):
573
        """Handle app events.
574
575
        Listen to the app buffer and send all its events to the
576
        corresponding listeners.
577
        """
578 2
        self.log.info("App Event Handler started")
579 2
        while True:
580 2
            event = await self.buffers.app.aget()
581 2
            self.notify_listeners(event)
582 2
            self.log.debug("App Event handler called")
583
584 2
            if event.name == "kytos/core.shutdown":
585 2
                self.log.debug("App Event handler stopped")
586 2
                break
587
588 2
    def get_interface_by_id(self, interface_id):
589
        """Find a Interface  with interface_id.
590
591
        Args:
592
            interface_id(str): Interface Identifier.
593
594
        Returns:
595
            Interface: Instance of Interface with the id given.
596
597
        """
598 2
        if interface_id is None:
599 2
            return None
600
601 2
        switch_id = ":".join(interface_id.split(":")[:-1])
602 2
        interface_number = int(interface_id.split(":")[-1])
603
604 2
        switch = self.switches.get(switch_id)
605
606 2
        if not switch:
607 2
            return None
608
609 2
        return switch.interfaces.get(interface_number, None)
610
611 2
    def get_switch_by_dpid(self, dpid):
612
        """Return a specific switch by dpid.
613
614
        Args:
615
            dpid (|DPID|): dpid object used to identify a switch.
616
617
        Returns:
618
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
619
620
        """
621 2
        return self.switches.get(dpid)
622
623 2
    def get_switch_or_create(self, dpid, connection=None):
624
        """Return switch or create it if necessary.
625
626
        Args:
627
            dpid (|DPID|): dpid object used to identify a switch.
628
            connection (:class:`~kytos.core.connection.Connection`):
629
                connection used by switch. If a switch has a connection that
630
                will be updated.
631
632
        Returns:
633
            :class:`~kytos.core.switch.Switch`: new or existent switch.
634
635
        """
636 2
        with self._switches_lock:
637 2
            if connection:
638 2
                self.create_or_update_connection(connection)
639
640 2
            switch = self.get_switch_by_dpid(dpid)
641 2
            event_name = 'kytos/core.switch.'
642
643 2
            if switch is None:
644 2
                switch = Switch(dpid=dpid)
645 2
                self.add_new_switch(switch)
646 2
                event_name += 'new'
647
            else:
648 2
                event_name += 'reconnected'
649
650 2
            self.set_switch_options(dpid=dpid)
651 2
            event = KytosEvent(name=event_name, content={'switch': switch})
652
653 2
            if connection:
654 2
                old_connection = switch.connection
655 2
                switch.update_connection(connection)
656
657 2
                if old_connection is not connection:
658 2
                    self.remove_connection(old_connection)
659
660 2
            self.buffers.app.put(event)
661
662 2
            return switch
663
664 2
    def set_switch_options(self, dpid):
665
        """Update the switch settings based on kytos.conf options.
666
667
        Args:
668
            dpid (str): dpid used to identify a switch.
669
670
        """
671 2
        switch = self.switches.get(dpid)
672 2
        if not switch:
673
            return
674
675 2
        vlan_pool = {}
676 2
        vlan_pool = self.options.vlan_pool
677 2
        if not vlan_pool:
678 2
            return
679
680 2
        if vlan_pool.get(dpid):
681 2
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
682 2
            for intf_num, port_list in vlan_pool[dpid].items():
683 2
                if not switch.interfaces.get((intf_num)):
684 2
                    vlan_ids = set()
685 2
                    for vlan_range in port_list:
686 2
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
687 2
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
688 2
                            vlan_ids.add(vlan_id)
689 2
                    intf_num = int(intf_num)
690 2
                    intf = Interface(name=intf_num, port_number=intf_num,
691
                                     switch=switch)
692 2
                    intf.set_available_tags(vlan_ids)
693 2
                    switch.update_interface(intf)
694
695 2
    def create_or_update_connection(self, connection):
696
        """Update a connection.
697
698
        Args:
699
            connection (:class:`~kytos.core.connection.Connection`):
700
                Instance of connection that will be updated.
701
        """
702 2
        self.connections[connection.id] = connection
703
704 2
    def get_connection_by_id(self, conn_id):
705
        """Return a existent connection by id.
706
707
        Args:
708
            id (int): id from a connection.
709
710
        Returns:
711
            :class:`~kytos.core.connection.Connection`: Instance of connection
712
                or None Type.
713
714
        """
715 2
        return self.connections.get(conn_id)
716
717 2
    def remove_connection(self, connection):
718
        """Close a existent connection and remove it.
719
720
        Args:
721
            connection (:class:`~kytos.core.connection.Connection`):
722
                Instance of connection that will be removed.
723
        """
724 2
        if connection is None:
725 2
            return False
726
727 2
        try:
728 2
            connection.close()
729 2
            del self.connections[connection.id]
730 2
        except KeyError:
731 2
            return False
732 2
        return True
733
734 2
    def remove_switch(self, switch):
735
        """Remove an existent switch.
736
737
        Args:
738
            switch (:class:`~kytos.core.switch.Switch`):
739
                Instance of switch that will be removed.
740
        """
741 2
        try:
742 2
            del self.switches[switch.dpid]
743 2
        except KeyError:
744 2
            return False
745 2
        return True
746
747 2
    def new_connection(self, event):
748
        """Handle a kytos/core.connection.new event.
749
750
        This method will read new connection event and store the connection
751
        (socket) into the connections attribute on the controller.
752
753
        It also clear all references to the connection since it is a new
754
        connection on the same ip:port.
755
756
        Args:
757
            event (~kytos.core.KytosEvent):
758
                The received event (``kytos/core.connection.new``) with the
759
                needed info.
760
        """
761 2
        self.log.info("Handling %s...", event)
762
763 2
        connection = event.source
764 2
        self.log.debug("Event source: %s", event.source)
765
766
        # Remove old connection (aka cleanup) if it exists
767 2
        if self.get_connection_by_id(connection.id):
768
            self.remove_connection(connection.id)
769
770
        # Update connections with the new connection
771 2
        self.create_or_update_connection(connection)
772
773 2
    def add_new_switch(self, switch):
774
        """Add a new switch on the controller.
775
776
        Args:
777
            switch (Switch): A Switch object
778
        """
779 2
        self.switches[switch.dpid] = switch
780
781 2
    def _import_napp(self, username, napp_name):
782
        """Import a NApp module.
783
784
        Raises:
785
            FileNotFoundError: if NApp's main.py is not found.
786
            ModuleNotFoundError: if any NApp requirement is not installed.
787
788
        """
789 2
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
790 2
        path = os.path.join(self.options.napps, username, napp_name,
791
                            'main.py')
792 2
        napp_spec = spec_from_file_location(mod_name, path)
793 2
        napp_module = module_from_spec(napp_spec)
794 2
        sys.modules[napp_spec.name] = napp_module
795 2
        napp_spec.loader.exec_module(napp_module)
796 2
        return napp_module
797
798 2
    def load_napp(self, username, napp_name):
799
        """Load a single NApp.
800
801
        Args:
802
            username (str): NApp username (makes up NApp's path).
803
            napp_name (str): Name of the NApp to be loaded.
804
        """
805 2
        if (username, napp_name) in self.napps:
806 2
            message = 'NApp %s/%s was already loaded'
807 2
            self.log.warning(message, username, napp_name)
808 2
            return
809
810 2
        try:
811 2
            napp_module = self._import_napp(username, napp_name)
812 2
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
813 2
            self.log.error("Error loading NApp '%s/%s': %s",
814
                           username, napp_name, err)
815 2
            return
816 2
        except FileNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
817 2
            msg = "NApp module not found, assuming it's a meta-napp: %s"
818 2
            self.log.warning(msg, err.filename)
819 2
            return
820
821 2
        try:
822 2
            napp = napp_module.Main(controller=self)
823 2
        except:  # noqa pylint: disable=bare-except
824 2
            self.log.critical("NApp initialization failed: %s/%s",
825
                              username, napp_name, exc_info=True)
826 2
            return
827
828 2
        self.napps[(username, napp_name)] = napp
829
830 2
        napp.start()
831 2
        self.api_server.authenticate_endpoints(napp)
832 2
        self.api_server.register_napp_endpoints(napp)
833
834
        # pylint: disable=protected-access
835 2
        for event, listeners in napp._listeners.items():
836
            self.events_listeners.setdefault(event, []).extend(listeners)
837
        # pylint: enable=protected-access
838
839 2
    def pre_install_napps(self, napps, enable=True):
840
        """Pre install and enable NApps.
841
842
        Before installing, it'll check if it's installed yet.
843
844
        Args:
845
            napps ([str]): List of NApps to be pre-installed and enabled.
846
        """
847 2
        all_napps = self.napps_manager.get_installed_napps()
848 2
        installed = [str(napp) for napp in all_napps]
849 2
        napps_diff = [napp for napp in napps if napp not in installed]
850 2
        for napp in napps_diff:
851 2
            self.napps_manager.install(napp, enable=enable)
852
853 2
    def load_napps(self):
854
        """Load all NApps enabled on the NApps dir."""
855 2
        for napp in self.napps_manager.get_enabled_napps():
856 2
            try:
857 2
                self.log.info("Loading NApp %s", napp.id)
858 2
                self.load_napp(napp.username, napp.name)
859
            except FileNotFoundError as exception:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
860
                self.log.error("Could not load NApp %s: %s",
861
                               napp.id, exception)
862
863 2
    def unload_napp(self, username, napp_name):
864
        """Unload a specific NApp.
865
866
        Args:
867
            username (str): NApp username.
868
            napp_name (str): Name of the NApp to be unloaded.
869
        """
870 2
        napp = self.napps.pop((username, napp_name), None)
871
872 2
        if napp is None:
873
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
874
        else:
875 2
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
876 2
            napp_id = NApp(username, napp_name).id
877 2
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
878 2
            napp_shutdown_fn = self.events_listeners[event.name][0]
879
            # Call listener before removing it from events_listeners
880 2
            napp_shutdown_fn(event)
881
882
            # Remove rest endpoints from that napp
883 2
            self.api_server.remove_napp_endpoints(napp)
884
885
            # Removing listeners from that napp
886
            # pylint: disable=protected-access
887 2
            for event_type, napp_listeners in napp._listeners.items():
888
                event_listeners = self.events_listeners[event_type]
889
                for listener in napp_listeners:
890
                    event_listeners.remove(listener)
891
                if not event_listeners:
892
                    del self.events_listeners[event_type]
893
            # pylint: enable=protected-access
894
895 2
    def unload_napps(self):
896
        """Unload all loaded NApps that are not core NApps."""
897
        # list() is used here to avoid the error:
898
        # 'RuntimeError: dictionary changed size during iteration'
899
        # This is caused by looping over an dictionary while removing
900
        # items from it.
901
        for (username, napp_name) in list(self.napps.keys()):  # noqa
902
            self.unload_napp(username, napp_name)
903
904 2
    def reload_napp_module(self, username, napp_name, napp_file):
905
        """Reload a NApp Module."""
906 2
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
907 2
        try:
908 2
            napp_module = import_module(mod_name)
909 2
        except ModuleNotFoundError:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
910 2
            self.log.error("Module '%s' not found", mod_name)
911 2
            raise
912 2
        try:
913 2
            napp_module = reload_module(napp_module)
914 2
        except ImportError as err:
915 2
            self.log.error("Error reloading NApp '%s/%s': %s",
916
                           username, napp_name, err)
917 2
            raise
918
919 2
    def reload_napp(self, username, napp_name):
920
        """Reload a NApp."""
921 2
        self.unload_napp(username, napp_name)
922 2
        try:
923 2
            self.reload_napp_module(username, napp_name, 'settings')
924 2
            self.reload_napp_module(username, napp_name, 'main')
925 2
        except (ModuleNotFoundError, ImportError):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
926 2
            return 400
927 2
        self.log.info("NApp '%s/%s' successfully reloaded",
928
                      username, napp_name)
929 2
        self.load_napp(username, napp_name)
930 2
        return 200
931
932 2
    def rest_reload_napp(self, username, napp_name):
933
        """Request reload a NApp."""
934 2
        res = self.reload_napp(username, napp_name)
935 2
        return 'reloaded', res
936
937 2
    def rest_reload_all_napps(self):
938
        """Request reload all NApps."""
939 2
        for napp in self.napps:
940 2
            self.reload_napp(*napp)
941
        return 'reloaded', 200
942