Completed
Push — master ( 76cc93...764801 )
by Beraldo
14s queued 11s
created

Controller.set_switch_options()   C

Complexity

Conditions 9

Size

Total Lines 34
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 55.8693

Importance

Changes 0
Metric Value
cc 9
eloc 25
nop 2
dl 0
loc 34
ccs 2
cts 12
cp 0.1666
crap 55.8693
rs 6.6666
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29
30 1
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 1
from kytos.core.buffers import KytosBuffers
34 1
from kytos.core.config import KytosConfig
35 1
from kytos.core.connection import ConnectionState
36 1
from kytos.core.events import KytosEvent
37 1
from kytos.core.helpers import now
38 1
from kytos.core.logs import LogManager
39 1
from kytos.core.interface import Interface
40 1
from kytos.core.napps.base import NApp
41 1
from kytos.core.napps.manager import NAppsManager
42 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
43
from kytos.core.switch import Switch
44 1
45
__all__ = ('Controller',)
46
47 1
48
class Controller:
49
    """Main class of Kytos.
50
51
    The main responsibilities of this class are:
52
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
53
        - manage KytosNApps (install, load and unload);
54
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
55
        - manage which event should be sent to NApps methods;
56
        - manage the buffers handlers, considering one thread per handler.
57
    """
58
59
    # Created issue #568 for the disabled checks.
60 1
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
61
    def __init__(self, options=None, loop=None):
62
        """Init method of Controller class takes the parameters below.
63
64
        Args:
65
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
66
                instance of :class:`~kytos.core.config.KytosConfig` class.
67 1
        """
68
        if options is None:
69
            options = KytosConfig().options['daemon']
70 1
71 1
        self._loop = loop or asyncio.get_event_loop()
72
        self._pool = ThreadPoolExecutor(max_workers=1)
73
74 1
        #: dict: keep the main threads of the controller (buffers and handler)
75
        self._threads = {}
76 1
        #: KytosBuffers: KytosBuffer object with Controller buffers
77
        self.buffers = KytosBuffers(loop=self._loop)
78
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
79
        #:
80
        #: This dict stores all connections between the controller and the
81
        #: switches. The key for this dict is a tuple (ip, port). The content
82 1
        #: is another dict with the connection information.
83
        self.connections = {}
84
        #: dict: mapping of events and event listeners.
85
        #:
86
        #: The key of the dict is a KytosEvent (or a string that represent a
87
        #: regex to match agains KytosEvents) and the value is a list of
88 1
        #: methods that will receive the referenced event
89
        self.events_listeners = {'kytos/core.connection.new':
90
                                 [self.new_connection]}
91
92
        #: dict: Current loaded apps - ``'napp_name': napp`` (instance)
93
        #:
94
        #: The key is the napp name (string), while the value is the napp
95 1
        #: instance itself.
96
        self.napps = {}
97 1
        #: Object generated by ParseArgs on config.py file
98
        self.options = options
99
        #: KytosServer: Instance of KytosServer that will be listening to TCP
100 1
        #: connections.
101
        self.server = None
102
        #: dict: Current existing switches.
103
        #:
104 1
        #: The key is the switch dpid, while the value is a Switch object.
105
        self.switches = {}  # dpid: Switch()
106
107 1
        #: datetime.datetime: Time when the controller finished starting.
108
        self.started_at = None
109
110 1
        #: logging.Logger: Logger instance used by Kytos.
111
        self.log = None
112
113 1
        #: API Server used to expose rest endpoints.
114
        self.api_server = APIServer(__name__, self.options.listen,
115
                                    self.options.api_port,
116
                                    napps_dir=self.options.napps)
117 1
118
        self._register_endpoints()
119
120 1
        #: Observer that handle NApps when they are enabled or disabled.
121
        self.napp_dir_listener = NAppDirListener(self)
122
123
        self.napps_manager = NAppsManager(self)
124
125 1
        #: Adding the napps 'enabled' directory into the PATH
126
        #: Now you can access the enabled napps with:
127 1
        #: from napps.<username>.<napp_name> import ?....
128
        sys.path.append(os.path.join(self.options.napps, os.pardir))
129 1
130 1
    def enable_logs(self):
131 1
        """Register kytos log and enable the logs."""
132
        LogManager.load_config_file(self.options.logging, self.options.debug)
133 1
        LogManager.enable_websocket(self.api_server.server)
134
        self.log = logging.getLogger("controller")
135
136
    def start(self, restart=False):
137
        """Create pidfile and call start_controller method."""
138
        self.enable_logs()
139
        if not restart:
140 1
            self.create_pidfile()
141
        self.start_controller()
142
143
    def create_pidfile(self):
144
        """Create a pidfile."""
145
        pid = os.getpid()
146
147
        # Creates directory if it doesn't exist
148
        # System can erase /var/run's content
149
        pid_folder = Path(self.options.pidfile).parent
150
        self.log.info(pid_folder)
151
152
        # Pylint incorrectly infers Path objects
153
        # https://github.com/PyCQA/pylint/issues/224
154
        # pylint: disable=no-member
155
        if not pid_folder.exists():
156
            pid_folder.mkdir()
157
            pid_folder.chmod(0o1777)
158
        # pylint: enable=no-member
159
160
        # Make sure the file is deleted when controller stops
161
        atexit.register(Path(self.options.pidfile).unlink)
162
163
        # Checks if a pidfile exists. Creates a new file.
164
        try:
165
            pidfile = open(self.options.pidfile, mode='x')
166
        except OSError:
167
            # This happens if there is a pidfile already.
168
            # We shall check if the process that created the pidfile is still
169
            # running.
170
            try:
171
                existing_file = open(self.options.pidfile, mode='r')
172
                old_pid = int(existing_file.read())
173
                os.kill(old_pid, 0)
174
                # If kill() doesn't return an error, there's a process running
175
                # with the same PID. We assume it is Kytos and quit.
176
                # Otherwise, overwrite the file and proceed.
177
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
178
                             "running. Aborting.")
179
                sys.exit(error_msg.format(self.options.pidfile))
180
            except OSError:
181
                try:
182
                    pidfile = open(self.options.pidfile, mode='w')
183
                except OSError as exception:
184
                    error_msg = "Failed to create pidfile {}: {}."
185
                    sys.exit(error_msg.format(self.options.pidfile, exception))
186
187
        # Identifies the process that created the pidfile.
188 1
        pidfile.write(str(pid))
189
        pidfile.close()
190
191
    def start_controller(self):
192
        """Start the controller.
193
194
        Starts the KytosServer (TCP Server) coroutine.
195
        Starts a thread for each buffer handler.
196
        Load the installed apps.
197
        """
198
        self.log.info("Starting Kytos - Kytos Controller")
199
        self.server = KytosServer((self.options.listen,
200
                                   int(self.options.port)),
201
                                  KytosServerProtocol,
202
                                  self,
203
                                  self.options.protocol_name)
204
205
        self.log.info("Starting TCP server: %s", self.server)
206
        self.server.serve_forever()
207
208
        def _stop_loop(_):
209
            loop = asyncio.get_event_loop()
210
            # print(_.result())
211
            threads = threading.enumerate()
212
            self.log.debug("%s threads before loop.stop: %s",
213
                           len(threads), threads)
214
            loop.stop()
215
216
        async def _run_api_server_thread(executor):
217
            log = logging.getLogger('controller.api_server_thread')
218
            log.debug('starting')
219
            # log.debug('creating tasks')
220
            loop = asyncio.get_event_loop()
221
            blocking_tasks = [
222
                loop.run_in_executor(executor, self.api_server.run)
223
            ]
224
            # log.debug('waiting for tasks')
225
            completed, pending = await asyncio.wait(blocking_tasks)
226
            # results = [t.result() for t in completed]
227
            # log.debug('results: {!r}'.format(results))
