Completed
Push — master ( 539172...725999 )
by Beraldo
13s queued 10s
created

Controller.get_connection_by_id()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.512

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 12
rs 10
c 0
b 0
f 0
ccs 1
cts 5
cp 0.2
crap 1.512
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import reload as reload_module
26 1
from importlib import import_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29
30 1
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 1
from kytos.core.buffers import KytosBuffers
34 1
from kytos.core.config import KytosConfig
35 1
from kytos.core.connection import ConnectionState
36 1
from kytos.core.events import KytosEvent
37 1
from kytos.core.helpers import now
38 1
from kytos.core.interface import Interface
39 1
from kytos.core.logs import LogManager
40 1
from kytos.core.napps.base import NApp
41 1
from kytos.core.napps.manager import NAppsManager
42 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
43
from kytos.core.switch import Switch
44 1
45
__all__ = ('Controller',)
46
47 1
48
class Controller:
49
    """Main class of Kytos.
50
51
    The main responsibilities of this class are:
52
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
53
        - manage KytosNApps (install, load and unload);
54
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
55
        - manage which event should be sent to NApps methods;
56
        - manage the buffers handlers, considering one thread per handler.
57
    """
58
59
    # Created issue #568 for the disabled checks.
60 1
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
61
    def __init__(self, options=None, loop=None):
62
        """Init method of Controller class takes the parameters below.
63
64
        Args:
65
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
66
                instance of :class:`~kytos.core.config.KytosConfig` class.
67 1
        """
68
        if options is None:
69
            options = KytosConfig().options['daemon']
70 1
71 1
        self._loop = loop or asyncio.get_event_loop()
72
        self._pool = ThreadPoolExecutor(max_workers=1)
73
74 1
        #: dict: keep the main threads of the controller (buffers and handler)
75
        self._threads = {}
76 1
        #: KytosBuffers: KytosBuffer object with Controller buffers
77
        self.buffers = KytosBuffers(loop=self._loop)
78
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
79
        #:
80
        #: This dict stores all connections between the controller and the
81
        #: switches. The key for this dict is a tuple (ip, port). The content
82 1
        #: is another dict with the connection information.
83
        self.connections = {}
84
        #: dict: mapping of events and event listeners.
85
        #:
86
        #: The key of the dict is a KytosEvent (or a string that represent a
87
        #: regex to match agains KytosEvents) and the value is a list of
88 1
        #: methods that will receive the referenced event
89
        self.events_listeners = {'kytos/core.connection.new':
90
                                 [self.new_connection]}
91
92
        #: dict: Current loaded apps - ``'napp_name': napp`` (instance)
93
        #:
94
        #: The key is the napp name (string), while the value is the napp
95 1
        #: instance itself.
96
        self.napps = {}
97 1
        #: Object generated by ParseArgs on config.py file
98
        self.options = options
99
        #: KytosServer: Instance of KytosServer that will be listening to TCP
100 1
        #: connections.
101
        self.server = None
102
        #: dict: Current existing switches.
103
        #:
104 1
        #: The key is the switch dpid, while the value is a Switch object.
105
        self.switches = {}  # dpid: Switch()
106
107 1
        #: datetime.datetime: Time when the controller finished starting.
108
        self.started_at = None
109
110 1
        #: logging.Logger: Logger instance used by Kytos.
111
        self.log = None
112
113 1
        #: API Server used to expose rest endpoints.
114
        self.api_server = APIServer(__name__, self.options.listen,
115
                                    self.options.api_port,
116
                                    napps_dir=self.options.napps)
117 1
118
        self._register_endpoints()
119
120 1
        #: Observer that handle NApps when they are enabled or disabled.
121
        self.napp_dir_listener = NAppDirListener(self)
122
123
        self.napps_manager = NAppsManager(self)
124
125 1
        #: Adding the napps 'enabled' directory into the PATH
126
        #: Now you can access the enabled napps with:
127 1
        #: from napps.<username>.<napp_name> import ?....
128
        sys.path.append(os.path.join(self.options.napps, os.pardir))
