Passed
Pull Request — master (#957)
by Rogerio
02:52
created

kytos.core.controller.Controller.loggers()   A

Complexity

Conditions 1

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 5
nop 0
dl 0
loc 9
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29
30 1
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 1
from kytos.core.buffers import KytosBuffers
34 1
from kytos.core.config import KytosConfig
35 1
from kytos.core.connection import ConnectionState
36 1
from kytos.core.events import KytosEvent
37 1
from kytos.core.helpers import now
38 1
from kytos.core.interface import Interface
39 1
from kytos.core.logs import LogManager
40 1
from kytos.core.napps.base import NApp
41 1
from kytos.core.napps.manager import NAppsManager
42 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
43 1
from kytos.core.switch import Switch
44
45 1
__all__ = ('Controller',)
46
47
48 1
class Controller:
49
    """Main class of Kytos.
50
51
    The main responsibilities of this class are:
52
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
53
        - manage KytosNApps (install, load and unload);
54
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
55
        - manage which event should be sent to NApps methods;
56
        - manage the buffers handlers, considering one thread per handler.
57
    """
58
59
    # Created issue #568 for the disabled checks.
60
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
61 1
    def __init__(self, options=None, loop=None):
62
        """Init method of Controller class takes the parameters below.
63
64
        Args:
65
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
66
                instance of :class:`~kytos.core.config.KytosConfig` class.
67
        """
68 1
        if options is None:
69
            options = KytosConfig().options['daemon']
70
71 1
        self._loop = loop or asyncio.get_event_loop()
72 1
        self._pool = ThreadPoolExecutor(max_workers=1)
73
74
        #: dict: keep the main threads of the controller (buffers and handler)
75 1
        self._threads = {}
76
        #: KytosBuffers: KytosBuffer object with Controller buffers
77 1
        self.buffers = KytosBuffers(loop=self._loop)
78
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
79
        #:
80
        #: This dict stores all connections between the controller and the
81
        #: switches. The key for this dict is a tuple (ip, port). The content
82
        #: is another dict with the connection information.
83 1
        self.connections = {}
84
        #: dict: mapping of events and event listeners.
85
        #:
86
        #: The key of the dict is a KytosEvent (or a string that represent a
87
        #: regex to match agains KytosEvents) and the value is a list of
88
        #: methods that will receive the referenced event
89 1
        self.events_listeners = {'kytos/core.connection.new':
90
                                 [self.new_connection]}
91
92
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
93
        #:
94
        #: The key is the napp name (string), while the value is the napp
95
        #: instance itself.
96 1
        self.napps = {}
97
        #: Object generated by ParseArgs on config.py file
98 1
        self.options = options
99
        #: KytosServer: Instance of KytosServer that will be listening to TCP
100
        #: connections.
101 1
        self.server = None
102
        #: dict: Current existing switches.
103
        #:
104
        #: The key is the switch dpid, while the value is a Switch object.
105 1
        self.switches = {}  # dpid: Switch()
106
107
        #: datetime.datetime: Time when the controller finished starting.
108 1
        self.started_at = None
109
110
        #: logging.Logger: Logger instance used by Kytos.
111 1
        self.log = None
112
113
        #: Observer that handle NApps when they are enabled or disabled.
114 1
        self.napp_dir_listener = NAppDirListener(self)
115
116 1
        self.napps_manager = NAppsManager(self)
117
118
        #: API Server used to expose rest endpoints.
119 1
        self.api_server = APIServer(__name__, self.options.listen,
120
                                    self.options.api_port,
121
                                    self.napps_manager, self.options.napps)
122
123 1
        self._register_endpoints()
124
        #: Adding the napps 'enabled' directory into the PATH
125
        #: Now you can access the enabled napps with:
126
        #: from napps.<username>.<napp_name> import ?....
127 1
        sys.path.append(os.path.join(self.options.napps, os.pardir))
128
129 1
    def enable_logs(self):
130
        """Register kytos log and enable the logs."""
131 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
132 1
        LogManager.enable_websocket(self.api_server.server)
133 1
        self.log = logging.getLogger(__name__)
134
135 1
    @staticmethod
136
    def loggers():
137
        """List all logging Loggers.
138
139
        Return a list of Logger objects, with name and logging level.
140
        """
141 1
        return [logging.getLogger(name)
142
                for name in logging.root.manager.loggerDict
143
                if "kytos" in name]
144
145 1
    def toggle_debug(self, name=None):
146
        """Enable/disable logging debug messages to a given logger name.
147
148
        If the name parameter is not specified the debug will be
149
        enabled/disabled following the initial config file. It will decide
150
        to enable/disable using the 'kytos' name to find the current
151
        logger level.
152
        Obs: To disable the debug the logging will be set to NOTSET
153
154
        Args:
155
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
156
        """
157 1
        if name and name not in logging.root.manager.loggerDict:
158
            # A Logger name that is not declared in logging will raise an error
159
            # otherwise logging would create a new Logger.
160 1
            raise ValueError("Invalid logger name.")
161
162 1
        if not name:
163
            # Logger name not specified.
164 1
            level = logging.getLogger('kytos').getEffectiveLevel()
165 1
            enable_debug = level != logging.DEBUG
166
167
            # Enable/disable default Loggers
168 1
            LogManager.load_config_file(self.options.logging, enable_debug)
169 1
            return
170
171
        # Get effective logger level for the name
172 1
        level = logging.getLogger(name).getEffectiveLevel()
173 1
        logger = logging.getLogger(name)
174
175 1
        if level == logging.DEBUG:
176
            # disable debug
177 1
            logger.setLevel(logging.NOTSET)
178
        else:
179
            # enable debug
180 1
            logger.setLevel(logging.DEBUG)
181
182 1
    def start(self, restart=False):
183
        """Create pidfile and call start_controller method."""
184
        self.enable_logs()
185
        if not restart:
186
            self.create_pidfile()
187
        self.start_controller()
188
189 1
    def create_pidfile(self):
190
        """Create a pidfile."""
191
        pid = os.getpid()
192
193
        # Creates directory if it doesn't exist
194
        # System can erase /var/run's content
195
        pid_folder = Path(self.options.pidfile).parent
196
        self.log.info(pid_folder)
197
198
        # Pylint incorrectly infers Path objects
199
        # https://github.com/PyCQA/pylint/issues/224
200
        # pylint: disable=no-member
201
        if not pid_folder.exists():
202
            pid_folder.mkdir()
203
            pid_folder.chmod(0o1777)
204
        # pylint: enable=no-member
205
206
        # Make sure the file is deleted when controller stops
207
        atexit.register(Path(self.options.pidfile).unlink)
208
209
        # Checks if a pidfile exists. Creates a new file.
210
        try:
211
            pidfile = open(self.options.pidfile, mode='x')
212
        except OSError:
213
            # This happens if there is a pidfile already.
214
            # We shall check if the process that created the pidfile is still
215
            # running.
216
            try:
217
                existing_file = open(self.options.pidfile, mode='r')
218
                old_pid = int(existing_file.read())
219
                os.kill(old_pid, 0)
220
                # If kill() doesn't return an error, there's a process running
221
                # with the same PID. We assume it is Kytos and quit.
222
                # Otherwise, overwrite the file and proceed.
223
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
224
                             "running. Aborting.")
