Passed
Push — master ( b99802...4a7861 )
by Beraldo
05:16 queued 02:47
created

kytos/core/controller.py (3 issues)

1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.core.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options['daemon'])
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29
30 1
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 1
from kytos.core.buffers import KytosBuffers
34 1
from kytos.core.config import KytosConfig
35 1
from kytos.core.connection import ConnectionState
36 1
from kytos.core.events import KytosEvent
37 1
from kytos.core.helpers import now
38 1
from kytos.core.interface import Interface
39 1
from kytos.core.logs import LogManager
40 1
from kytos.core.napps.base import NApp
41 1
from kytos.core.napps.manager import NAppsManager
42 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
43 1
from kytos.core.switch import Switch
44
45 1
__all__ = ('Controller',)
46
47
48 1
class Controller:
49
    """Main class of Kytos.
50
51
    The main responsibilities of this class are:
52
        - start the :class:`~.core.atcp_server.KytosServer` TCP server;
53
        - manage KytosNApps (install, load and unload);
54
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
55
        - manage which event should be sent to NApps methods;
56
        - manage the buffers handlers, considering one asyncio task per handler.
57
    """
58
59
    # Created issue #568 for the disabled checks.
60
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
61 1
    def __init__(self, options=None, loop=None):
        """Init method of Controller class takes the parameters below.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
                When omitted, the ``'daemon'`` options of a new
                :class:`~kytos.core.config.KytosConfig` are used.
            loop: asyncio event loop used by the controller and handed to its
                buffers. When omitted (or falsy), ``asyncio.get_event_loop()``
                is used.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self._loop = loop or asyncio.get_event_loop()
        #: ThreadPoolExecutor: single-worker pool handed to the API server
        #: runner in start_controller().
        self._pool = ThreadPoolExecutor(max_workers=1)

        #: dict: keep the main threads of the controller (buffers and handler)
        self._threads = {}
        #: KytosBuffers: KytosBuffer object with Controller buffers
        self.buffers = KytosBuffers(loop=self._loop)
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
        #:
        #: This dict stores all connections between the controller and the
        #: switches. The key for this dict is a tuple (ip, port). The content
        #: is another dict with the connection information.
        self.connections = {}
        #: dict: mapping of events and event listeners.
        #:
        #: The key of the dict is a KytosEvent (or a string that represent a
        #: regex to match against KytosEvents) and the value is a list of
        #: methods that will receive the referenced event
        self.events_listeners = {'kytos/core.connection.new':
                                 [self.new_connection]}

        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
        #:
        #: The key is the napp name (string), while the value is the napp
        #: instance itself.
        self.napps = {}
        #: Object generated by ParseArgs on config.py file
        self.options = options
        #: KytosServer: Instance of KytosServer that will be listening to TCP
        #: connections.
        self.server = None
        #: dict: Current existing switches.
        #:
        #: The key is the switch dpid, while the value is a Switch object.
        self.switches = {}  # dpid: Switch()

        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: Logger instance used by Kytos.
        self.log = None

        #: Observer that handle NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(__name__, self.options.listen,
                                    self.options.api_port,
                                    self.napps_manager, self.options.napps)

        self._register_endpoints()
        #: Adding the napps 'enabled' directory into the PATH
        #: Now you can access the enabled napps with:
        #: from napps.<username>.<napp_name> import ?....
        napps_parent = os.path.join(self.options.napps, os.pardir)
        # restart() calls __init__ again; do not pile up duplicate entries.
        if napps_parent not in sys.path:
            sys.path.append(napps_parent)
128
129 1
    def enable_logs(self):
        """Load the logging configuration and create the controller logger.

        Reads ``options.logging`` and ``options.debug``, enables the log
        websocket on the API server and stores the ``"controller"`` logger in
        :attr:`log`.
        """
        opts = self.options
        LogManager.load_config_file(opts.logging, opts.debug)
        LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger("controller")
134
135 1
    def start(self, restart=False):
        """Enable the logs, create the pidfile and start the controller.

        Args:
            restart (bool): when true the pidfile creation is skipped.
        """
        first_start = not restart
        self.enable_logs()
        if first_start:
            self.create_pidfile()
        self.start_controller()
141
142 1
    def create_pidfile(self):
143
        """Create a pidfile."""
144
        pid = os.getpid()
145
146
        # Creates directory if it doesn't exist
147
        # System can erase /var/run's content
148
        pid_folder = Path(self.options.pidfile).parent
149
        self.log.info(pid_folder)
150
151
        # Pylint incorrectly infers Path objects
152
        # https://github.com/PyCQA/pylint/issues/224
153
        # pylint: disable=no-member
154
        if not pid_folder.exists():
155
            pid_folder.mkdir()
156
            pid_folder.chmod(0o1777)
157
        # pylint: enable=no-member
158
159
        # Make sure the file is deleted when controller stops
160
        atexit.register(Path(self.options.pidfile).unlink)
161
162
        # Checks if a pidfile exists. Creates a new file.
163
        try:
164
            pidfile = open(self.options.pidfile, mode='x')
165
        except OSError:
166
            # This happens if there is a pidfile already.
167
            # We shall check if the process that created the pidfile is still
168
            # running.
169
            try:
170
                existing_file = open(self.options.pidfile, mode='r')
171
                old_pid = int(existing_file.read())
172
                os.kill(old_pid, 0)
173
                # If kill() doesn't return an error, there's a process running
174
                # with the same PID. We assume it is Kytos and quit.
175
                # Otherwise, overwrite the file and proceed.
176
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
177
                             "running. Aborting.")
178
                sys.exit(error_msg.format(self.options.pidfile))
179
            except OSError:
180
                try:
181
                    pidfile = open(self.options.pidfile, mode='w')
182
                except OSError as exception:
183
                    error_msg = "Failed to create pidfile {}: {}."
184
                    sys.exit(error_msg.format(self.options.pidfile, exception))
185
186
        # Identifies the process that created the pidfile.
187
        pidfile.write(str(pid))
188
        pidfile.close()
189
190 1
    def start_controller(self):
        """Start the controller.

        Creates the KytosServer (TCP server) and calls ``serve_forever()`` on
        it, schedules one task per buffer handler plus one task that runs the
        API server in the thread pool, then loads the NApps and records the
        start time.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(address, KytosServerProtocol, self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: stop the event loop.
            loop = asyncio.get_event_loop()
            alive = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(alive), alive)
            loop.stop()

        async def _run_api_server_thread(executor):
            # Run the blocking API server in ``executor`` and wait for it.
            log = logging.getLogger('controller.api_server_thread')
            log.debug('starting')
            loop = asyncio.get_event_loop()
            blocking_tasks = [
                loop.run_in_executor(executor, self.api_server.run),
            ]
            completed, pending = await asyncio.wait(blocking_tasks)
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        buffer_handlers = (
            self.raw_event_handler,
            self.msg_in_event_handler,
            self.msg_out_event_handler,
            self.app_event_handler,
        )
        for handler in buffer_handlers:
            self._loop.create_task(handler())

        # Only the API server task carries the loop-stopping callback.
        api_task = self._loop.create_task(_run_api_server_thread(self._pool))
        api_task.add_done_callback(_stop_loop)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.
        # sys.exit(error_msg.format(thread, exception))

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()
249
250 1
    def _register_endpoints(self):