129 1
130 1
    def enable_logs(self):
131 1
        """Register kytos log and enable the logs."""
132
        LogManager.load_config_file(self.options.logging, self.options.debug)
133 1
        LogManager.enable_websocket(self.api_server.server)
134
        self.log = logging.getLogger("controller")
135
136
    def start(self, restart=False):
137
        """Create pidfile and call start_controller method."""
138
        self.enable_logs()
139
        if not restart:
140 1
            self.create_pidfile()
141
        self.start_controller()
142
143
    def create_pidfile(self):
144
        """Create a pidfile."""
145
        pid = os.getpid()
146
147
        # Creates directory if it doesn't exist
148
        # System can erase /var/run's content
149
        pid_folder = Path(self.options.pidfile).parent
150
        self.log.info(pid_folder)
151
152
        # Pylint incorrectly infers Path objects
153
        # https://github.com/PyCQA/pylint/issues/224
154
        # pylint: disable=no-member
155
        if not pid_folder.exists():
156
            pid_folder.mkdir()
157
            pid_folder.chmod(0o1777)
158
        # pylint: enable=no-member
159
160
        # Make sure the file is deleted when controller stops
161
        atexit.register(Path(self.options.pidfile).unlink)
162
163
        # Checks if a pidfile exists. Creates a new file.
164
        try:
165
            pidfile = open(self.options.pidfile, mode='x')
166
        except OSError:
167
            # This happens if there is a pidfile already.
168
            # We shall check if the process that created the pidfile is still
169
            # running.
170
            try:
171
                existing_file = open(self.options.pidfile, mode='r')
172
                old_pid = int(existing_file.read())
173
                os.kill(old_pid, 0)
174
                # If kill() doesn't return an error, there's a process running
175
                # with the same PID. We assume it is Kytos and quit.
176
                # Otherwise, overwrite the file and proceed.
177
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
178
                             "running. Aborting.")
179
                sys.exit(error_msg.format(self.options.pidfile))
180
            except OSError:
181
                try:
182
                    pidfile = open(self.options.pidfile, mode='w')
183
                except OSError as exception:
184
                    error_msg = "Failed to create pidfile {}: {}."
185
                    sys.exit(error_msg.format(self.options.pidfile, exception))
186
187
        # Identifies the process that created the pidfile.
188 1
        pidfile.write(str(pid))
189
        pidfile.close()
190
191
    def start_controller(self):
192
        """Start the controller.
193
194
        Starts the KytosServer (TCP Server) coroutine.
195
        Starts a thread for each buffer handler.
196
        Load the installed apps.
197
        """
198
        self.log.info("Starting Kytos - Kytos Controller")
199
        self.server = KytosServer((self.options.listen,
200
                                   int(self.options.port)),
201
                                  KytosServerProtocol,
202
                                  self,
203
                                  self.options.protocol_name)
204
205
        self.log.info("Starting TCP server: %s", self.server)
206
        self.server.serve_forever()
207
208
        def _stop_loop(_):
209
            loop = asyncio.get_event_loop()
210
            # print(_.result())
211
            threads = threading.enumerate()
212
            self.log.debug("%s threads before loop.stop: %s",
213
                           len(threads), threads)
214
            loop.stop()
215
216
        async def _run_api_server_thread(executor):
217
            log = logging.getLogger('controller.api_server_thread')
218
            log.debug('starting')
219
            # log.debug('creating tasks')
220
            loop = asyncio.get_event_loop()
221
            blocking_tasks = [
222
                loop.run_in_executor(executor, self.api_server.run)
223
            ]
224
            # log.debug('waiting for tasks')
225
            completed, pending = await asyncio.wait(blocking_tasks)
226
            # results = [t.result() for t in completed]
227
            # log.debug('results: {!r}'.format(results))
228
            log.debug('completed: %d, pending: %d',
229
                      len(completed), len(pending))
230
231
        task = self._loop.create_task(self.raw_event_handler())
232
        task = self._loop.create_task(self.msg_in_event_handler())