225
                sys.exit(error_msg.format(self.options.pidfile))
226
            except OSError:
227
                try:
228
                    pidfile = open(self.options.pidfile, mode='w')
229
                except OSError as exception:
230
                    error_msg = "Failed to create pidfile {}: {}."
231
                    sys.exit(error_msg.format(self.options.pidfile, exception))
232
233
        # Identifies the process that created the pidfile.
234
        pidfile.write(str(pid))
235
        pidfile.close()
236
237 1
    def start_controller(self):
238
        """Start the controller.
239
240
        Starts the KytosServer (TCP Server) coroutine.
241
        Starts a thread for each buffer handler.
242
        Load the installed apps.
243
        """
244
        self.log.info("Starting Kytos - Kytos Controller")
245
        self.server = KytosServer((self.options.listen,
246
                                   int(self.options.port)),
247
                                  KytosServerProtocol,
248
                                  self,
249
                                  self.options.protocol_name)
250
251
        self.log.info("Starting TCP server: %s", self.server)
252
        self.server.serve_forever()
253
254
        def _stop_loop(_):
255
            loop = asyncio.get_event_loop()
256
            # print(_.result())
257
            threads = threading.enumerate()
258
            self.log.debug("%s threads before loop.stop: %s",
259
                           len(threads), threads)