251
        """Register all rest endpoint served by kytos.
252
253
        -   Register APIServer endpoints
254
        -   Register WebUI endpoints
255
        -   Register ``/api/kytos/core/config`` endpoint
256
        """
257 1
        self.api_server.start_api()
258
        # Register controller endpoints as /api/kytos/core/...
259 1
        self.api_server.register_core_endpoint('config/',
260
                                               self.configuration_endpoint)
261
262 1
        self.api_server.register_core_endpoint(
263
            'reload/<username>/<napp_name>/',
264
            self.rest_reload_napp)
265 1
        self.api_server.register_core_endpoint('reload/all',
266
                                               self.rest_reload_all_napps)
267
268 1
    def register_rest_endpoint(self, url, function, methods):
269
        """Deprecate in favor of @rest decorator."""
270 1
        self.api_server.register_rest_endpoint(url, function, methods)
271
272 1
    def configuration_endpoint(self):
273
        """Return the configuration options used by Kytos.
274
275
        Returns:
276
            string: Json with current configurations used by kytos.
277
278
        """
279 1
        return json.dumps(self.options.__dict__)
280
281 1
    def restart(self, graceful=True):
        """Restart Kytos SDN Controller.

        A running controller is stopped and re-initialised with its current
        options and event loop before being started again. The pidfile is
        not recreated.

        Args:
            graceful(bool): Represents the way that Kytos will restart.
        """
        if self.started_at is not None:
            self.stop(graceful)
            # Keep the loop this controller was created with instead of
            # falling back to asyncio.get_event_loop().
            self.__init__(self.options, loop=self._loop)

        self.start(restart=True)