233
        task = self._loop.create_task(self.msg_out_event_handler())
234
        task = self._loop.create_task(self.app_event_handler())
235
        task = self._loop.create_task(_run_api_server_thread(self._pool))
236
        task.add_done_callback(_stop_loop)
237
238
        self.log.info("ThreadPool started: %s", self._pool)
239
240
        # ASYNC TODO: ensure all threads started correctly
241
        # This is critical, if any of them failed starting we should exit.
242
        # sys.exit(error_msg.format(thread, exception))
243
244
        self.log.info("Loading Kytos NApps...")
245
        self.napp_dir_listener.start()
246
        self.pre_install_napps(self.options.napps_pre_installed)
247
        self.load_napps()
248 1
249
        self.started_at = now()
250
251
    def _register_endpoints(self):
252
        """Register all rest endpoint served by kytos.
253
254
        -   Register APIServer endpoints
255 1
        -   Register WebUI endpoints
256
        -   Register ``/api/kytos/core/config`` endpoint
257 1
        """
258
        self.api_server.start_api()
259
        # Register controller endpoints as /api/kytos/core/...
260 1
        self.api_server.register_core_endpoint('config/',
261
                                               self.configuration_endpoint)
262
263 1
        self.api_server.register_core_endpoint(
264
            'reload/<username>/<napp_name>/',
265
            self.rest_reload_napp)
266 1
        self.api_server.register_core_endpoint('reload/all',
267
                                               self.rest_reload_all_napps)
268 1
269
    def register_rest_endpoint(self, url, function, methods):
270 1
        """Deprecate in favor of @rest decorator."""
271
        self.api_server.register_rest_endpoint(url, function, methods)
272
273
    def configuration_endpoint(self):
274
        """Return the configuration options used by Kytos.
275
276
        Returns:
277 1
            string: Json with current configurations used by kytos.
278
279 1
        """
280
        return json.dumps(self.options.__dict__)
281
282
    def restart(self, graceful=True):
283
        """Restart Kytos SDN Controller.
284
285
        Args:
286
            graceful(bool): Represents the way that Kytos will restart.
287
        """
288
        if self.started_at is not None:
289
            self.stop(graceful)
290
            self.__init__(self.options)
291 1
292
        self.start(restart=True)
293
294
    def stop(self, graceful=True):
295
        """Shutdown all services used by kytos.
296
297
        This method should:
298
            - stop all Websockets
299
            - stop the API Server
300
            - stop the Controller
301
        """
302 1
        if self.started_at:
303
            self.stop_controller(graceful)
304
305
    def stop_controller(self, graceful=True):
306
        """Stop the controller.
307
308
        This method should:
309
            - announce on the network that the controller will shutdown;
310
            - stop receiving incoming packages;
311
            - call the 'shutdown' method of each KytosNApp that is running;
312
            - finish reading the events on all buffers;
313
            - stop each running handler;
314
            - stop all running threads;
315
            - stop the KytosServer;
316
        """
317
        self.log.info("Stopping Kytos")
318
319
        self.buffers.send_stop_signal()
320
        self.api_server.stop_api_server()
321
        self.napp_dir_listener.stop()
322
323
        self.log.info("Stopping threadpool: %s", self._pool)
324
325
        threads = threading.enumerate()
326
        self.log.debug("%s threads before threadpool shutdown: %s",
327
                       len(threads), threads)
328
329
        self._pool.shutdown(wait=graceful)
330
331
        # self.server.socket.shutdown()
332
        # self.server.socket.close()
333
334
        # for thread in self._threads.values():
335
        #     self.log.info("Stopping thread: %s", thread.name)
336
        #     thread.join()
337
338
        # for thread in self._threads.values():
339
        #     while thread.is_alive():
340
        #         self.log.info("Thread is alive: %s", thread.name)
341
        #         pass
342
343
        self.started_at = None
344
        self.unload_napps()
345
        self.buffers = KytosBuffers()
346
347
        # ASYNC TODO: close connections
