Passed
Push — master ( a1aaaf...e75c6b )
by Humberto
02:11 queued 12s
created

kytos.core.controller.exc_handler()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.2963

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 3
dl 0
loc 12
ccs 1
cts 3
cp 0.3333
crap 1.2963
rs 10
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29
30 1
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 1
from kytos.core.auth import Auth
34 1
from kytos.core.buffers import KytosBuffers
35 1
from kytos.core.config import KytosConfig
36 1
from kytos.core.connection import ConnectionState
37 1
from kytos.core.events import KytosEvent
38 1
from kytos.core.helpers import now
39 1
from kytos.core.interface import Interface
40 1
from kytos.core.logs import LogManager
41 1
from kytos.core.napps.base import NApp
42 1
from kytos.core.napps.manager import NAppsManager
43 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
44 1
from kytos.core.switch import Switch
45
46 1
__all__ = ('Controller',)
47
48
49 1
def exc_handler(_, exc, __):
50
    """Log uncaught exceptions.
51
52
    Args:
53
        exc_type (ignored): exception type
54
        exc: exception instance
55
        tb (ignored): traceback
56
    """
57
    logging.basicConfig(filename='errlog.log',
58
                        format='%(asctime)s:%(pathname)'
59
                        's:%(levelname)s:%(message)s')
60
    logging.exception('Uncaught Exception: %s', exc)
61
62
63 1
class Controller:
64
    """Main class of Kytos.
65
66
    The main responsibilities of this class are:
67
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
68
        - manage KytosNApps (install, load and unload);
69
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
70
        - manage which event should be sent to NApps methods;
71
        - manage the buffers handlers, considering one thread per handler.
72
    """
73
74
    # Created issue #568 for the disabled checks.
75
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
76 1
    def __init__(self, options=None, loop=None):
77
        """Init method of Controller class takes the parameters below.
78
79
        Args:
80
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
81
                instance of :class:`~kytos.core.config.KytosConfig` class.
82
        """
83 1
        if options is None:
84
            options = KytosConfig().options['daemon']
85
86 1
        self._loop = loop or asyncio.get_event_loop()
87 1
        self._pool = ThreadPoolExecutor(max_workers=1)
88
89
        #: dict: keep the main threads of the controller (buffers and handler)
90 1
        self._threads = {}
91
        #: KytosBuffers: KytosBuffer object with Controller buffers
92 1
        self.buffers = KytosBuffers(loop=self._loop)
93
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
94
        #:
95
        #: This dict stores all connections between the controller and the
96
        #: switches. The key for this dict is a tuple (ip, port). The content
97
        #: is another dict with the connection information.
98 1
        self.connections = {}
99
        #: dict: mapping of events and event listeners.
100
        #:
101
        #: The key of the dict is a KytosEvent (or a string that represent a
102
        #: regex to match against KytosEvents) and the value is a list of
103
        #: methods that will receive the referenced event
104 1
        self.events_listeners = {'kytos/core.connection.new':
105
                                 [self.new_connection]}
106
107
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
108
        #:
109
        #: The key is the napp name (string), while the value is the napp
110
        #: instance itself.
111 1
        self.napps = {}
112
        #: Object generated by ParseArgs on config.py file
113 1
        self.options = options
114
        #: KytosServer: Instance of KytosServer that will be listening to TCP
115
        #: connections.
116 1
        self.server = None
117
        #: dict: Current existing switches.
118
        #:
119
        #: The key is the switch dpid, while the value is a Switch object.
120 1
        self.switches = {}  # dpid: Switch()
121
122
        #: datetime.datetime: Time when the controller finished starting.
123 1
        self.started_at = None
124
125
        #: logging.Logger: Logger instance used by Kytos.
126 1
        self.log = None
127
128
        #: Observer that handle NApps when they are enabled or disabled.
129 1
        self.napp_dir_listener = NAppDirListener(self)
130
131 1
        self.napps_manager = NAppsManager(self)