260
            loop.stop()
261
262
        async def _run_api_server_thread(executor):
263
            log = logging.getLogger('kytos.core.controller.api_server_thread')
264
            log.debug('starting')
265
            # log.debug('creating tasks')
266
            loop = asyncio.get_event_loop()
267
            blocking_tasks = [
268
                loop.run_in_executor(executor, self.api_server.run)
269
            ]
270
            # log.debug('waiting for tasks')
271
            completed, pending = await asyncio.wait(blocking_tasks)
272
            # results = [t.result() for t in completed]
273
            # log.debug('results: {!r}'.format(results))
274
            log.debug('completed: %d, pending: %d',
275
                      len(completed), len(pending))
276
277
        task = self._loop.create_task(self.raw_event_handler())
278
        task = self._loop.create_task(self.msg_in_event_handler())
279
        task = self._loop.create_task(self.msg_out_event_handler())
280
        task = self._loop.create_task(self.app_event_handler())
281
        task = self._loop.create_task(_run_api_server_thread(self._pool))
282
        task.add_done_callback(_stop_loop)
283
284
        self.log.info("ThreadPool started: %s", self._pool)
285
286
        # ASYNC TODO: ensure all threads started correctly
287
        # This is critical, if any of them failed starting we should exit.
288
        # sys.exit(error_msg.format(thread, exception))
289
290
        self.log.info("Loading Kytos NApps...")
291
        self.napp_dir_listener.start()
292
        self.pre_install_napps(self.options.napps_pre_installed)
293
        self.load_napps()
294
295
        self.started_at = now()
296
297 1
    def _register_endpoints(self):
298
        """Register all rest endpoint served by kytos.
299
300
        -   Register APIServer endpoints
301
        -   Register WebUI endpoints
302
        -   Register ``/api/kytos/core/config`` endpoint
303
        """
304 1
        self.api_server.start_api()
305
        # Register controller endpoints as /api/kytos/core/...
306 1
        self.api_server.register_core_endpoint('config/',
307
                                               self.configuration_endpoint)
308
309 1
        self.api_server.register_core_endpoint(
310
            'reload/<username>/<napp_name>/',
311
            self.rest_reload_napp)
312 1
        self.api_server.register_core_endpoint('reload/all',
313
                                               self.rest_reload_all_napps)
314
315 1
    def register_rest_endpoint(self, url, function, methods):
316
        """Deprecate in favor of @rest decorator."""
317 1
        self.api_server.register_rest_endpoint(url, function, methods)
318
319 1
    def configuration_endpoint(self):
320
        """Return the configuration options used by Kytos.
321
322
        Returns:
323
            string: Json with current configurations used by kytos.
324
325
        """
326 1
        return json.dumps(self.options.__dict__)
327
328 1
    def restart(self, graceful=True):
329
        """Restart Kytos SDN Controller.
330
331
        Args:
332
            graceful(bool): Represents the way that Kytos will restart.
333
        """
334
        if self.started_at is not None:
335
            self.stop(graceful)
336
            self.__init__(self.options)
337
338
        self.start(restart=True)
339
340 1
    def stop(self, graceful=True):
341
        """Shutdown all services used by kytos.
342
343
        This method should:
344
            - stop all Websockets
345
            - stop the API Server
346
            - stop the Controller
347
        """
348
        if self.started_at:
349
            self.stop_controller(graceful)
350
351 1
    def stop_controller(self, graceful=True):
352
        """Stop the controller.
353
354
        This method should:
355
            - announce on the network that the controller will shutdown;
356
            - stop receiving incoming packages;
357
            - call the 'shutdown' method of each KytosNApp that is running;
358
            - finish reading the events on all buffers;
359
            - stop each running handler;
360
            - stop all running threads;
361
            - stop the KytosServer;
362
        """
363
        self.log.info("Stopping Kytos")
364
365
        self.buffers.send_stop_signal()
366
        self.api_server.stop_api_server()
367
        self.napp_dir_listener.stop()