348
        # self.server.server_close()
349
350 1
        # Shutdown the TCP server and the main asyncio loop
351
        self.server.shutdown()
352
353
    def status(self):
354
        """Return status of Kytos Server.
355
356
        If the controller kytos is running this method will be returned
357
        "Running since 'Started_At'", otherwise "Stopped".
358
359
        Returns:
360
            string: String with kytos status.
361
362
        """
363
        if self.started_at:
364 1
            return "Running since %s" % self.started_at
365
        return "Stopped"
366
367
    def uptime(self):
368
        """Return the uptime of kytos server.
369
370
        This method should return:
371
            - 0 if Kytos Server is stopped.
372
            - (kytos.start_at - datetime.now) if Kytos Server is running.
373
374
        Returns:
375
           datetime.timedelta: The uptime interval.
376
377 1
        """
378
        return now() - self.started_at if self.started_at else 0
379
380
    def notify_listeners(self, event):
381
        """Send the event to the specified listeners.
382
383
        Loops over self.events_listeners matching (by regexp) the attribute
384
        name of the event with the keys of events_listeners. If a match occurs,
385
        then send the event to each registered listener.
386
387
        Args:
388
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
389
        """
390
        self.log.debug("looking for listeners for %s", event)
391
        for event_regex, listeners in dict(self.events_listeners).items():
392
            # self.log.debug("listeners found for %s: %r => %s", event,
393
            #                event_regex, [l.__qualname__ for l in listeners])
394
            # Do not match if the event has more characters
395
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
396
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
397
                event_regex += '$'
398
            if re.match(event_regex, event.name):
399
                # self.log.debug('Calling listeners for %s', event)
400 1
                for listener in listeners:
401
                    listener(event)
402
403
    async def raw_event_handler(self):
404
        """Handle raw events.
405
406
        Listen to the raw_buffer and send all its events to the
407
        corresponding listeners.
408
        """
409
        self.log.info("Raw Event Handler started")
410
        while True:
411
            event = await self.buffers.raw.aget()
412
            self.notify_listeners(event)
413
            self.log.debug("Raw Event handler called")
414
415
            if event.name == "kytos/core.shutdown":
416 1
                self.log.debug("Raw Event handler stopped")
417
                break
418
419
    async def msg_in_event_handler(self):
420
        """Handle msg_in events.
421
422
        Listen to the msg_in buffer and send all its events to the
423
        corresponding listeners.
424
        """
425
        self.log.info("Message In Event Handler started")
426
        while True:
427
            event = await self.buffers.msg_in.aget()
428
            self.notify_listeners(event)
429
            self.log.debug("Message In Event handler called")
430
431
            if event.name == "kytos/core.shutdown":
432 1
                self.log.debug("Message In Event handler stopped")
433
                break
434
435
    async def msg_out_event_handler(self):
436
        """Handle msg_out events.
437
438
        Listen to the msg_out buffer and send all its events to the
439
        corresponding listeners.
440
        """
441
        self.log.info("Message Out Event Handler started")
442
        while True:
443
            triggered_event = await self.buffers.msg_out.aget()
444
445
            if triggered_event.name == "kytos/core.shutdown":
446
                self.log.debug("Message Out Event handler stopped")
447
                break
448
449
            message = triggered_event.content['message']
450
            destination = triggered_event.destination
451
            if (destination and
452
                    not destination.state == ConnectionState.FINISHED):
453
                packet = message.pack()
454
                destination.send(packet)
455
                self.log.debug('Connection %s: OUT OFP, '
456
                               'version: %s, type: %s, xid: %s - %s',
457
                               destination.id,
458
                               message.header.version,
459
                               message.header.message_type,
460
                               message.header.xid,
461
                               packet.hex())
462
                self.notify_listeners(triggered_event)
463
                self.log.debug("Message Out Event handler called")
464 1
            else:
465
                self.log.info("connection closed. Cannot send message")
466
467
    async def app_event_handler(self):
468
        """Handle app events.
469
470
        Listen to the app buffer and send all its events to the
471
        corresponding listeners.
472
        """