228
            log.debug('completed: %d, pending: %d',
229
                      len(completed), len(pending))
230
231
        task = self._loop.create_task(self.raw_event_handler())
232
        task = self._loop.create_task(self.msg_in_event_handler())
233
        task = self._loop.create_task(self.msg_out_event_handler())
234
        task = self._loop.create_task(self.app_event_handler())
235
        task = self._loop.create_task(_run_api_server_thread(self._pool))
236
        task.add_done_callback(_stop_loop)
237
238
        self.log.info("ThreadPool started: %s", self._pool)
239
240
        # ASYNC TODO: ensure all threads started correctly
241
        # This is critical, if any of them failed starting we should exit.
242
        # sys.exit(error_msg.format(thread, exception))
243
244
        self.log.info("Loading Kytos NApps...")
245
        self.napp_dir_listener.start()
246
        self.pre_install_napps(self.options.napps_pre_installed)
247
        self.load_napps()
248 1
249
        self.started_at = now()
250
251
    def _register_endpoints(self):
252
        """Register all rest endpoint served by kytos.
253
254
        -   Register APIServer endpoints
255 1
        -   Register WebUI endpoints
256
        -   Register ``/api/kytos/core/config`` endpoint
257 1
        """
258
        self.api_server.start_api()
259
        # Register controller endpoints as /api/kytos/core/...
260 1
        self.api_server.register_core_endpoint('config/',
261
                                               self.configuration_endpoint)
262
263 1
        self.api_server.register_core_endpoint(
264
            'reload/<username>/<napp_name>/',
265
            self.rest_reload_napp)
266 1
        self.api_server.register_core_endpoint('reload/all',
267
                                               self.rest_reload_all_napps)
268 1
269
    def register_rest_endpoint(self, url, function, methods):
270 1
        """Deprecate in favor of @rest decorator."""
271
        self.api_server.register_rest_endpoint(url, function, methods)
272
273
    def configuration_endpoint(self):
274
        """Return the configuration options used by Kytos.
275
276
        Returns:
277 1
            string: Json with current configurations used by kytos.
278
279 1
        """
280
        return json.dumps(self.options.__dict__)
281
282
    def restart(self, graceful=True):
283
        """Restart Kytos SDN Controller.
284
285
        Args:
286
            graceful(bool): Represents the way that Kytos will restart.
287
        """
288
        if self.started_at is not None:
289
            self.stop(graceful)
290
            self.__init__(self.options)
291 1
292
        self.start(restart=True)
293
294
    def stop(self, graceful=True):
295
        """Shutdown all services used by kytos.
296
297
        This method should:
298
            - stop all Websockets
299
            - stop the API Server
300
            - stop the Controller
301
        """
302 1
        if self.started_at:
303
            self.stop_controller(graceful)
304
305
    def stop_controller(self, graceful=True):
306
        """Stop the controller.
307
308
        This method should:
309
            - announce on the network that the controller will shutdown;
310
            - stop receiving incoming packages;
311
            - call the 'shutdown' method of each KytosNApp that is running;
312
            - finish reading the events on all buffers;
313
            - stop each running handler;
314
            - stop all running threads;
315
            - stop the KytosServer;
316
        """
317
        self.log.info("Stopping Kytos")
318
319
        self.buffers.send_stop_signal()
320
        self.api_server.stop_api_server()
321
        self.napp_dir_listener.stop()
322
323
        self.log.info("Stopping threadpool: %s", self._pool)
324
325
        threads = threading.enumerate()
326
        self.log.debug("%s threads before threadpool shutdown: %s",
327
                       len(threads), threads)
328
329
        self._pool.shutdown(wait=graceful)
330
331
        # self.server.socket.shutdown()
332
        # self.server.socket.close()
333
334
        # for thread in self._threads.values():
335
        #     self.log.info("Stopping thread: %s", thread.name)
336
        #     thread.join()
337
338
        # for thread in self._threads.values():
339
        #     while thread.is_alive():
340
        #         self.log.info("Thread is alive: %s", thread.name)