292
293 1
    def stop(self, graceful=True):
294
        """Shutdown all services used by kytos.
295
296
        This method should:
297
            - stop all Websockets
298
            - stop the API Server
299
            - stop the Controller
300
        """
301
        if self.started_at:
302
            self.stop_controller(graceful)
303
304 1
    def stop_controller(self, graceful=True):
        """Stop the controller.

        This method should:
            - announce on the network that the controller will shutdown;
            - stop receiving incoming packages;
            - call the 'shutdown' method of each KytosNApp that is running;
            - finish reading the events on all buffers;
            - stop each running handler;
            - stop all running threads;
            - stop the KytosServer;

        Args:
            graceful (bool): passed as ``wait`` to the thread pool shutdown.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.api_server.stop_api_server()
        self.napp_dir_listener.stop()

        self.log.info("Stopping threadpool: %s", self._pool)

        threads = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(threads), threads)

        self._pool.shutdown(wait=graceful)

        self.started_at = None
        self.unload_napps()
        # Recreate the buffers on the controller's own loop, as __init__ does.
        self.buffers = KytosBuffers(loop=self._loop)

        # ASYNC TODO: close connections

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()
351
352 1
    def status(self):
353
        """Return status of Kytos Server.
354
355
        If the controller kytos is running this method will be returned
356
        "Running since 'Started_At'", otherwise "Stopped".
357
358
        Returns:
359
            string: String with kytos status.
360
361
        """
362
        if self.started_at:
363
            return "Running since %s" % self.started_at
364
        return "Stopped"
365
366 1
    def uptime(self):
367
        """Return the uptime of kytos server.
368
369
        This method should return:
370
            - 0 if Kytos Server is stopped.
371
            - (kytos.start_at - datetime.now) if Kytos Server is running.
372
373
        Returns:
374
           datetime.timedelta: The uptime interval.
375
376
        """
377
        return now() - self.started_at if self.started_at else 0
378
379 1
    def notify_listeners(self, event):
380
        """Send the event to the specified listeners.
381
382
        Loops over self.events_listeners matching (by regexp) the attribute
383
        name of the event with the keys of events_listeners. If a match occurs,
384
        then send the event to each registered listener.
385
386
        Args:
387
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
388
        """
389
        self.log.debug("looking for listeners for %s", event)
390
        for event_regex, listeners in dict(self.events_listeners).items():
391
            # self.log.debug("listeners found for %s: %r => %s", event,
392
            #                event_regex, [l.__qualname__ for l in listeners])
393
            # Do not match if the event has more characters
394
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
395
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
396
                event_regex += '$'
397
            if re.match(event_regex, event.name):
398
                # self.log.debug('Calling listeners for %s', event)
399
                for listener in listeners:
400
                    listener(event)
401
402 1
    async def raw_event_handler(self):
403
        """Handle raw events.
404
405
        Listen to the raw_buffer and send all its events to the
406
        corresponding listeners.
407
        """
408
        self.log.info("Raw Event Handler started")
409
        while True:
410
            event = await self.buffers.raw.aget()
411
            self.notify_listeners(event)
412
            self.log.debug("Raw Event handler called")
413
414
            if event.name == "kytos/core.shutdown":
415
                self.log.debug("Raw Event handler stopped")
416
                break
417
418 1
    async def msg_in_event_handler(self):
419
        """Handle msg_in events.
420
421
        Listen to the msg_in buffer and send all its events to the
422
        corresponding listeners.
423
        """
424
        self.log.info("Message In Event Handler started")
425
        while True:
426
            event = await self.buffers.msg_in.aget()
427
            self.notify_listeners(event)
428
            self.log.debug("Message In Event handler called")
429
430
            if event.name == "kytos/core.shutdown":
431
                self.log.debug("Message In Event handler stopped")
432
                break
433
434 1
    async def msg_out_event_handler(self):
435
        """Handle msg_out events.