473
        self.log.info("App Event Handler started")
474
        while True:
475
            event = await self.buffers.app.aget()
476
            self.notify_listeners(event)
477
            self.log.debug("App Event handler called")
478
479
            if event.name == "kytos/core.shutdown":
480 1
                self.log.debug("App Event handler stopped")
481
                break
482
483
    def get_interface_by_id(self, interface_id):
484
        """Find a Interface  with interface_id.
485
486
        Args:
487
            interface_id(str): Interface Identifier.
488
489
        Returns:
490
            Interface: Instance of Interface with the id given.
491
492
        """
493
        if interface_id is None:
494
            return None
495
496
        switch_id = ":".join(interface_id.split(":")[:-1])
497
        interface_number = int(interface_id.split(":")[-1])
498
499
        switch = self.switches.get(switch_id)
500
501
        if not switch:
502
            return None
503 1
504
        return switch.interfaces.get(interface_number, None)
505
506
    def get_switch_by_dpid(self, dpid):
507
        """Return a specific switch by dpid.
508
509
        Args:
510
            dpid (|DPID|): dpid object used to identify a switch.
511
512
        Returns:
513
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
514
515 1
        """
516
        return self.switches.get(dpid)
517
518
    def get_switch_or_create(self, dpid, connection):
519
        """Return switch or create it if necessary.
520
521
        Args:
522
            dpid (|DPID|): dpid object used to identify a switch.
523
            connection (:class:`~kytos.core.connection.Connection`):
524
                connection used by switch. If a switch has a connection that
525
                will be updated.
526
527
        Returns:
528
            :class:`~kytos.core.switch.Switch`: new or existent switch.
529
530
        """
531
        self.create_or_update_connection(connection)
532
        switch = self.get_switch_by_dpid(dpid)
533
        event_name = 'kytos/core.switch.'
534
535
        if switch is None:
536
            switch = Switch(dpid=dpid)
537
            self.add_new_switch(switch)
538
            event_name += 'new'
539
        else:
540
            event_name += 'reconnected'
541
542
        self.set_switch_options(dpid=dpid)
543
        event = KytosEvent(name=event_name, content={'switch': switch})
544
545
        old_connection = switch.connection
546
        switch.update_connection(connection)
547
548
        if old_connection is not connection:
549
            self.remove_connection(old_connection)
550
551 1
        self.buffers.app.put(event)
552
553
        return switch
554
555
    def set_switch_options(self, dpid):
556
        """Update the switch settings based on kytos.conf options.
557
558
        Args:
559
            dpid (str): dpid used to identify a switch.
560 1
561
        """
562
        switch = self.switches.get(dpid)
563
        if not switch:
564
            return
565
566
        vlan_pool = {}
567
        try:
568
            vlan_pool = json.loads(self.options.vlan_pool)
569
            if not vlan_pool:
570
                return
571
        except (TypeError, json.JSONDecodeError) as err:
572
            self.log.error("Invalid vlan_pool settings: %s", err)
573 1
574
        if vlan_pool.get(dpid):
575
            self.log.info(f"Loading vlan_pool configuration for dpid {dpid}")
576
            for intf_num, port_list in vlan_pool[dpid].items():
577
                if not switch.interfaces.get((intf_num)):
578
                    vlan_ids = set()
579
                    for vlan_range in port_list:
580
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
581
                        for vlan_id in range(vlan_begin, vlan_end):
582
                            vlan_ids.add(vlan_id)
583
                    intf_num = int(intf_num)
584
                    intf = Interface(name=intf_num, port_number=intf_num,
585
                                     switch=switch)
586
                    intf.set_available_tags(vlan_ids)
587
                    switch.update_interface(intf)
588
589
    def create_or_update_connection(self, connection):
590 1
        """Update a connection.
591
592
        Args:
593
            connection (:class:`~kytos.core.connection.Connection`):
594
                Instance of connection that will be updated.
595
        """
596
        self.connections[connection.id] = connection