341
        #         pass
342
343
        self.started_at = None
344
        self.unload_napps()
345
        self.buffers = KytosBuffers()
346
347
        # ASYNC TODO: close connections
348
        # self.server.server_close()
349
350 1
        # Shutdown the TCP server and the main asyncio loop
351
        self.server.shutdown()
352
353
    def status(self):
354
        """Return status of Kytos Server.
355
356
        If the controller kytos is running this method will be returned
357
        "Running since 'Started_At'", otherwise "Stopped".
358
359
        Returns:
360
            string: String with kytos status.
361
362
        """
363
        if self.started_at:
364 1
            return "Running since %s" % self.started_at
365
        return "Stopped"
366
367
    def uptime(self):
368
        """Return the uptime of kytos server.
369
370
        This method should return:
371
            - 0 if Kytos Server is stopped.
372
            - (kytos.start_at - datetime.now) if Kytos Server is running.
373
374
        Returns:
375
           datetime.timedelta: The uptime interval.
376
377 1
        """
378
        return now() - self.started_at if self.started_at else 0
379
380
    def notify_listeners(self, event):
381
        """Send the event to the specified listeners.
382
383
        Loops over self.events_listeners matching (by regexp) the attribute
384
        name of the event with the keys of events_listeners. If a match occurs,
385
        then send the event to each registered listener.
386
387
        Args:
388
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
389
        """
390
        self.log.debug("looking for listeners for %s", event)
391
        for event_regex, listeners in dict(self.events_listeners).items():
392
            # self.log.debug("listeners found for %s: %r => %s", event,
393
            #                event_regex, [l.__qualname__ for l in listeners])
394
            # Do not match if the event has more characters
395
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
396
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
397
                event_regex += '$'
398
            if re.match(event_regex, event.name):
399
                # self.log.debug('Calling listeners for %s', event)
400 1
                for listener in listeners:
401
                    listener(event)
402
403
    async def raw_event_handler(self):
404
        """Handle raw events.
405
406
        Listen to the raw_buffer and send all its events to the
407
        corresponding listeners.
408
        """
409
        self.log.info("Raw Event Handler started")
410
        while True:
411
            event = await self.buffers.raw.aget()
412
            self.notify_listeners(event)
413
            self.log.debug("Raw Event handler called")
414
415
            if event.name == "kytos/core.shutdown":
416 1
                self.log.debug("Raw Event handler stopped")
417
                break
418
419
    async def msg_in_event_handler(self):
420
        """Handle msg_in events.
421
422
        Listen to the msg_in buffer and send all its events to the
423
        corresponding listeners.
424
        """
425
        self.log.info("Message In Event Handler started")
426
        while True:
427
            event = await self.buffers.msg_in.aget()
428
            self.notify_listeners(event)
429
            self.log.debug("Message In Event handler called")
430
431
            if event.name == "kytos/core.shutdown":
432 1
                self.log.debug("Message In Event handler stopped")
433
                break
434
435
    async def msg_out_event_handler(self):
436
        """Handle msg_out events.
437
438
        Listen to the msg_out buffer and send all its events to the
439
        corresponding listeners.
440
        """
441
        self.log.info("Message Out Event Handler started")
442
        while True:
443
            triggered_event = await self.buffers.msg_out.aget()
444
445
            if triggered_event.name == "kytos/core.shutdown":
446
                self.log.debug("Message Out Event handler stopped")
447
                break
448
449
            message = triggered_event.content['message']
450
            destination = triggered_event.destination
451
            if (destination and
452
                    not destination.state == ConnectionState.FINISHED):
453
                packet = message.pack()
454
                destination.send(packet)
455
                self.log.debug('Connection %s: OUT OFP, '
456
                               'version: %s, type: %s, xid: %s - %s',
457
                               destination.id,
458
                               message.header.version,
459
                               message.header.message_type,
460
                               message.header.xid,
461
                               packet.hex())
462
                self.notify_listeners(triggered_event)