132
133
        #: API Server used to expose rest endpoints.
134 1
        self.api_server = APIServer(__name__, self.options.listen,
135
                                    self.options.api_port,
136
                                    self.napps_manager, self.options.napps)
137
138 1
        self.auth = Auth(self)
139
140 1
        self._register_endpoints()
141
        #: Adding the napps 'enabled' directory into the PATH
142
        #: Now you can access the enabled napps with:
143
        #: from napps.<username>.<napp_name> import ?....
144 1
        sys.path.append(os.path.join(self.options.napps, os.pardir))
145 1
        sys.excepthook = exc_handler
146
147 1
    def enable_logs(self):
148
        """Register kytos log and enable the logs."""
149 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
150 1
        LogManager.enable_websocket(self.api_server.server)
151 1
        self.log = logging.getLogger(__name__)
152
153 1
    @staticmethod
154
    def loggers():
155
        """List all logging Loggers.
156
157
        Return a list of Logger objects, with name and logging level.
158
        """
159 1
        return [logging.getLogger(name)
160
                for name in logging.root.manager.loggerDict
161
                if "kytos" in name]
162
163 1
    def toggle_debug(self, name=None):
164
        """Enable/disable logging debug messages to a given logger name.
165
166
        If the name parameter is not specified the debug will be
167
        enabled/disabled following the initial config file. It will decide
168
        to enable/disable using the 'kytos' name to find the current
169
        logger level.
170
        Obs: To disable the debug the logging will be set to NOTSET
171
172
        Args:
173
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
174
        """
175 1
        if name and name not in logging.root.manager.loggerDict:
176
            # A Logger name that is not declared in logging will raise an error
177
            # otherwise logging would create a new Logger.
178 1
            raise ValueError(f"Invalid logger name: {name}")
179
180 1
        if not name:
181
            # Logger name not specified.
182 1
            level = logging.getLogger('kytos').getEffectiveLevel()
183 1
            enable_debug = level != logging.DEBUG
184
185
            # Enable/disable default Loggers
186 1
            LogManager.load_config_file(self.options.logging, enable_debug)
187 1
            return
188
189
        # Get effective logger level for the name
190 1
        level = logging.getLogger(name).getEffectiveLevel()
191 1
        logger = logging.getLogger(name)
192
193 1
        if level == logging.DEBUG:
194
            # disable debug
195 1
            logger.setLevel(logging.NOTSET)
196
        else:
197
            # enable debug
198 1
            logger.setLevel(logging.DEBUG)
199
200 1
    def start(self, restart=False):
201
        """Create pidfile and call start_controller method."""
202 1
        self.enable_logs()
203 1
        if not restart:
204 1
            self.create_pidfile()
205 1
        self.start_controller()
206
207 1
    def create_pidfile(self):
208
        """Create a pidfile."""
209 1
        pid = os.getpid()
210
211
        # Creates directory if it doesn't exist
212
        # System can erase /var/run's content
213 1
        pid_folder = Path(self.options.pidfile).parent
214 1
        self.log.info(pid_folder)
215
216
        # Pylint incorrectly infers Path objects
217
        # https://github.com/PyCQA/pylint/issues/224
218
        # pylint: disable=no-member
219 1
        if not pid_folder.exists():
220
            pid_folder.mkdir()
221
            pid_folder.chmod(0o1777)
222
        # pylint: enable=no-member
223
224
        # Make sure the file is deleted when controller stops
225 1
        atexit.register(Path(self.options.pidfile).unlink)
226
227
        # Checks if a pidfile exists. Creates a new file.
228 1
        try:
229 1
            pidfile = open(self.options.pidfile, mode='x')
230 1
        except OSError:
231
            # This happens if there is a pidfile already.
232
            # We shall check if the process that created the pidfile is still
233
            # running.
234 1
            try:
235 1
                existing_file = open(self.options.pidfile, mode='r')
236 1
                old_pid = int(existing_file.read())
237 1
                os.kill(old_pid, 0)