597
598
    def get_connection_by_id(self, conn_id):
599
        """Return a existent connection by id.
600
601
        Args:
602
            id (int): id from a connection.
603 1
604
        Returns:
605
            :class:`~kytos.core.connection.Connection`: Instance of connection
606
                or None Type.
607
608
        """
609
        return self.connections.get(conn_id)
610
611
    def remove_connection(self, connection):
612
        """Close a existent connection and remove it.
613
614
        Args:
615
            connection (:class:`~kytos.core.connection.Connection`):
616
                Instance of connection that will be removed.
617
        """
618
        if connection is None:
619
            return False
620
621
        try:
622
            connection.close()
623
            del self.connections[connection.id]
624
        except KeyError:
625
            return False
626
        return True
627
628
    def remove_switch(self, switch):
629 1
        """Remove an existent switch.
630
631
        Args:
632
            switch (:class:`~kytos.core.switch.Switch`):
633
                Instance of switch that will be removed.
634
        """
635
        try:
636
            del self.switches[switch.dpid]
637 1
        except KeyError:
638
            return False
639
        return True
640
641
    def new_connection(self, event):
642
        """Handle a kytos/core.connection.new event.
643
644
        This method will read new connection event and store the connection
645
        (socket) into the connections attribute on the controller.
646
647
        It also clear all references to the connection since it is a new
648
        connection on the same ip:port.
649
650
        Args:
651
            event (~kytos.core.KytosEvent):
652
                The received event (``kytos/core.connection.new``) with the
653
                needed info.
654 1
        """
655
        self.log.info("Handling %s...", event)
656
657
        connection = event.source
658
        self.log.debug("Event source: %s", event.source)
659
660
        # Remove old connection (aka cleanup) if it exists
661
        if self.get_connection_by_id(connection.id):
662
            self.remove_connection(connection.id)
663
664
        # Update connections with the new connection
665
        self.create_or_update_connection(connection)
666
667
    def add_new_switch(self, switch):
668
        """Add a new switch on the controller.
669
670
        Args:
671
            switch (Switch): A Switch object
672
        """
673
        self.switches[switch.dpid] = switch
674
675
    def _import_napp(self, username, napp_name):
676
        """Import a NApp module.
677
678
        Raises:
679
            FileNotFoundError: if NApp's main.py is not found.
680
            ModuleNotFoundError: if any NApp requirement is not installed.
681
682
        """
683
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
684
        path = os.path.join(self.options.napps, username, napp_name,
685 1
                            'main.py')
686
        napp_spec = spec_from_file_location(mod_name, path)
687
        napp_module = module_from_spec(napp_spec)
688
        sys.modules[napp_spec.name] = napp_module
689
        napp_spec.loader.exec_module(napp_module)
690
        return napp_module
691
692
    def load_napp(self, username, napp_name):
693
        """Load a single NApp.
694
695
        Args:
696
            username (str): NApp username (makes up NApp's path).
697
            napp_name (str): Name of the NApp to be loaded.
698
        """
699 1
        if (username, napp_name) in self.napps:
700
            message = 'NApp %s/%s was already loaded'
701
            self.log.warning(message, username, napp_name)
702
            return
703
704
        try:
705
            napp_module = self._import_napp(username, napp_name)
706
        except ModuleNotFoundError as err:
1 ignored issue
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
707
            self.log.error("Error loading NApp '%s/%s': %s",
708
                           username, napp_name, err)
709
            return
710 1
        except FileNotFoundError as err:
711
            msg = "NApp module not found, assuming it's a meta napp: %s"
712
            self.log.warning(msg, err.filename)
713
            return
714
715
        napp = napp_module.Main(controller=self)
716
717 1
        self.napps[(username, napp_name)] = napp
718
719 1
        napp.start()
720
        self.api_server.register_napp_endpoints(napp)
721
722 1
        # pylint: disable=protected-access
723 1
        for event, listeners in napp._listeners.items():
724 1
            self.events_listeners.setdefault(event, []).extend(listeners)