368
369
        self.log.info("Stopping threadpool: %s", self._pool)
370
371
        threads = threading.enumerate()
372
        self.log.debug("%s threads before threadpool shutdown: %s",
373
                       len(threads), threads)
374
375
        self._pool.shutdown(wait=graceful)
376
377
        # self.server.socket.shutdown()
378
        # self.server.socket.close()
379
380
        # for thread in self._threads.values():
381
        #     self.log.info("Stopping thread: %s", thread.name)
382
        #     thread.join()
383
384
        # for thread in self._threads.values():
385
        #     while thread.is_alive():
386
        #         self.log.info("Thread is alive: %s", thread.name)
387
        #         pass
388
389
        self.started_at = None
390
        self.unload_napps()
391
        self.buffers = KytosBuffers()
392
393
        # ASYNC TODO: close connections
394
        # self.server.server_close()
395
396
        # Shutdown the TCP server and the main asyncio loop
397
        self.server.shutdown()
398
399 1
    def status(self):
400
        """Return status of Kytos Server.
401
402
        If the controller kytos is running this method will be returned
403
        "Running since 'Started_At'", otherwise "Stopped".
404
405
        Returns:
406
            string: String with kytos status.
407
408
        """
409
        if self.started_at:
410
            return "Running since %s" % self.started_at
411
        return "Stopped"
412
413 1
    def uptime(self):
414
        """Return the uptime of kytos server.
415
416
        This method should return:
417
            - 0 if Kytos Server is stopped.
418
            - (kytos.start_at - datetime.now) if Kytos Server is running.
419
420
        Returns:
421
           datetime.timedelta: The uptime interval.
422
423
        """
424
        return now() - self.started_at if self.started_at else 0
425
426 1
    def notify_listeners(self, event):
427
        """Send the event to the specified listeners.
428
429
        Loops over self.events_listeners matching (by regexp) the attribute
430
        name of the event with the keys of events_listeners. If a match occurs,
431
        then send the event to each registered listener.
432
433
        Args:
434
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
435
        """
436
        self.log.debug("looking for listeners for %s", event)
437
        for event_regex, listeners in dict(self.events_listeners).items():
438
            # self.log.debug("listeners found for %s: %r => %s", event,
439
            #                event_regex, [l.__qualname__ for l in listeners])
440
            # Do not match if the event has more characters
441
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
442
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
443
                event_regex += '$'
444
            if re.match(event_regex, event.name):
445
                # self.log.debug('Calling listeners for %s', event)
446
                for listener in listeners:
447
                    listener(event)
448
449 1
    async def raw_event_handler(self):
450
        """Handle raw events.
451
452
        Listen to the raw_buffer and send all its events to the
453
        corresponding listeners.
454
        """
455
        self.log.info("Raw Event Handler started")
456
        while True:
457
            event = await self.buffers.raw.aget()
458
            self.notify_listeners(event)
459
            self.log.debug("Raw Event handler called")
460
461
            if event.name == "kytos/core.shutdown":
462
                self.log.debug("Raw Event handler stopped")
463
                break
464
465 1
    async def msg_in_event_handler(self):
466
        """Handle msg_in events.
467
468
        Listen to the msg_in buffer and send all its events to the
469
        corresponding listeners.
470
        """
471
        self.log.info("Message In Event Handler started")
472
        while True:
473
            event = await self.buffers.msg_in.aget()
474
            self.notify_listeners(event)
475
            self.log.debug("Message In Event handler called")
476
477
            if event.name == "kytos/core.shutdown":
478
                self.log.debug("Message In Event handler stopped")
479
                break
480
481 1
    async def msg_out_event_handler(self):
482
        """Handle msg_out events.
483
484
        Listen to the msg_out buffer and send all its events to the
485
        corresponding listeners.
486
        """
487
        self.log.info("Message Out Event Handler started")
488
        while True:
489
            triggered_event = await self.buffers.msg_out.aget()
490
491
            if triggered_event.name == "kytos/core.shutdown":
492
                self.log.debug("Message Out Event handler stopped")
493
                break
494
495
            message = triggered_event.content['message']
496
            destination = triggered_event.destination
497
            if (destination and
498
                    not destination.state == ConnectionState.FINISHED):