436
437
        Listen to the msg_out buffer and send all its events to the
438
        corresponding listeners.
439
        """
440
        self.log.info("Message Out Event Handler started")
441
        while True:
442
            triggered_event = await self.buffers.msg_out.aget()
443
444
            if triggered_event.name == "kytos/core.shutdown":
445
                self.log.debug("Message Out Event handler stopped")
446
                break
447
448
            message = triggered_event.content['message']
449
            destination = triggered_event.destination
450
            if (destination and
451
                    not destination.state == ConnectionState.FINISHED):
452
                packet = message.pack()
453
                destination.send(packet)
454
                self.log.debug('Connection %s: OUT OFP, '
455
                               'version: %s, type: %s, xid: %s - %s',
456
                               destination.id,
457
                               message.header.version,
458
                               message.header.message_type,
459
                               message.header.xid,
460
                               packet.hex())
461
                self.notify_listeners(triggered_event)
462
                self.log.debug("Message Out Event handler called")
463
            else:
464
                self.log.info("connection closed. Cannot send message")
465
466 1
    async def app_event_handler(self):
467
        """Handle app events.
468
469
        Listen to the app buffer and send all its events to the
470
        corresponding listeners.
471
        """
472
        self.log.info("App Event Handler started")
473
        while True:
474
            event = await self.buffers.app.aget()
475
            self.notify_listeners(event)
476
            self.log.debug("App Event handler called")
477
478
            if event.name == "kytos/core.shutdown":
479
                self.log.debug("App Event handler stopped")
480
                break
481
482 1
    def get_interface_by_id(self, interface_id):
483
        """Find a Interface  with interface_id.
484
485
        Args:
486
            interface_id(str): Interface Identifier.
487
488
        Returns:
489
            Interface: Instance of Interface with the id given.
490
491
        """
492
        if interface_id is None:
493
            return None
494
495
        switch_id = ":".join(interface_id.split(":")[:-1])
496
        interface_number = int(interface_id.split(":")[-1])
497
498
        switch = self.switches.get(switch_id)
499
500
        if not switch:
501
            return None
502
503
        return switch.interfaces.get(interface_number, None)
504
505 1
    def get_switch_by_dpid(self, dpid):
506
        """Return a specific switch by dpid.
507
508
        Args:
509
            dpid (|DPID|): dpid object used to identify a switch.
510
511
        Returns:
512
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
513
514
        """
515 1
        return self.switches.get(dpid)
516
517 1
    def get_switch_or_create(self, dpid, connection):
        """Return switch or create it if necessary.

        The connection is stored, the switch is looked up (or created and
        added), its options are applied and its connection is updated. A
        ``kytos/core.switch.new`` or ``kytos/core.switch.reconnected`` event
        is put on the app buffer.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. If a switch has a connection that
                will be updated.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.

        """
        self.create_or_update_connection(connection)
        switch = self.get_switch_by_dpid(dpid)

        if switch is not None:
            event_name = 'kytos/core.switch.' + 'reconnected'
        else:
            switch = Switch(dpid=dpid)
            self.add_new_switch(switch)
            event_name = 'kytos/core.switch.' + 'new'

        self.set_switch_options(dpid=dpid)
        event = KytosEvent(name=event_name, content={'switch': switch})

        # Swap in the new connection; drop the previous one if it differs.
        old_connection = switch.connection
        switch.update_connection(connection)
        if old_connection is not connection:
            self.remove_connection(old_connection)

        self.buffers.app.put(event)
        return switch
553
554 1
    def set_switch_options(self, dpid):
555
        """Update the switch settings based on kytos.conf options.
556
557
        Args:
558
            dpid (str): dpid used to identify a switch.
559
560
        """
561 1
        switch = self.switches.get(dpid)
562 1
        if not switch:
563
            return
564
565 1
        vlan_pool = {}
566 1
        try:
567 1
            vlan_pool = json.loads(self.options.vlan_pool)
568 1
            if not vlan_pool:
569
                return
570
        except (TypeError, json.JSONDecodeError) as err:
571
            self.log.error("Invalid vlan_pool settings: %s", err)
572
573 1
        if vlan_pool.get(dpid):
574 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
575 1
            for intf_num, port_list in vlan_pool[dpid].items():
576 1
                if not switch.interfaces.get((intf_num)):
577 1
                    vlan_ids = set()
578 1
                    for vlan_range in port_list:
579 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
580 1
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
581 1
                            vlan_ids.add(vlan_id)
582 1
                    intf_num = int(intf_num)
583 1
                    intf = Interface(name=intf_num, port_number=intf_num,
584
                                     switch=switch)
585 1
                    intf.set_available_tags(vlan_ids)
586 1
                    switch.update_interface(intf)
587
588 1
    def create_or_update_connection(self, connection):
589
        """Update a connection.