725 1
        # pylint: enable=protected-access
726
727 1
    def pre_install_napps(self, napps, enable=True):
728
        """Pre install and enable NApps.
729
730 1
        Before installing, it'll check if it's installed yet.
731
732
        Args:
733
            napps ([str]): List of NApps to be pre-installed and enabled.
734 1
        """
735
        all_napps = self.napps_manager.get_installed_napps()
736
        installed = [str(napp) for napp in all_napps]
737
        napps_diff = [napp for napp in napps if napp not in installed]
738
        for napp in napps_diff:
739
            self.napps_manager.install(napp, enable=enable)
740
741
    def load_napps(self):
742 1
        """Load all NApps enabled on the NApps dir."""
743
        for napp in self.napps_manager.get_enabled_napps():
744
            try:
745
                self.log.info("Loading NApp %s", napp.id)
746
                self.load_napp(napp.username, napp.name)
747
            except FileNotFoundError as exception:
748
                self.log.error("Could not load NApp %s: %s",
749
                               napp.id, exception)
750
751 1
    def unload_napp(self, username, napp_name):
752
        """Unload a specific NApp.
753
754
        Args:
755
            username (str): NApp username.
756
            napp_name (str): Name of the NApp to be unloaded.
757
        """
758
        napp = self.napps.pop((username, napp_name), None)
759
760
        if napp is None:
761
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
762
        else:
763
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
764
            napp_id = NApp(username, napp_name).id
765
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
766
            napp_shutdown_fn = self.events_listeners[event.name][0]
767
            # Call listener before removing it from events_listeners
768
            napp_shutdown_fn(event)
769
770
            # Remove rest endpoints from that napp
771
            self.api_server.remove_napp_endpoints(napp)
772
773
            # Removing listeners from that napp
774 1
            # pylint: disable=protected-access
775
            for event_type, napp_listeners in napp._listeners.items():
776
                event_listeners = self.events_listeners[event_type]
777
                for listener in napp_listeners:
778
                    event_listeners.remove(listener)
779 1
                if not event_listeners:
780
                    del self.events_listeners[event_type]
781
            # pylint: enable=protected-access
782
783
    def unload_napps(self):
784
        """Unload all loaded NApps that are not core NApps."""
785
        # list() is used here to avoid the error:
786
        # 'RuntimeError: dictionary changed size during iteration'
787
        # This is caused by looping over an dictionary while removing
788
        # items from it.
789
        for (username, napp_name) in list(self.napps.keys()):  # noqa
790
            self.unload_napp(username, napp_name)
791
792
    def reload_napp_module(self, username, napp_name, napp_file):
793
        """Reload a NApp Module."""
794
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
795
        try:
796
            napp_module = import_module(mod_name)
797
        except ModuleNotFoundError as err:
1 ignored issue
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
798
            self.log.error("Module '%s' not found", mod_name)
799
            raise
800
        try:
801
            napp_module = reload_module(napp_module)
802
        except ImportError as err:
803
            self.log.error("Error reloading NApp '%s/%s': %s",
804
                           username, napp_name, err)
805
            raise
806
807
    def reload_napp(self, username, napp_name):
808
        """Reload a NApp."""
809
        self.unload_napp(username, napp_name)
810
        try:
811
            self.reload_napp_module(username, napp_name, 'settings')
812
            self.reload_napp_module(username, napp_name, 'main')
813
        except (ModuleNotFoundError, ImportError):
1 ignored issue
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
814
            return 400
815
        self.log.info("NApp '%s/%s' successfully reloaded",
816
                      username, napp_name)
817
        self.load_napp(username, napp_name)
818
        return 200
819
820
    def rest_reload_napp(self, username, napp_name):
821
        """Request reload a NApp."""
822
        res = self.reload_napp(username, napp_name)
823
        return 'reloaded', res
824
825
    def rest_reload_all_napps(self):
826
        """Request reload all NApps."""
827
        for napp in self.napps:
828
            self.reload_napp(*napp)
829
        return 'reloaded', 200
830