238
                # If kill() doesn't return an error, there's a process running
239
                # with the same PID. We assume it is Kytos and quit.
240
                # Otherwise, overwrite the file and proceed.
241
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
242
                             "running. Aborting.")
243
                sys.exit(error_msg.format(self.options.pidfile))
244 1
            except OSError:
245 1
                try:
246 1
                    pidfile = open(self.options.pidfile, mode='w')
247
                except OSError as exception:
248
                    error_msg = "Failed to create pidfile {}: {}."
249
                    sys.exit(error_msg.format(self.options.pidfile, exception))
250
251
        # Identifies the process that created the pidfile.
252 1
        pidfile.write(str(pid))
253 1
        pidfile.close()
254
255 1
    def start_controller(self):
256
        """Start the controller.
257
258
        Starts the KytosServer (TCP Server) coroutine.
259
        Starts a thread for each buffer handler.
260
        Load the installed apps.
261
        """
262 1
        self.log.info("Starting Kytos - Kytos Controller")
263 1
        self.server = KytosServer((self.options.listen,
264
                                   int(self.options.port)),
265
                                  KytosServerProtocol,
266
                                  self,
267
                                  self.options.protocol_name)
268
269 1
        self.log.info("Starting TCP server: %s", self.server)
270 1
        self.server.serve_forever()
271
272 1
        def _stop_loop(_):
273
            loop = asyncio.get_event_loop()
274
            # print(_.result())
275
            threads = threading.enumerate()
276
            self.log.debug("%s threads before loop.stop: %s",
277
                           len(threads), threads)
278
            loop.stop()
279
280 1
        async def _run_api_server_thread(executor):
281
            log = logging.getLogger('kytos.core.controller.api_server_thread')
282
            log.debug('starting')
283
            # log.debug('creating tasks')
284
            loop = asyncio.get_event_loop()
285
            blocking_tasks = [
286
                loop.run_in_executor(executor, self.api_server.run)
287
            ]
288
            # log.debug('waiting for tasks')
289
            completed, pending = await asyncio.wait(blocking_tasks)
290
            # results = [t.result() for t in completed]
291
            # log.debug('results: {!r}'.format(results))
292
            log.debug('completed: %d, pending: %d',
293
                      len(completed), len(pending))
294
295 1
        task = self._loop.create_task(self.raw_event_handler())
296 1
        task = self._loop.create_task(self.msg_in_event_handler())
297 1
        task = self._loop.create_task(self.msg_out_event_handler())
298 1
        task = self._loop.create_task(self.app_event_handler())
299 1
        task = self._loop.create_task(_run_api_server_thread(self._pool))
300 1
        task.add_done_callback(_stop_loop)
301
302 1
        self.log.info("ThreadPool started: %s", self._pool)
303
304
        # ASYNC TODO: ensure all threads started correctly
305
        # This is critical, if any of them failed starting we should exit.
306
        # sys.exit(error_msg.format(thread, exception))
307
308 1
        self.log.info("Loading Kytos NApps...")
309 1
        self.napp_dir_listener.start()
310 1
        self.pre_install_napps(self.options.napps_pre_installed)
311 1
        self.load_napps()
312
313 1
        self.started_at = now()
314
315 1
    def _register_endpoints(self):
316
        """Register all rest endpoint served by kytos.
317
318
        -   Register APIServer endpoints
319
        -   Register WebUI endpoints
320
        -   Register ``/api/kytos/core/config`` endpoint
321
        """
322 1
        self.api_server.start_api()
323
        # Register controller endpoints as /api/kytos/core/...
324 1
        self.api_server.register_core_endpoint('config/',
325
                                               self.configuration_endpoint)
326 1
        self.api_server.register_core_endpoint('metadata/',
327
                                               Controller.metadata_endpoint)
328 1
        self.api_server.register_core_endpoint(
329
            'reload/<username>/<napp_name>/',
330
            self.rest_reload_napp)
331 1
        self.api_server.register_core_endpoint('reload/all',
332
                                               self.rest_reload_all_napps)