499
                packet = message.pack()
500
                destination.send(packet)
501
                self.log.debug('Connection %s: OUT OFP, '
502
                               'version: %s, type: %s, xid: %s - %s',
503
                               destination.id,
504
                               message.header.version,
505
                               message.header.message_type,
506
                               message.header.xid,
507
                               packet.hex())
508
                self.notify_listeners(triggered_event)
509
                self.log.debug("Message Out Event handler called")
510
            else:
511
                self.log.info("connection closed. Cannot send message")
512
513 1
    async def app_event_handler(self):
514
        """Handle app events.
515
516
        Listen to the app buffer and send all its events to the
517
        corresponding listeners.
518
        """
519
        self.log.info("App Event Handler started")
520
        while True:
521
            event = await self.buffers.app.aget()
522
            self.notify_listeners(event)
523
            self.log.debug("App Event handler called")
524
525
            if event.name == "kytos/core.shutdown":
526
                self.log.debug("App Event handler stopped")
527
                break
528
529 1
    def get_interface_by_id(self, interface_id):
530
        """Find a Interface  with interface_id.
531
532
        Args:
533
            interface_id(str): Interface Identifier.
534
535
        Returns:
536
            Interface: Instance of Interface with the id given.
537
538
        """
539
        if interface_id is None:
540
            return None
541
542
        switch_id = ":".join(interface_id.split(":")[:-1])
543
        interface_number = int(interface_id.split(":")[-1])
544
545
        switch = self.switches.get(switch_id)
546
547
        if not switch:
548
            return None
549
550
        return switch.interfaces.get(interface_number, None)
551
552 1
    def get_switch_by_dpid(self, dpid):
553
        """Return a specific switch by dpid.
554
555
        Args:
556
            dpid (|DPID|): dpid object used to identify a switch.
557
558
        Returns:
559
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
560
561
        """
562 1
        return self.switches.get(dpid)
563
564 1
    def get_switch_or_create(self, dpid, connection):
565
        """Return switch or create it if necessary.
566
567
        Args:
568
            dpid (|DPID|): dpid object used to identify a switch.
569
            connection (:class:`~kytos.core.connection.Connection`):
570
                connection used by switch. If a switch has a connection that
571
                will be updated.
572
573
        Returns:
574
            :class:`~kytos.core.switch.Switch`: new or existent switch.
575
576
        """
577 1
        self.create_or_update_connection(connection)
578 1
        switch = self.get_switch_by_dpid(dpid)
579 1
        event_name = 'kytos/core.switch.'
580
581 1
        if switch is None:
582
            switch = Switch(dpid=dpid)
583
            self.add_new_switch(switch)
584
            event_name += 'new'
585
        else:
586 1
            event_name += 'reconnected'
587
588 1
        self.set_switch_options(dpid=dpid)
589 1
        event = KytosEvent(name=event_name, content={'switch': switch})
590
591 1
        old_connection = switch.connection
592 1
        switch.update_connection(connection)
593
594 1
        if old_connection is not connection:
595
            self.remove_connection(old_connection)
596
597 1
        self.buffers.app.put(event)
598
599 1
        return switch
600
601 1
    def set_switch_options(self, dpid):
602
        """Update the switch settings based on kytos.conf options.
603
604
        Args:
605
            dpid (str): dpid used to identify a switch.
606
607
        """
608 1
        switch = self.switches.get(dpid)
609 1
        if not switch:
610
            return
611
612 1
        vlan_pool = {}
613 1
        try:
614 1
            vlan_pool = json.loads(self.options.vlan_pool)
615 1
            if not vlan_pool:
616
                return
617
        except (TypeError, json.JSONDecodeError) as err:
618
            self.log.error("Invalid vlan_pool settings: %s", err)
619
620 1
        if vlan_pool.get(dpid):
621 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
622 1
            for intf_num, port_list in vlan_pool[dpid].items():
623 1
                if not switch.interfaces.get((intf_num)):
624 1
                    vlan_ids = set()
625 1
                    for vlan_range in port_list:
626 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
627 1
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
628 1
                            vlan_ids.add(vlan_id)
629 1
                    intf_num = int(intf_num)