590
591
        Args:
592
            connection (:class:`~kytos.core.connection.Connection`):
593
                Instance of connection that will be updated.
594
        """
595 1
        self.connections[connection.id] = connection
596
597 1
    def get_connection_by_id(self, conn_id):
598
        """Return a existent connection by id.
599
600
        Args:
601
            id (int): id from a connection.
602
603
        Returns:
604
            :class:`~kytos.core.connection.Connection`: Instance of connection
605
                or None Type.
606
607
        """
608
        return self.connections.get(conn_id)
609
610 1
    def remove_connection(self, connection):
611
        """Close a existent connection and remove it.
612
613
        Args:
614
            connection (:class:`~kytos.core.connection.Connection`):
615
                Instance of connection that will be removed.
616
        """
617
        if connection is None:
618
            return False
619
620
        try:
621
            connection.close()
622
            del self.connections[connection.id]
623
        except KeyError:
624
            return False
625
        return True
626
627 1
    def remove_switch(self, switch):
628
        """Remove an existent switch.
629
630
        Args:
631
            switch (:class:`~kytos.core.switch.Switch`):
632
                Instance of switch that will be removed.
633
        """
634
        try:
635
            del self.switches[switch.dpid]
636
        except KeyError:
637
            return False
638
        return True
639
640 1
    def new_connection(self, event):
641
        """Handle a kytos/core.connection.new event.
642
643
        This method will read new connection event and store the connection
644
        (socket) into the connections attribute on the controller.
645
646
        It also clear all references to the connection since it is a new
647
        connection on the same ip:port.
648
649
        Args:
650
            event (~kytos.core.KytosEvent):
651
                The received event (``kytos/core.connection.new``) with the
652
                needed info.
653
        """
654
        self.log.info("Handling %s...", event)
655
656
        connection = event.source
657
        self.log.debug("Event source: %s", event.source)
658
659
        # Remove old connection (aka cleanup) if it exists
660
        if self.get_connection_by_id(connection.id):
661
            self.remove_connection(connection.id)
662
663
        # Update connections with the new connection
664
        self.create_or_update_connection(connection)
665
666 1
    def add_new_switch(self, switch):
667
        """Add a new switch on the controller.
668
669
        Args:
670
            switch (Switch): A Switch object
671
        """
672
        self.switches[switch.dpid] = switch
673
674 1
    def _import_napp(self, username, napp_name):
675
        """Import a NApp module.
676
677
        Raises:
678
            FileNotFoundError: if NApp's main.py is not found.
679
            ModuleNotFoundError: if any NApp requirement is not installed.
680
681
        """
682
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
683
        path = os.path.join(self.options.napps, username, napp_name,
684
                            'main.py')
685
        napp_spec = spec_from_file_location(mod_name, path)
686
        napp_module = module_from_spec(napp_spec)
687
        sys.modules[napp_spec.name] = napp_module
688
        napp_spec.loader.exec_module(napp_module)
689
        return napp_module
690
691 1
    def load_napp(self, username, napp_name):
692
        """Load a single NApp.
693
694
        Args:
695
            username (str): NApp username (makes up NApp's path).
696
            napp_name (str): Name of the NApp to be loaded.
697
        """
698
        if (username, napp_name) in self.napps:
699
            message = 'NApp %s/%s was already loaded'
700
            self.log.warning(message, username, napp_name)
701
            return
702
703
        try:
704
            napp_module = self._import_napp(username, napp_name)
705
        except ModuleNotFoundError as err:
706
            self.log.error("Error loading NApp '%s/%s': %s",
707
                           username, napp_name, err)
708
            return
709
        except FileNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
710
            msg = "NApp module not found, assuming it's a meta napp: %s"
711
            self.log.warning(msg, err.filename)
712
            return
713
714
        napp = napp_module.Main(controller=self)
715
716
        self.napps[(username, napp_name)] = napp
717
718
        napp.start()
719
        self.api_server.register_napp_endpoints(napp)
720
721
        # pylint: disable=protected-access
722
        for event, listeners in napp._listeners.items():
723
            self.events_listeners.setdefault(event, []).extend(listeners)
724
        # pylint: enable=protected-access
725
726 1
    def pre_install_napps(self, napps, enable=True):
727
        """Pre install and enable NApps.
728
729
        Before installing, it'll check if it's installed yet.
730
731
        Args:
732
            napps ([str]): List of NApps to be pre-installed and enabled.
733
        """
734
        all_napps = self.napps_manager.get_installed_napps()
735
        installed = [str(napp) for napp in all_napps]
736
        napps_diff = [napp for napp in napps if napp not in installed]
737
        for napp in napps_diff:
738
            self.napps_manager.install(napp, enable=enable)
739
740 1
    def load_napps(self):
741
        """Load all NApps enabled on the NApps dir."""
742
        for napp in self.napps_manager.get_enabled_napps():
743
            try:
744
                self.log.info("Loading NApp %s", napp.id)
745
                self.load_napp(napp.username, napp.name)
746
            except FileNotFoundError as exception:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
747
                self.log.error("Could not load NApp %s: %s",
748
                               napp.id, exception)
749
750 1
    def unload_napp(self, username, napp_name):
751
        """Unload a specific NApp.
752
753
        Args:
754
            username (str): NApp username.
755
            napp_name (str): Name of the NApp to be unloaded.
756
        """
757 1
        napp = self.napps.pop((username, napp_name), None)
758
759 1
        if napp is None:
760
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
761
        else:
762 1
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
763 1
            napp_id = NApp(username, napp_name).id
764 1
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
765 1
            napp_shutdown_fn = self.events_listeners[event.name][0]
766
            # Call listener before removing it from events_listeners
767 1
            napp_shutdown_fn(event)
768
769
            # Remove rest endpoints from that napp
770 1
            self.api_server.remove_napp_endpoints(napp)
771
772
            # Removing listeners from that napp
773
            # pylint: disable=protected-access
774 1
            for event_type, napp_listeners in napp._listeners.items():
775
                event_listeners = self.events_listeners[event_type]
776
                for listener in napp_listeners:
777
                    event_listeners.remove(listener)
778
                if not event_listeners:
779
                    del self.events_listeners[event_type]
780
            # pylint: enable=protected-access
781
782 1
    def unload_napps(self):
783
        """Unload all loaded NApps that are not core NApps."""
784
        # list() is used here to avoid the error:
785
        # 'RuntimeError: dictionary changed size during iteration'
786
        # This is caused by looping over an dictionary while removing
787
        # items from it.
788
        for (username, napp_name) in list(self.napps.keys()):  # noqa
789
            self.unload_napp(username, napp_name)
790
791 1
    def reload_napp_module(self, username, napp_name, napp_file):
792
        """Reload a NApp Module."""
793
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
794
        try:
795
            napp_module = import_module(mod_name)
796
        except ModuleNotFoundError as err:
797
            self.log.error("Module '%s' not found", mod_name)
798
            raise
799
        try:
800
            napp_module = reload_module(napp_module)
801
        except ImportError as err:
802
            self.log.error("Error reloading NApp '%s/%s': %s",
803
                           username, napp_name, err)
804
            raise
805
806 1
    def reload_napp(self, username, napp_name):
807
        """Reload a NApp."""
808
        self.unload_napp(username, napp_name)
809
        try:
810
            self.reload_napp_module(username, napp_name, 'settings')
811
            self.reload_napp_module(username, napp_name, 'main')
812
        except (ModuleNotFoundError, ImportError):
813
            return 400
814
        self.log.info("NApp '%s/%s' successfully reloaded",
815
                      username, napp_name)
816
        self.load_napp(username, napp_name)
817
        return 200
818
819 1
    def rest_reload_napp(self, username, napp_name):
820
        """Request reload a NApp."""
821
        res = self.reload_napp(username, napp_name)
822
        return 'reloaded', res
823
824 1
    def rest_reload_all_napps(self):
825
        """Request reload all NApps."""
826
        for napp in self.napps:
827
            self.reload_napp(*napp)
828
        return 'reloaded', 200
829