333 1
        self.auth.register_core_auth_services()
334
335 1
    def register_rest_endpoint(self, url, function, methods):
336
        """Deprecate in favor of @rest decorator."""
337 1
        self.api_server.register_rest_endpoint(url, function, methods)
338
339 1
    @classmethod
340
    def metadata_endpoint(cls):
341
        """Return the Kytos metadata.
342
343
        Returns:
344
            string: Json with current kytos metadata.
345
346
        """
347 1
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
348 1
        meta_file = open(meta_path).read()
349 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
350 1
        return json.dumps(metadata)
351
352 1
    def configuration_endpoint(self):
353
        """Return the configuration options used by Kytos.
354
355
        Returns:
356
            string: Json with current configurations used by kytos.
357
358
        """
359 1
        return json.dumps(self.options.__dict__)
360
361 1
    def restart(self, graceful=True):
362
        """Restart Kytos SDN Controller.
363
364
        Args:
365
            graceful(bool): Represents the way that Kytos will restart.
366
        """
367 1
        if self.started_at is not None:
368 1
            self.stop(graceful)
369 1
            self.__init__(self.options)
370
371 1
        self.start(restart=True)
372
373 1
    def stop(self, graceful=True):
374
        """Shutdown all services used by kytos.
375
376
        This method should:
377
            - stop all Websockets
378
            - stop the API Server
379
            - stop the Controller
380
        """
381 1
        if self.started_at:
382 1
            self.stop_controller(graceful)
383
384 1
    def stop_controller(self, graceful=True):
385
        """Stop the controller.
386
387
        This method should:
388
            - announce on the network that the controller will shutdown;
389
            - stop receiving incoming packages;
390
            - call the 'shutdown' method of each KytosNApp that is running;
391
            - finish reading the events on all buffers;
392
            - stop each running handler;
393
            - stop all running threads;
394
            - stop the KytosServer;
395
        """
396 1
        self.log.info("Stopping Kytos")
397
398 1
        self.buffers.send_stop_signal()
399 1
        self.api_server.stop_api_server()
400 1
        self.napp_dir_listener.stop()
401
402 1
        self.log.info("Stopping threadpool: %s", self._pool)
403
404 1
        threads = threading.enumerate()
405 1
        self.log.debug("%s threads before threadpool shutdown: %s",
406
                       len(threads), threads)
407
408 1
        self._pool.shutdown(wait=graceful)
409
410
        # self.server.socket.shutdown()
411
        # self.server.socket.close()
412
413
        # for thread in self._threads.values():
414
        #     self.log.info("Stopping thread: %s", thread.name)
415
        #     thread.join()
416
417
        # for thread in self._threads.values():
418
        #     while thread.is_alive():
419
        #         self.log.info("Thread is alive: %s", thread.name)
420
        #         pass
421
422 1
        self.started_at = None
423 1
        self.unload_napps()
424 1
        self.buffers = KytosBuffers()
425
426
        # ASYNC TODO: close connections
427
        # self.server.server_close()
428
429
        # Shutdown the TCP server and the main asyncio loop
430 1
        self.server.shutdown()
431
432 1
    def status(self):
433
        """Return status of Kytos Server.
434
435
        If the controller kytos is running this method will be returned
436
        "Running since 'Started_At'", otherwise "Stopped".
437
438
        Returns:
439
            string: String with kytos status.
440
441
        """
442 1
        if self.started_at:
443 1
            return "Running since %s" % self.started_at
444 1
        return "Stopped"
445
446 1
    def uptime(self):
447
        """Return the uptime of kytos server.
448
449
        This method should return:
450
            - 0 if Kytos Server is stopped.
451
            - (kytos.start_at - datetime.now) if Kytos Server is running.
452
453
        Returns:
454
           datetime.timedelta: The uptime interval.
455
456
        """
457 1
        return now() - self.started_at if self.started_at else 0
458
459 1
    def notify_listeners(self, event):