630 1
                    intf = Interface(name=intf_num, port_number=intf_num,
631
                                     switch=switch)
632 1
                    intf.set_available_tags(vlan_ids)
633 1
                    switch.update_interface(intf)
634
635 1
    def create_or_update_connection(self, connection):
636
        """Update a connection.
637
638
        Args:
639
            connection (:class:`~kytos.core.connection.Connection`):
640
                Instance of connection that will be updated.
641
        """
642 1
        self.connections[connection.id] = connection
643
644 1
    def get_connection_by_id(self, conn_id):
645
        """Return a existent connection by id.
646
647
        Args:
648
            id (int): id from a connection.
649
650
        Returns:
651
            :class:`~kytos.core.connection.Connection`: Instance of connection
652
                or None Type.
653
654
        """
655
        return self.connections.get(conn_id)
656
657 1
    def remove_connection(self, connection):
658
        """Close a existent connection and remove it.
659
660
        Args:
661
            connection (:class:`~kytos.core.connection.Connection`):
662
                Instance of connection that will be removed.
663
        """
664
        if connection is None:
665
            return False
666
667
        try:
668
            connection.close()
669
            del self.connections[connection.id]
670
        except KeyError:
671
            return False
672
        return True
673
674 1
    def remove_switch(self, switch):
675
        """Remove an existent switch.
676
677
        Args:
678
            switch (:class:`~kytos.core.switch.Switch`):
679
                Instance of switch that will be removed.
680
        """
681
        try:
682
            del self.switches[switch.dpid]
683
        except KeyError:
684
            return False
685
        return True
686
687 1
    def new_connection(self, event):
688
        """Handle a kytos/core.connection.new event.
689
690
        This method will read new connection event and store the connection
691
        (socket) into the connections attribute on the controller.
692
693
        It also clear all references to the connection since it is a new
694
        connection on the same ip:port.
695
696
        Args:
697
            event (~kytos.core.KytosEvent):
698
                The received event (``kytos/core.connection.new``) with the
699
                needed info.
700
        """
701
        self.log.info("Handling %s...", event)
702
703
        connection = event.source
704
        self.log.debug("Event source: %s", event.source)
705
706
        # Remove old connection (aka cleanup) if it exists
707
        if self.get_connection_by_id(connection.id):
708
            self.remove_connection(connection.id)
709
710
        # Update connections with the new connection
711
        self.create_or_update_connection(connection)
712
713 1
    def add_new_switch(self, switch):
714
        """Add a new switch on the controller.
715
716
        Args:
717
            switch (Switch): A Switch object
718
        """
719
        self.switches[switch.dpid] = switch
720
721 1
    def _import_napp(self, username, napp_name):
722
        """Import a NApp module.
723
724
        Raises:
725
            FileNotFoundError: if NApp's main.py is not found.
726
            ModuleNotFoundError: if any NApp requirement is not installed.
727
728
        """
729
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
730
        path = os.path.join(self.options.napps, username, napp_name,
731
                            'main.py')
732
        napp_spec = spec_from_file_location(mod_name, path)
733
        napp_module = module_from_spec(napp_spec)
734
        sys.modules[napp_spec.name] = napp_module
735
        napp_spec.loader.exec_module(napp_module)
736
        return napp_module
737
738 1
    def load_napp(self, username, napp_name):
739
        """Load a single NApp.
740
741
        Args:
742
            username (str): NApp username (makes up NApp's path).
743
            napp_name (str): Name of the NApp to be loaded.
744
        """
745
        if (username, napp_name) in self.napps:
746
            message = 'NApp %s/%s was already loaded'
747
            self.log.warning(message, username, napp_name)
748
            return
749
750
        try:
751
            napp_module = self._import_napp(username, napp_name)
752
        except ModuleNotFoundError as err:
753
            self.log.error("Error loading NApp '%s/%s': %s",
754
                           username, napp_name, err)
755
            return
756
        except FileNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
757
            msg = "NApp module not found, assuming it's a meta napp: %s"
758
            self.log.warning(msg, err.filename)
759
            return
760
761
        napp = napp_module.Main(controller=self)
762
763
        self.napps[(username, napp_name)] = napp
764
765
        napp.start()