463
                self.log.debug("Message Out Event handler called")
464 1
            else:
465
                self.log.info("connection closed. Cannot send message")
466
467
    async def app_event_handler(self):
468
        """Handle app events.
469
470
        Listen to the app buffer and send all its events to the
471
        corresponding listeners.
472
        """
473
        self.log.info("App Event Handler started")
474
        while True:
475
            event = await self.buffers.app.aget()
476
            self.notify_listeners(event)
477
            self.log.debug("App Event handler called")
478
479
            if event.name == "kytos/core.shutdown":
480 1
                self.log.debug("App Event handler stopped")
481
                break
482
483
    def get_interface_by_id(self, interface_id):
484
        """Find a Interface  with interface_id.
485
486
        Args:
487
            interface_id(str): Interface Identifier.
488
489
        Returns:
490
            Interface: Instance of Interface with the id given.
491
492
        """
493
        if interface_id is None:
494
            return None
495
496
        switch_id = ":".join(interface_id.split(":")[:-1])
497
        interface_number = int(interface_id.split(":")[-1])
498
499
        switch = self.switches.get(switch_id)
500
501
        if not switch:
502
            return None
503 1
504
        return switch.interfaces.get(interface_number, None)
505
506
    def get_switch_by_dpid(self, dpid):
507
        """Return a specific switch by dpid.
508
509
        Args:
510
            dpid (|DPID|): dpid object used to identify a switch.
511
512
        Returns:
513
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
514
515 1
        """
516
        return self.switches.get(dpid)
517
518
    def get_switch_or_create(self, dpid, connection):
519
        """Return switch or create it if necessary.
520
521
        Args:
522
            dpid (|DPID|): dpid object used to identify a switch.
523
            connection (:class:`~kytos.core.connection.Connection`):
524
                connection used by switch. If a switch has a connection that
525
                will be updated.
526
527
        Returns:
528
            :class:`~kytos.core.switch.Switch`: new or existent switch.
529
530
        """
531
        self.create_or_update_connection(connection)
532
        switch = self.get_switch_by_dpid(dpid)
533
        event_name = 'kytos/core.switch.'
534
535
        if switch is None:
536
            switch = Switch(dpid=dpid)
537
            self.add_new_switch(switch)
538
            event_name += 'new'
539
        else:
540
            event_name += 'reconnected'
541
542
        self.set_switch_options(dpid=dpid)
543
        event = KytosEvent(name=event_name, content={'switch': switch})
544
545
        old_connection = switch.connection
546
        switch.update_connection(connection)
547
548
        if old_connection is not connection:
549
            self.remove_connection(old_connection)
550
551 1
        self.buffers.app.put(event)
552
553
        return switch
554
555
    def set_switch_options(self, dpid):
556
        """Update the switch settings based on kytos.conf options.
557
558
        Args:
559
            dpid (|str|): dpid str used to identify a switch.
560 1
561
        """
562
563
        switch = self.switches.get(dpid)
564
        if not switch:
565
            return
566
567
        vlan_pool = {}
568
        try:
569
            vlan_pool = json.loads(self.options.vlan_pool)
570
            if not vlan_pool:
571
                return
572
        except json.JSONDecodeError as e:
573 1
            self.log.error(f"Invalid vlan_pool settings: {str(e)}")
574
575
        if vlan_pool.get(dpid):
576
            self.log.info(f"Loading vlan_pool configuration for dpid {dpid}")
577
            for intf_num, port_list in vlan_pool[dpid].items():
578
                if not switch.interfaces.get((intf_num)):
579
                    vlan_ids = set()
580
                    for vlan_range in port_list:
581
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
582
                        for vlan_id in range(vlan_begin, vlan_end):
583
                            vlan_ids.add(vlan_id)
584
                    intf_num = int(intf_num)
585
                    intf = Interface(name=intf_num, port_number=intf_num,
586
                                     switch=switch)
587
                    intf.set_available_tags(vlan_ids)
588
                    switch.update_interface(intf)