460
        """Send the event to the specified listeners.
461
462
        Loops over self.events_listeners matching (by regexp) the attribute
463
        name of the event with the keys of events_listeners. If a match occurs,
464
        then send the event to each registered listener.
465
466
        Args:
467
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
468
        """
469 1
        self.log.debug("looking for listeners for %s", event)
470 1
        for event_regex, listeners in dict(self.events_listeners).items():
471
            # self.log.debug("listeners found for %s: %r => %s", event,
472
            #                event_regex, [l.__qualname__ for l in listeners])
473
            # Do not match if the event has more characters
474
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
475 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
476 1
                event_regex += '$'
477 1
            if re.match(event_regex, event.name):
478
                # self.log.debug('Calling listeners for %s', event)
479 1
                for listener in listeners:
480 1
                    listener(event)
481
482 1
    async def raw_event_handler(self):
483
        """Handle raw events.
484
485
        Listen to the raw_buffer and send all its events to the
486
        corresponding listeners.
487
        """
488 1
        self.log.info("Raw Event Handler started")
489 1
        while True:
490 1
            event = await self.buffers.raw.aget()
491 1
            self.notify_listeners(event)
492 1
            self.log.debug("Raw Event handler called")
493
494 1
            if event.name == "kytos/core.shutdown":
495 1
                self.log.debug("Raw Event handler stopped")
496 1
                break
497
498 1
    async def msg_in_event_handler(self):
499
        """Handle msg_in events.
500
501
        Listen to the msg_in buffer and send all its events to the
502
        corresponding listeners.
503
        """
504 1
        self.log.info("Message In Event Handler started")
505 1
        while True:
506 1
            event = await self.buffers.msg_in.aget()
507 1
            self.notify_listeners(event)
508 1
            self.log.debug("Message In Event handler called")
509
510 1
            if event.name == "kytos/core.shutdown":
511 1
                self.log.debug("Message In Event handler stopped")
512 1
                break
513
514 1
    async def msg_out_event_handler(self):
515
        """Handle msg_out events.
516
517
        Listen to the msg_out buffer and send all its events to the
518
        corresponding listeners.
519
        """
520 1
        self.log.info("Message Out Event Handler started")
521 1
        while True:
522 1
            triggered_event = await self.buffers.msg_out.aget()
523
524 1
            if triggered_event.name == "kytos/core.shutdown":
525 1
                self.log.debug("Message Out Event handler stopped")
526 1
                break
527
528 1
            message = triggered_event.content['message']
529 1
            destination = triggered_event.destination
530 1
            if (destination and
531
                    not destination.state == ConnectionState.FINISHED):
532 1
                packet = message.pack()
533 1
                destination.send(packet)
534 1
                self.log.debug('Connection %s: OUT OFP, '
535
                               'version: %s, type: %s, xid: %s - %s',
536
                               destination.id,
537
                               message.header.version,
538
                               message.header.message_type,
539
                               message.header.xid,
540
                               packet.hex())
541 1
                self.notify_listeners(triggered_event)
542 1
                self.log.debug("Message Out Event handler called")
543
            else:
544
                self.log.info("connection closed. Cannot send message")
545
546 1
    async def app_event_handler(self):
547
        """Handle app events.
548
549
        Listen to the app buffer and send all its events to the
550
        corresponding listeners.
551
        """
552 1
        self.log.info("App Event Handler started")
553 1
        while True:
554 1
            event = await self.buffers.app.aget()
555 1
            self.notify_listeners(event)
556 1
            self.log.debug("App Event handler called")
557
558 1
            if event.name == "kytos/core.shutdown":
559 1
                self.log.debug("App Event handler stopped")
560 1
                break
561
562 1
    def get_interface_by_id(self, interface_id):
563
        """Find a Interface  with interface_id.
564
565
        Args:
566
            interface_id(str): Interface Identifier.
567
568
        Returns:
569
            Interface: Instance of Interface with the id given.
570
571
        """