766
        self.api_server.register_napp_endpoints(napp)
767
768
        # pylint: disable=protected-access
769
        for event, listeners in napp._listeners.items():
770
            self.events_listeners.setdefault(event, []).extend(listeners)
771
        # pylint: enable=protected-access
772
773 1
    def pre_install_napps(self, napps, enable=True):
774
        """Pre install and enable NApps.
775
776
        Before installing, it'll check if it's installed yet.
777
778
        Args:
779
            napps ([str]): List of NApps to be pre-installed and enabled.
780
        """
781
        all_napps = self.napps_manager.get_installed_napps()
782
        installed = [str(napp) for napp in all_napps]
783
        napps_diff = [napp for napp in napps if napp not in installed]
784
        for napp in napps_diff:
785
            self.napps_manager.install(napp, enable=enable)
786
787 1
    def load_napps(self):
788
        """Load all NApps enabled on the NApps dir."""
789
        for napp in self.napps_manager.get_enabled_napps():
790
            try:
791
                self.log.info("Loading NApp %s", napp.id)
792
                self.load_napp(napp.username, napp.name)
793
            except FileNotFoundError as exception:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
794
                self.log.error("Could not load NApp %s: %s",
795
                               napp.id, exception)
796
797 1
    def unload_napp(self, username, napp_name):
798
        """Unload a specific NApp.
799
800
        Args:
801
            username (str): NApp username.
802
            napp_name (str): Name of the NApp to be unloaded.
803
        """
804 1
        napp = self.napps.pop((username, napp_name), None)
805
806 1
        if napp is None:
807
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
808
        else:
809 1
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
810 1
            napp_id = NApp(username, napp_name).id
811 1
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
812 1
            napp_shutdown_fn = self.events_listeners[event.name][0]
813
            # Call listener before removing it from events_listeners
814 1
            napp_shutdown_fn(event)
815
816
            # Remove rest endpoints from that napp
817 1
            self.api_server.remove_napp_endpoints(napp)
818
819
            # Removing listeners from that napp
820
            # pylint: disable=protected-access
821 1
            for event_type, napp_listeners in napp._listeners.items():
822
                event_listeners = self.events_listeners[event_type]
823
                for listener in napp_listeners:
824
                    event_listeners.remove(listener)
825
                if not event_listeners:
826
                    del self.events_listeners[event_type]
827
            # pylint: enable=protected-access
828
829 1
    def unload_napps(self):
830
        """Unload all loaded NApps that are not core NApps."""
831
        # list() is used here to avoid the error:
832
        # 'RuntimeError: dictionary changed size during iteration'
833
        # This is caused by looping over an dictionary while removing
834
        # items from it.
835
        for (username, napp_name) in list(self.napps.keys()):  # noqa
836
            self.unload_napp(username, napp_name)
837
838 1
    def reload_napp_module(self, username, napp_name, napp_file):
839
        """Reload a NApp Module."""
840
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
841
        try:
842
            napp_module = import_module(mod_name)
843
        except ModuleNotFoundError as err:
844
            self.log.error("Module '%s' not found", mod_name)
845
            raise
846
        try:
847
            napp_module = reload_module(napp_module)
848
        except ImportError as err:
849
            self.log.error("Error reloading NApp '%s/%s': %s",
850
                           username, napp_name, err)
851
            raise
852
853 1
    def reload_napp(self, username, napp_name):
854
        """Reload a NApp."""
855
        self.unload_napp(username, napp_name)
856
        try:
857
            self.reload_napp_module(username, napp_name, 'settings')
858
            self.reload_napp_module(username, napp_name, 'main')
859
        except (ModuleNotFoundError, ImportError):
860
            return 400
861
        self.log.info("NApp '%s/%s' successfully reloaded",
862
                      username, napp_name)
863
        self.load_napp(username, napp_name)
864
        return 200
865
866 1
    def rest_reload_napp(self, username, napp_name):
867
        """Request reload a NApp."""
868
        res = self.reload_napp(username, napp_name)
869
        return 'reloaded', res
870
871 1
    def rest_reload_all_napps(self):
872
        """Request reload all NApps."""
873
        for napp in self.napps:
874
            self.reload_napp(*napp)
875
        return 'reloaded', 200
876