589
590 1
    def create_or_update_connection(self, connection):
591
        """Update a connection.
592
593
        Args:
594
            connection (:class:`~kytos.core.connection.Connection`):
595
                Instance of connection that will be updated.
596
        """
597
        self.connections[connection.id] = connection
598
599
    def get_connection_by_id(self, conn_id):
600
        """Return a existent connection by id.
601
602
        Args:
603 1
            id (int): id from a connection.
604
605
        Returns:
606
            :class:`~kytos.core.connection.Connection`: Instance of connection
607
                or None Type.
608
609
        """
610
        return self.connections.get(conn_id)
611
612
    def remove_connection(self, connection):
613
        """Close a existent connection and remove it.
614
615
        Args:
616
            connection (:class:`~kytos.core.connection.Connection`):
617
                Instance of connection that will be removed.
618
        """
619
        if connection is None:
620
            return False
621
622
        try:
623
            connection.close()
624
            del self.connections[connection.id]
625
        except KeyError:
626
            return False
627
        return True
628
629 1
    def remove_switch(self, switch):
630
        """Remove an existent switch.
631
632
        Args:
633
            switch (:class:`~kytos.core.switch.Switch`):
634
                Instance of switch that will be removed.
635
        """
636
        try:
637 1
            del self.switches[switch.dpid]
638
        except KeyError:
639
            return False
640
        return True
641
642
    def new_connection(self, event):
643
        """Handle a kytos/core.connection.new event.
644
645
        This method will read new connection event and store the connection
646
        (socket) into the connections attribute on the controller.
647
648
        It also clear all references to the connection since it is a new
649
        connection on the same ip:port.
650
651
        Args:
652
            event (~kytos.core.KytosEvent):
653
                The received event (``kytos/core.connection.new``) with the
654 1
                needed info.
655
        """
656
        self.log.info("Handling %s...", event)
657
658
        connection = event.source
659
        self.log.debug("Event source: %s", event.source)
660
661
        # Remove old connection (aka cleanup) if it exists
662
        if self.get_connection_by_id(connection.id):
663
            self.remove_connection(connection.id)
664
665
        # Update connections with the new connection
666
        self.create_or_update_connection(connection)
667
668
    def add_new_switch(self, switch):
669
        """Add a new switch on the controller.
670
671
        Args:
672
            switch (Switch): A Switch object
673
        """
674
        self.switches[switch.dpid] = switch
675
676
    def _import_napp(self, username, napp_name):
677
        """Import a NApp module.
678
679
        Raises:
680
            FileNotFoundError: if NApp's main.py is not found.
681
            ModuleNotFoundError: if any NApp requirement is not installed.
682
683
        """
684
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
685 1
        path = os.path.join(self.options.napps, username, napp_name,
686
                            'main.py')
687
        napp_spec = spec_from_file_location(mod_name, path)
688
        napp_module = module_from_spec(napp_spec)
689
        sys.modules[napp_spec.name] = napp_module
690
        napp_spec.loader.exec_module(napp_module)
691
        return napp_module
692
693
    def load_napp(self, username, napp_name):
694
        """Load a single NApp.
695
696
        Args:
697
            username (str): NApp username (makes up NApp's path).
698
            napp_name (str): Name of the NApp to be loaded.
699 1
        """
700
        if (username, napp_name) in self.napps:
701
            message = 'NApp %s/%s was already loaded'
702
            self.log.warning(message, username, napp_name)
703
            return
704
705
        try:
706
            napp_module = self._import_napp(username, napp_name)
707
        except ModuleNotFoundError as err:
1 ignored issue
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
708
            self.log.error("Error loading NApp '%s/%s': %s",
709
                           username, napp_name, err)
710 1
            return
711
        except FileNotFoundError as err:
712
            msg = "NApp module not found, assuming it's a meta napp: %s"
713
            self.log.warning(msg, err.filename)
714
            return
715
716
        napp = napp_module.Main(controller=self)
717 1
718
        self.napps[(username, napp_name)] = napp