572 1
        if interface_id is None:
573 1
            return None
574
575 1
        switch_id = ":".join(interface_id.split(":")[:-1])
576 1
        interface_number = int(interface_id.split(":")[-1])
577
578 1
        switch = self.switches.get(switch_id)
579
580 1
        if not switch:
581 1
            return None
582
583 1
        return switch.interfaces.get(interface_number, None)
584
585 1
    def get_switch_by_dpid(self, dpid):
586
        """Return a specific switch by dpid.
587
588
        Args:
589
            dpid (|DPID|): dpid object used to identify a switch.
590
591
        Returns:
592
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
593
594
        """
595 1
        return self.switches.get(dpid)
596
597 1
    def get_switch_or_create(self, dpid, connection):
598
        """Return switch or create it if necessary.
599
600
        Args:
601
            dpid (|DPID|): dpid object used to identify a switch.
602
            connection (:class:`~kytos.core.connection.Connection`):
603
                connection used by switch. If a switch has a connection that
604
                will be updated.
605
606
        Returns:
607
            :class:`~kytos.core.switch.Switch`: new or existent switch.
608
609
        """
610 1
        self.create_or_update_connection(connection)
611 1
        switch = self.get_switch_by_dpid(dpid)
612 1
        event_name = 'kytos/core.switch.'
613
614 1
        if switch is None:
615 1
            switch = Switch(dpid=dpid)
616 1
            self.add_new_switch(switch)
617 1
            event_name += 'new'
618
        else:
619 1
            event_name += 'reconnected'
620
621 1
        self.set_switch_options(dpid=dpid)
622 1
        event = KytosEvent(name=event_name, content={'switch': switch})
623
624 1
        old_connection = switch.connection
625 1
        switch.update_connection(connection)
626
627 1
        if old_connection is not connection:
628 1
            self.remove_connection(old_connection)
629
630 1
        self.buffers.app.put(event)
631
632 1
        return switch
633
634 1
    def set_switch_options(self, dpid):
635
        """Update the switch settings based on kytos.conf options.
636
637
        Args:
638
            dpid (str): dpid used to identify a switch.
639
640
        """
641 1
        switch = self.switches.get(dpid)
642 1
        if not switch:
643
            return
644
645 1
        vlan_pool = {}
646 1
        vlan_pool = self.options.vlan_pool
647 1
        if not vlan_pool:
648 1
            return
649
650 1
        if vlan_pool.get(dpid):
651 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
652 1
            for intf_num, port_list in vlan_pool[dpid].items():
653 1
                if not switch.interfaces.get((intf_num)):
654 1
                    vlan_ids = set()
655 1
                    for vlan_range in port_list:
656 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
657 1
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
658 1
                            vlan_ids.add(vlan_id)
659 1
                    intf_num = int(intf_num)
660 1
                    intf = Interface(name=intf_num, port_number=intf_num,
661
                                     switch=switch)
662 1
                    intf.set_available_tags(vlan_ids)
663 1
                    switch.update_interface(intf)
664
665 1
    def create_or_update_connection(self, connection):
666
        """Update a connection.
667
668
        Args:
669
            connection (:class:`~kytos.core.connection.Connection`):
670
                Instance of connection that will be updated.
671
        """
672 1
        self.connections[connection.id] = connection
673
674 1
    def get_connection_by_id(self, conn_id):
675
        """Return a existent connection by id.
676
677
        Args:
678
            id (int): id from a connection.
679
680
        Returns:
681
            :class:`~kytos.core.connection.Connection`: Instance of connection
682
                or None Type.
683
684
        """
685 1
        return self.connections.get(conn_id)
686
687 1
    def remove_connection(self, connection):
688
        """Close a existent connection and remove it.
689
690
        Args:
691
            connection (:class:`~kytos.core.connection.Connection`):
692
                Instance of connection that will be removed.
693
        """
694 1
        if connection is None:
695 1
            return False
696
697 1
        try:
698 1
            connection.close()
699 1
            del self.connections[connection.id]
700 1
        except KeyError:
701 1
            return False
702 1
        return True
703
704 1
    def remove_switch(self, switch):
705
        """Remove an existent switch.
706
707
        Args:
708
            switch (:class:`~kytos.core.switch.Switch`):
709
                Instance of switch that will be removed.
710
        """
711 1
        try:
712 1
            del self.switches[switch.dpid]
713 1
        except KeyError:
714 1
            return False
715 1
        return True
716
717 1
    def new_connection(self, event):
718
        """Handle a kytos/core.connection.new event.
719
720
        This method will read new connection event and store the connection
721
        (socket) into the connections attribute on the controller.
722
723
        It also clear all references to the connection since it is a new
724
        connection on the same ip:port.
725
726
        Args:
727
            event (~kytos.core.KytosEvent):
728
                The received event (``kytos/core.connection.new``) with the
729
                needed info.
730
        """
731 1
        self.log.info("Handling %s...", event)
732
733 1
        connection = event.source
734 1
        self.log.debug("Event source: %s", event.source)
735
736
        # Remove old connection (aka cleanup) if it exists
737 1
        if self.get_connection_by_id(connection.id):
738
            self.remove_connection(connection.id)
739
740
        # Update connections with the new connection
741 1
        self.create_or_update_connection(connection)
742
743 1
    def add_new_switch(self, switch):
744
        """Add a new switch on the controller.
745
746
        Args:
747
            switch (Switch): A Switch object
748
        """
749 1
        self.switches[switch.dpid] = switch
750
751 1
    def _import_napp(self, username, napp_name):
752
        """Import a NApp module.
753
754
        Raises:
755
            FileNotFoundError: if NApp's main.py is not found.
756
            ModuleNotFoundError: if any NApp requirement is not installed.
757
758
        """
759 1
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
760 1
        path = os.path.join(self.options.napps, username, napp_name,
761
                            'main.py')
762 1
        napp_spec = spec_from_file_location(mod_name, path)
763 1
        napp_module = module_from_spec(napp_spec)
764 1
        sys.modules[napp_spec.name] = napp_module
765 1
        napp_spec.loader.exec_module(napp_module)
766 1
        return napp_module
767
768 1
    def load_napp(self, username, napp_name):
769
        """Load a single NApp.
770
771
        Args:
772
            username (str): NApp username (makes up NApp's path).
773
            napp_name (str): Name of the NApp to be loaded.
774
        """
775 1
        if (username, napp_name) in self.napps:
776 1
            message = 'NApp %s/%s was already loaded'
777 1
            self.log.warning(message, username, napp_name)
778 1
            return
779
780 1
        try:
781 1
            napp_module = self._import_napp(username, napp_name)
782 1
        except ModuleNotFoundError as err:
783 1
            self.log.error("Error loading NApp '%s/%s': %s",
784
                           username, napp_name, err)
785 1
            return
786 1
        except FileNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
787 1
            msg = "NApp module not found, assuming it's a meta-napp: %s"
788 1
            self.log.warning(msg, err.filename)
789 1
            return
790
791 1
        try:
792 1
            napp = napp_module.Main(controller=self)
793 1
        except:  # noqa pylint: disable=bare-except
794 1
            self.log.critical("NApp initialization failed: %s/%s",
795
                              username, napp_name, exc_info=True)
796 1
            return
797
798 1
        self.napps[(username, napp_name)] = napp
799
800 1
        napp.start()
801 1
        self.api_server.authenticate_endpoints(napp)
802 1
        self.api_server.register_napp_endpoints(napp)
803
804
        # pylint: disable=protected-access
805 1
        for event, listeners in napp._listeners.items():
806
            self.events_listeners.setdefault(event, []).extend(listeners)
807
        # pylint: enable=protected-access
808
809 1
    def pre_install_napps(self, napps, enable=True):
810
        """Pre install and enable NApps.
811
812
        Before installing, it'll check if it's installed yet.
813
814
        Args:
815
            napps ([str]): List of NApps to be pre-installed and enabled.
816
        """