719 1
720
        napp.start()
721
        self.api_server.register_napp_endpoints(napp)
722 1
723 1
        # pylint: disable=protected-access
724 1
        for event, listeners in napp._listeners.items():
725 1
            self.events_listeners.setdefault(event, []).extend(listeners)
726
        # pylint: enable=protected-access
727 1
728
    def pre_install_napps(self, napps, enable=True):
729
        """Pre install and enable NApps.
730 1
731
        Before installing, it'll check if it's installed yet.
732
733
        Args:
734 1
            napps ([str]): List of NApps to be pre-installed and enabled.
735
        """
736
        all_napps = self.napps_manager.get_installed_napps()
737
        installed = [str(napp) for napp in all_napps]
738
        napps_diff = [napp for napp in napps if napp not in installed]
739
        for napp in napps_diff:
740
            self.napps_manager.install(napp, enable=enable)
741
742 1
    def load_napps(self):
743
        """Load all NApps enabled on the NApps dir."""
744
        for napp in self.napps_manager.get_enabled_napps():
745
            try:
746
                self.log.info("Loading NApp %s", napp.id)
747
                self.load_napp(napp.username, napp.name)
748
            except FileNotFoundError as exception:
749
                self.log.error("Could not load NApp %s: %s",
750
                               napp.id, exception)
751 1
752
    def unload_napp(self, username, napp_name):
753
        """Unload a specific NApp.
754
755
        Args:
756
            username (str): NApp username.
757
            napp_name (str): Name of the NApp to be unloaded.
758
        """
759
        napp = self.napps.pop((username, napp_name), None)
760
761
        if napp is None:
762
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
763
        else:
764
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
765
            napp_id = NApp(username, napp_name).id
766
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
767
            napp_shutdown_fn = self.events_listeners[event.name][0]
768
            # Call listener before removing it from events_listeners
769
            napp_shutdown_fn(event)
770
771
            # Remove rest endpoints from that napp
772
            self.api_server.remove_napp_endpoints(napp)
773
774 1
            # Removing listeners from that napp
775
            # pylint: disable=protected-access
776
            for event_type, napp_listeners in napp._listeners.items():
777
                event_listeners = self.events_listeners[event_type]
778
                for listener in napp_listeners:
779 1
                    event_listeners.remove(listener)
780
                if not event_listeners:
781
                    del self.events_listeners[event_type]
782
            # pylint: enable=protected-access
783
784
    def unload_napps(self):
785
        """Unload all loaded NApps that are not core NApps."""
786
        # list() is used here to avoid the error:
787
        # 'RuntimeError: dictionary changed size during iteration'
788
        # This is caused by looping over an dictionary while removing
789
        # items from it.
790
        for (username, napp_name) in list(self.napps.keys()):  # noqa
791
            self.unload_napp(username, napp_name)
792
793
    def reload_napp(self, username, napp_name):
794
        """Reload a NApp."""
795
        self.unload_napp(username, napp_name)
796
797
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
798
        try:
799
            napp_module = import_module(mod_name)
800
        except ModuleNotFoundError as err:
1 ignored issue
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
801
            self.log.error("Module '%s' not found", mod_name)
802
            return 400
803
804
        try:
805
            napp_module = reload_module(napp_module)
806
            self.log.info("NApp '%s/%s' successfully reloaded",
807
                          username, napp_name)
808
        except ImportError as err:
809
            self.log.error("Error reloading NApp '%s/%s': %s",
810
                           username, napp_name, err)
811
            return 400
812
813
        self.load_napp(username, napp_name)
814
        return 200
815
816
    def rest_reload_napp(self, username, napp_name):
817
        """Request reload a NApp."""
818
        res = self.reload_napp(username, napp_name)
819
        return 'reloaded', res
820
821
    def rest_reload_all_napps(self):
822
        """Request reload all NApps."""
823
        for napp in self.napps:
824
            self.reload_napp(*napp)
825
        return 'reloaded', 200
826