817 1
        all_napps = self.napps_manager.get_installed_napps()
818 1
        installed = [str(napp) for napp in all_napps]
819 1
        napps_diff = [napp for napp in napps if napp not in installed]
820 1
        for napp in napps_diff:
821 1
            self.napps_manager.install(napp, enable=enable)
822
823 1
    def load_napps(self):
824
        """Load all NApps enabled on the NApps dir."""
825 1
        for napp in self.napps_manager.get_enabled_napps():
826 1
            try:
827 1
                self.log.info("Loading NApp %s", napp.id)
828 1
                self.load_napp(napp.username, napp.name)
829
            except FileNotFoundError as exception:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
830
                self.log.error("Could not load NApp %s: %s",
831
                               napp.id, exception)
832
833 1
    def unload_napp(self, username, napp_name):
834
        """Unload a specific NApp.
835
836
        Args:
837
            username (str): NApp username.
838
            napp_name (str): Name of the NApp to be unloaded.
839
        """
840 1
        napp = self.napps.pop((username, napp_name), None)
841
842 1
        if napp is None:
843
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
844
        else:
845 1
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
846 1
            napp_id = NApp(username, napp_name).id
847 1
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
848 1
            napp_shutdown_fn = self.events_listeners[event.name][0]
849
            # Call listener before removing it from events_listeners
850 1
            napp_shutdown_fn(event)
851
852
            # Remove rest endpoints from that napp
853 1
            self.api_server.remove_napp_endpoints(napp)
854
855
            # Removing listeners from that napp
856
            # pylint: disable=protected-access
857 1
            for event_type, napp_listeners in napp._listeners.items():
858
                event_listeners = self.events_listeners[event_type]
859
                for listener in napp_listeners:
860
                    event_listeners.remove(listener)
861
                if not event_listeners:
862
                    del self.events_listeners[event_type]
863
            # pylint: enable=protected-access
864
865 1
    def unload_napps(self):
866
        """Unload all loaded NApps that are not core NApps."""
867
        # list() is used here to avoid the error:
868
        # 'RuntimeError: dictionary changed size during iteration'
869
        # This is caused by looping over an dictionary while removing
870
        # items from it.
871
        for (username, napp_name) in list(self.napps.keys()):  # noqa
872
            self.unload_napp(username, napp_name)
873
874 1
    def reload_napp_module(self, username, napp_name, napp_file):
875
        """Reload a NApp Module."""
876 1
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
877 1
        try:
878 1
            napp_module = import_module(mod_name)
879 1
        except ModuleNotFoundError as err:
880 1
            self.log.error("Module '%s' not found", mod_name)
881 1
            raise
882 1
        try:
883 1
            napp_module = reload_module(napp_module)
884 1
        except ImportError as err:
885 1
            self.log.error("Error reloading NApp '%s/%s': %s",
886
                           username, napp_name, err)
887 1
            raise
888
889 1
    def reload_napp(self, username, napp_name):
890
        """Reload a NApp."""
891 1
        self.unload_napp(username, napp_name)
892 1
        try:
893 1
            self.reload_napp_module(username, napp_name, 'settings')
894 1
            self.reload_napp_module(username, napp_name, 'main')
895 1
        except (ModuleNotFoundError, ImportError):
896 1
            return 400
897 1
        self.log.info("NApp '%s/%s' successfully reloaded",
898
                      username, napp_name)
899 1
        self.load_napp(username, napp_name)
900 1
        return 200
901
902 1
    def rest_reload_napp(self, username, napp_name):
903
        """Request reload a NApp."""
904 1
        res = self.reload_napp(username, napp_name)
905 1
        return 'reloaded', res
906
907 1
    def rest_reload_all_napps(self):
908
        """Request reload all NApps."""
909 1
        for napp in self.napps:
910 1
            self.reload_napp(*napp)
911
        return 'reloaded', 200
912