Passed
Pull Request — master (#957)
by Rogerio
02:39
created

kytos.core.controller.Controller.loggers()   A

Complexity

Conditions 3

Size

Total Lines 11
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 0
dl 0
loc 11
ccs 6
cts 6
cp 1
crap 3
rs 10
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29
30 1
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 1
from kytos.core.buffers import KytosBuffers
34 1
from kytos.core.config import KytosConfig
35 1
from kytos.core.connection import ConnectionState
36 1
from kytos.core.events import KytosEvent
37 1
from kytos.core.helpers import now
38 1
from kytos.core.interface import Interface
39 1
from kytos.core.logs import LogManager
40 1
from kytos.core.napps.base import NApp
41 1
from kytos.core.napps.manager import NAppsManager
42 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
43 1
from kytos.core.switch import Switch
44
45 1
__all__ = ('Controller',)
46
47
48 1
class Controller:
49
    """Main class of Kytos.
50
51
    The main responsibilities of this class are:
52
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
53
        - manage KytosNApps (install, load and unload);
54
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
55
        - manage which event should be sent to NApps methods;
56
        - manage the buffers handlers, considering one thread per handler.
57
    """
58
59
    # Created issue #568 for the disabled checks.
60
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
61 1
    def __init__(self, options=None, loop=None):
62
        """Init method of Controller class takes the parameters below.
63
64
        Args:
65
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
66
                instance of :class:`~kytos.core.config.KytosConfig` class.
67
        """
68 1
        if options is None:
69
            options = KytosConfig().options['daemon']
70
71 1
        self._loop = loop or asyncio.get_event_loop()
72 1
        self._pool = ThreadPoolExecutor(max_workers=1)
73
74
        #: dict: keep the main threads of the controller (buffers and handler)
75 1
        self._threads = {}
76
        #: KytosBuffers: KytosBuffer object with Controller buffers
77 1
        self.buffers = KytosBuffers(loop=self._loop)
78
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
79
        #:
80
        #: This dict stores all connections between the controller and the
81
        #: switches. The key for this dict is a tuple (ip, port). The content
82
        #: is another dict with the connection information.
83 1
        self.connections = {}
84
        #: dict: mapping of events and event listeners.
85
        #:
86
        #: The key of the dict is a KytosEvent (or a string that represent a
87
        #: regex to match agains KytosEvents) and the value is a list of
88
        #: methods that will receive the referenced event
89 1
        self.events_listeners = {'kytos/core.connection.new':
90
                                 [self.new_connection]}
91
92
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
93
        #:
94
        #: The key is the napp name (string), while the value is the napp
95
        #: instance itself.
96 1
        self.napps = {}
97
        #: Object generated by ParseArgs on config.py file
98 1
        self.options = options
99
        #: KytosServer: Instance of KytosServer that will be listening to TCP
100
        #: connections.
101 1
        self.server = None
102
        #: dict: Current existing switches.
103
        #:
104
        #: The key is the switch dpid, while the value is a Switch object.
105 1
        self.switches = {}  # dpid: Switch()
106
107
        #: datetime.datetime: Time when the controller finished starting.
108 1
        self.started_at = None
109
110
        #: logging.Logger: Logger instance used by Kytos.
111 1
        self.log = None
112
113
        #: Observer that handle NApps when they are enabled or disabled.
114 1
        self.napp_dir_listener = NAppDirListener(self)
115
116 1
        self.napps_manager = NAppsManager(self)
117
118
        #: API Server used to expose rest endpoints.
119 1
        self.api_server = APIServer(__name__, self.options.listen,
120
                                    self.options.api_port,
121
                                    self.napps_manager, self.options.napps)
122
123 1
        self._register_endpoints()
124
        #: Adding the napps 'enabled' directory into the PATH
125
        #: Now you can access the enabled napps with:
126
        #: from napps.<username>.<napp_name> import ?....
127 1
        sys.path.append(os.path.join(self.options.napps, os.pardir))
128
129 1
    def enable_logs(self):
130
        """Register kytos log and enable the logs."""
131 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
132 1
        LogManager.enable_websocket(self.api_server.server)
133 1
        self.log = logging.getLogger(__name__)
134
135 1
    @staticmethod
136
    def loggers():
137
        """List all logging Loggers.
138
139
        Displays the Logger object, with name and logging level.
140
        """
141 1
        loggers = []
142 1
        for name in logging.root.manager.loggerDict:
143 1
            if "kytos" in name:
144 1
                loggers.append(logging.getLogger(name))
145 1
        return loggers
146
147 1
    def debug(self, name=None, enable=True):
148
        """Enable/disable logging debug messages to given logger name.
149
150
        If the name parameter is not specified the debug is activated to all
151
        loggers, including the root Logger.
152
        The debug is enable by default. It is the last parameter to be easier
153
        to the user to disable it using the console. Disabling the debug
154
        will reload the logging configuration file defaults.
155
156
        Args:
157
            name(text): Full hierarch Logger name. Ex: "kytos.core.controller"
158
            enable: True to enable the debug.
159
        """
160 1
        if enable:
161 1
            self._debug_on(name)
162
        else:
163 1
            self._debug_off(name)
164
165 1
    def _debug_on(self, name=None):
166
        """Turn on the debug.
167
168
        Turn on the logging debug to a specific Logger name, or to all
169
        Loggers, including the root Logger.
170
171
        Args:
172
            name(text): Full hierarch Logger name. Ex: "kytos.core.controller"
173
        """
174 1
        if name:
175
            # If logger name do not exist, raise an error.
176
            # otherwise the getLogger will create another Logger
177 1
            if name not in logging.root.manager.loggerDict:
178
                raise ValueError("Invalid logger name.")
179
180 1
            logger = logging.getLogger(name)
181 1
            logger.setLevel(logging.DEBUG)
182
        else:
183
            LogManager.load_config_file(self.options.logging, True)
184
185 1
    def _debug_off(self, name=None):
186
        """Turn on the debug.
187
188
        Turn off the logging debug to a specific Logger name, or to all
189
        Loggers, reloading the logging configuration file defaults.
190
191
        Args:
192
            name(text): Full hierarch Logger name. Ex: "kytos.core.controller"
193
        """
194 1
        if name:
195
            # If logger name do not exist, raise an error.
196
            # otherwise the getLogger will create another Logger
197 1
            if name not in logging.root.manager.loggerDict:
198
                raise ValueError("Invalid logger name.")
199
200 1
            logger = logging.getLogger(name)
201 1
            logger.setLevel(logging.NOTSET)
202
        else:
203
            LogManager.load_config_file(self.options.logging, False)
204
205 1
    def start(self, restart=False):
206
        """Create pidfile and call start_controller method."""
207
        self.enable_logs()
208
        if not restart:
209
            self.create_pidfile()
210
        self.start_controller()
211
212 1
    def create_pidfile(self):
213
        """Create a pidfile."""
214
        pid = os.getpid()
215
216
        # Creates directory if it doesn't exist
217
        # System can erase /var/run's content
218
        pid_folder = Path(self.options.pidfile).parent
219
        self.log.info(pid_folder)
220
221
        # Pylint incorrectly infers Path objects
222
        # https://github.com/PyCQA/pylint/issues/224
223
        # pylint: disable=no-member
224
        if not pid_folder.exists():
225
            pid_folder.mkdir()
226
            pid_folder.chmod(0o1777)
227
        # pylint: enable=no-member
228
229
        # Make sure the file is deleted when controller stops
230
        atexit.register(Path(self.options.pidfile).unlink)
231
232
        # Checks if a pidfile exists. Creates a new file.
233
        try:
234
            pidfile = open(self.options.pidfile, mode='x')
235
        except OSError:
236
            # This happens if there is a pidfile already.
237
            # We shall check if the process that created the pidfile is still
238
            # running.
239
            try:
240
                existing_file = open(self.options.pidfile, mode='r')
241
                old_pid = int(existing_file.read())
242
                os.kill(old_pid, 0)
243
                # If kill() doesn't return an error, there's a process running
244
                # with the same PID. We assume it is Kytos and quit.
245
                # Otherwise, overwrite the file and proceed.
246
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
247
                             "running. Aborting.")
248
                sys.exit(error_msg.format(self.options.pidfile))
249
            except OSError:
250
                try:
251
                    pidfile = open(self.options.pidfile, mode='w')
252
                except OSError as exception:
253
                    error_msg = "Failed to create pidfile {}: {}."
254
                    sys.exit(error_msg.format(self.options.pidfile, exception))
255
256
        # Identifies the process that created the pidfile.
257
        pidfile.write(str(pid))
258
        pidfile.close()
259
260 1
    def start_controller(self):
261
        """Start the controller.
262
263
        Starts the KytosServer (TCP Server) coroutine.
264
        Starts a thread for each buffer handler.
265
        Load the installed apps.
266
        """
267
        self.log.info("Starting Kytos - Kytos Controller")
268
        self.server = KytosServer((self.options.listen,
269
                                   int(self.options.port)),
270
                                  KytosServerProtocol,
271
                                  self,
272
                                  self.options.protocol_name)
273
274
        self.log.info("Starting TCP server: %s", self.server)
275
        self.server.serve_forever()
276
277
        def _stop_loop(_):
278
            loop = asyncio.get_event_loop()
279
            # print(_.result())
280
            threads = threading.enumerate()
281
            self.log.debug("%s threads before loop.stop: %s",
282
                           len(threads), threads)
283
            loop.stop()
284
285
        async def _run_api_server_thread(executor):
286
            log = logging.getLogger('kytos.core.controller.api_server_thread')
287
            log.debug('starting')
288
            # log.debug('creating tasks')
289
            loop = asyncio.get_event_loop()
290
            blocking_tasks = [
291
                loop.run_in_executor(executor, self.api_server.run)
292
            ]
293
            # log.debug('waiting for tasks')
294
            completed, pending = await asyncio.wait(blocking_tasks)
295
            # results = [t.result() for t in completed]
296
            # log.debug('results: {!r}'.format(results))
297
            log.debug('completed: %d, pending: %d',
298
                      len(completed), len(pending))
299
300
        task = self._loop.create_task(self.raw_event_handler())
301
        task = self._loop.create_task(self.msg_in_event_handler())
302
        task = self._loop.create_task(self.msg_out_event_handler())
303
        task = self._loop.create_task(self.app_event_handler())
304
        task = self._loop.create_task(_run_api_server_thread(self._pool))
305
        task.add_done_callback(_stop_loop)
306
307
        self.log.info("ThreadPool started: %s", self._pool)
308
309
        # ASYNC TODO: ensure all threads started correctly
310
        # This is critical, if any of them failed starting we should exit.
311
        # sys.exit(error_msg.format(thread, exception))
312
313
        self.log.info("Loading Kytos NApps...")
314
        self.napp_dir_listener.start()
315
        self.pre_install_napps(self.options.napps_pre_installed)
316
        self.load_napps()
317
318
        self.started_at = now()
319
320 1
    def _register_endpoints(self):
321
        """Register all rest endpoint served by kytos.
322
323
        -   Register APIServer endpoints
324
        -   Register WebUI endpoints
325
        -   Register ``/api/kytos/core/config`` endpoint
326
        """
327 1
        self.api_server.start_api()
328
        # Register controller endpoints as /api/kytos/core/...
329 1
        self.api_server.register_core_endpoint('config/',
330
                                               self.configuration_endpoint)
331
332 1
        self.api_server.register_core_endpoint(
333
            'reload/<username>/<napp_name>/',
334
            self.rest_reload_napp)
335 1
        self.api_server.register_core_endpoint('reload/all',
336
                                               self.rest_reload_all_napps)
337
338 1
    def register_rest_endpoint(self, url, function, methods):
339
        """Deprecate in favor of @rest decorator."""
340 1
        self.api_server.register_rest_endpoint(url, function, methods)
341
342 1
    def configuration_endpoint(self):
343
        """Return the configuration options used by Kytos.
344
345
        Returns:
346
            string: Json with current configurations used by kytos.
347
348
        """
349 1
        return json.dumps(self.options.__dict__)
350
351 1
    def restart(self, graceful=True):
352
        """Restart Kytos SDN Controller.
353
354
        Args:
355
            graceful(bool): Represents the way that Kytos will restart.
356
        """
357
        if self.started_at is not None:
358
            self.stop(graceful)
359
            self.__init__(self.options)
360
361
        self.start(restart=True)
362
363 1
    def stop(self, graceful=True):
364
        """Shutdown all services used by kytos.
365
366
        This method should:
367
            - stop all Websockets
368
            - stop the API Server
369
            - stop the Controller
370
        """
371
        if self.started_at:
372
            self.stop_controller(graceful)
373
374 1
    def stop_controller(self, graceful=True):
375
        """Stop the controller.
376
377
        This method should:
378
            - announce on the network that the controller will shutdown;
379
            - stop receiving incoming packages;
380
            - call the 'shutdown' method of each KytosNApp that is running;
381
            - finish reading the events on all buffers;
382
            - stop each running handler;
383
            - stop all running threads;
384
            - stop the KytosServer;
385
        """
386
        self.log.info("Stopping Kytos")
387
388
        self.buffers.send_stop_signal()
389
        self.api_server.stop_api_server()
390
        self.napp_dir_listener.stop()
391
392
        self.log.info("Stopping threadpool: %s", self._pool)
393
394
        threads = threading.enumerate()
395
        self.log.debug("%s threads before threadpool shutdown: %s",
396
                       len(threads), threads)
397
398
        self._pool.shutdown(wait=graceful)
399
400
        # self.server.socket.shutdown()
401
        # self.server.socket.close()
402
403
        # for thread in self._threads.values():
404
        #     self.log.info("Stopping thread: %s", thread.name)
405
        #     thread.join()
406
407
        # for thread in self._threads.values():
408
        #     while thread.is_alive():
409
        #         self.log.info("Thread is alive: %s", thread.name)
410
        #         pass
411
412
        self.started_at = None
413
        self.unload_napps()
414
        self.buffers = KytosBuffers()
415
416
        # ASYNC TODO: close connections
417
        # self.server.server_close()
418
419
        # Shutdown the TCP server and the main asyncio loop
420
        self.server.shutdown()
421
422 1
    def status(self):
423
        """Return status of Kytos Server.
424
425
        If the controller kytos is running this method will be returned
426
        "Running since 'Started_At'", otherwise "Stopped".
427
428
        Returns:
429
            string: String with kytos status.
430
431
        """
432
        if self.started_at:
433
            return "Running since %s" % self.started_at
434
        return "Stopped"
435
436 1
    def uptime(self):
437
        """Return the uptime of kytos server.
438
439
        This method should return:
440
            - 0 if Kytos Server is stopped.
441
            - (kytos.start_at - datetime.now) if Kytos Server is running.
442
443
        Returns:
444
           datetime.timedelta: The uptime interval.
445
446
        """
447
        return now() - self.started_at if self.started_at else 0
448
449 1
    def notify_listeners(self, event):
450
        """Send the event to the specified listeners.
451
452
        Loops over self.events_listeners matching (by regexp) the attribute
453
        name of the event with the keys of events_listeners. If a match occurs,
454
        then send the event to each registered listener.
455
456
        Args:
457
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
458
        """
459
        self.log.debug("looking for listeners for %s", event)
460
        for event_regex, listeners in dict(self.events_listeners).items():
461
            # self.log.debug("listeners found for %s: %r => %s", event,
462
            #                event_regex, [l.__qualname__ for l in listeners])
463
            # Do not match if the event has more characters
464
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
465
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
466
                event_regex += '$'
467
            if re.match(event_regex, event.name):
468
                # self.log.debug('Calling listeners for %s', event)
469
                for listener in listeners:
470
                    listener(event)
471
472 1
    async def raw_event_handler(self):
473
        """Handle raw events.
474
475
        Listen to the raw_buffer and send all its events to the
476
        corresponding listeners.
477
        """
478
        self.log.info("Raw Event Handler started")
479
        while True:
480
            event = await self.buffers.raw.aget()
481
            self.notify_listeners(event)
482
            self.log.debug("Raw Event handler called")
483
484
            if event.name == "kytos/core.shutdown":
485
                self.log.debug("Raw Event handler stopped")
486
                break
487
488 1
    async def msg_in_event_handler(self):
489
        """Handle msg_in events.
490
491
        Listen to the msg_in buffer and send all its events to the
492
        corresponding listeners.
493
        """
494
        self.log.info("Message In Event Handler started")
495
        while True:
496
            event = await self.buffers.msg_in.aget()
497
            self.notify_listeners(event)
498
            self.log.debug("Message In Event handler called")
499
500
            if event.name == "kytos/core.shutdown":
501
                self.log.debug("Message In Event handler stopped")
502
                break
503
504 1
    async def msg_out_event_handler(self):
505
        """Handle msg_out events.
506
507
        Listen to the msg_out buffer and send all its events to the
508
        corresponding listeners.
509
        """
510
        self.log.info("Message Out Event Handler started")
511
        while True:
512
            triggered_event = await self.buffers.msg_out.aget()
513
514
            if triggered_event.name == "kytos/core.shutdown":
515
                self.log.debug("Message Out Event handler stopped")
516
                break
517
518
            message = triggered_event.content['message']
519
            destination = triggered_event.destination
520
            if (destination and
521
                    not destination.state == ConnectionState.FINISHED):
522
                packet = message.pack()
523
                destination.send(packet)
524
                self.log.debug('Connection %s: OUT OFP, '
525
                               'version: %s, type: %s, xid: %s - %s',
526
                               destination.id,
527
                               message.header.version,
528
                               message.header.message_type,
529
                               message.header.xid,
530
                               packet.hex())
531
                self.notify_listeners(triggered_event)
532
                self.log.debug("Message Out Event handler called")
533
            else:
534
                self.log.info("connection closed. Cannot send message")
535
536 1
    async def app_event_handler(self):
537
        """Handle app events.
538
539
        Listen to the app buffer and send all its events to the
540
        corresponding listeners.
541
        """
542
        self.log.info("App Event Handler started")
543
        while True:
544
            event = await self.buffers.app.aget()
545
            self.notify_listeners(event)
546
            self.log.debug("App Event handler called")
547
548
            if event.name == "kytos/core.shutdown":
549
                self.log.debug("App Event handler stopped")
550
                break
551
552 1
    def get_interface_by_id(self, interface_id):
553
        """Find a Interface  with interface_id.
554
555
        Args:
556
            interface_id(str): Interface Identifier.
557
558
        Returns:
559
            Interface: Instance of Interface with the id given.
560
561
        """
562
        if interface_id is None:
563
            return None
564
565
        switch_id = ":".join(interface_id.split(":")[:-1])
566
        interface_number = int(interface_id.split(":")[-1])
567
568
        switch = self.switches.get(switch_id)
569
570
        if not switch:
571
            return None
572
573
        return switch.interfaces.get(interface_number, None)
574
575 1
    def get_switch_by_dpid(self, dpid):
576
        """Return a specific switch by dpid.
577
578
        Args:
579
            dpid (|DPID|): dpid object used to identify a switch.
580
581
        Returns:
582
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
583
584
        """
585 1
        return self.switches.get(dpid)
586
587 1
    def get_switch_or_create(self, dpid, connection):
588
        """Return switch or create it if necessary.
589
590
        Args:
591
            dpid (|DPID|): dpid object used to identify a switch.
592
            connection (:class:`~kytos.core.connection.Connection`):
593
                connection used by switch. If a switch has a connection that
594
                will be updated.
595
596
        Returns:
597
            :class:`~kytos.core.switch.Switch`: new or existent switch.
598
599
        """
600 1
        self.create_or_update_connection(connection)
601 1
        switch = self.get_switch_by_dpid(dpid)
602 1
        event_name = 'kytos/core.switch.'
603
604 1
        if switch is None:
605
            switch = Switch(dpid=dpid)
606
            self.add_new_switch(switch)
607
            event_name += 'new'
608
        else:
609 1
            event_name += 'reconnected'
610
611 1
        self.set_switch_options(dpid=dpid)
612 1
        event = KytosEvent(name=event_name, content={'switch': switch})
613
614 1
        old_connection = switch.connection
615 1
        switch.update_connection(connection)
616
617 1
        if old_connection is not connection:
618
            self.remove_connection(old_connection)
619
620 1
        self.buffers.app.put(event)
621
622 1
        return switch
623
624 1
    def set_switch_options(self, dpid):
625
        """Update the switch settings based on kytos.conf options.
626
627
        Args:
628
            dpid (str): dpid used to identify a switch.
629
630
        """
631 1
        switch = self.switches.get(dpid)
632 1
        if not switch:
633
            return
634
635 1
        vlan_pool = {}
636 1
        try:
637 1
            vlan_pool = json.loads(self.options.vlan_pool)
638 1
            if not vlan_pool:
639
                return
640
        except (TypeError, json.JSONDecodeError) as err:
641
            self.log.error("Invalid vlan_pool settings: %s", err)
642
643 1
        if vlan_pool.get(dpid):
644 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
645 1
            for intf_num, port_list in vlan_pool[dpid].items():
646 1
                if not switch.interfaces.get((intf_num)):
647 1
                    vlan_ids = set()
648 1
                    for vlan_range in port_list:
649 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
650 1
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
651 1
                            vlan_ids.add(vlan_id)
652 1
                    intf_num = int(intf_num)
653 1
                    intf = Interface(name=intf_num, port_number=intf_num,
654
                                     switch=switch)
655 1
                    intf.set_available_tags(vlan_ids)
656 1
                    switch.update_interface(intf)
657
658 1
    def create_or_update_connection(self, connection):
659
        """Update a connection.
660
661
        Args:
662
            connection (:class:`~kytos.core.connection.Connection`):
663
                Instance of connection that will be updated.
664
        """
665 1
        self.connections[connection.id] = connection
666
667 1
    def get_connection_by_id(self, conn_id):
668
        """Return a existent connection by id.
669
670
        Args:
671
            id (int): id from a connection.
672
673
        Returns:
674
            :class:`~kytos.core.connection.Connection`: Instance of connection
675
                or None Type.
676
677
        """
678
        return self.connections.get(conn_id)
679
680 1
    def remove_connection(self, connection):
681
        """Close a existent connection and remove it.
682
683
        Args:
684
            connection (:class:`~kytos.core.connection.Connection`):
685
                Instance of connection that will be removed.
686
        """
687
        if connection is None:
688
            return False
689
690
        try:
691
            connection.close()
692
            del self.connections[connection.id]
693
        except KeyError:
694
            return False
695
        return True
696
697 1
    def remove_switch(self, switch):
698
        """Remove an existent switch.
699
700
        Args:
701
            switch (:class:`~kytos.core.switch.Switch`):
702
                Instance of switch that will be removed.
703
        """
704
        try:
705
            del self.switches[switch.dpid]
706
        except KeyError:
707
            return False
708
        return True
709
710 1
    def new_connection(self, event):
711
        """Handle a kytos/core.connection.new event.
712
713
        This method will read new connection event and store the connection
714
        (socket) into the connections attribute on the controller.
715
716
        It also clear all references to the connection since it is a new
717
        connection on the same ip:port.
718
719
        Args:
720
            event (~kytos.core.KytosEvent):
721
                The received event (``kytos/core.connection.new``) with the
722
                needed info.
723
        """
724
        self.log.info("Handling %s...", event)
725
726
        connection = event.source
727
        self.log.debug("Event source: %s", event.source)
728
729
        # Remove old connection (aka cleanup) if it exists
730
        if self.get_connection_by_id(connection.id):
731
            self.remove_connection(connection.id)
732
733
        # Update connections with the new connection
734
        self.create_or_update_connection(connection)
735
736 1
    def add_new_switch(self, switch):
737
        """Add a new switch on the controller.
738
739
        Args:
740
            switch (Switch): A Switch object
741
        """
742
        self.switches[switch.dpid] = switch
743
744 1
    def _import_napp(self, username, napp_name):
745
        """Import a NApp module.
746
747
        Raises:
748
            FileNotFoundError: if NApp's main.py is not found.
749
            ModuleNotFoundError: if any NApp requirement is not installed.
750
751
        """
752
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
753
        path = os.path.join(self.options.napps, username, napp_name,
754
                            'main.py')
755
        napp_spec = spec_from_file_location(mod_name, path)
756
        napp_module = module_from_spec(napp_spec)
757
        sys.modules[napp_spec.name] = napp_module
758
        napp_spec.loader.exec_module(napp_module)
759
        return napp_module
760
761 1
    def load_napp(self, username, napp_name):
762
        """Load a single NApp.
763
764
        Args:
765
            username (str): NApp username (makes up NApp's path).
766
            napp_name (str): Name of the NApp to be loaded.
767
        """
768
        if (username, napp_name) in self.napps:
769
            message = 'NApp %s/%s was already loaded'
770
            self.log.warning(message, username, napp_name)
771
            return
772
773
        try:
774
            napp_module = self._import_napp(username, napp_name)
775
        except ModuleNotFoundError as err:
776
            self.log.error("Error loading NApp '%s/%s': %s",
777
                           username, napp_name, err)
778
            return
779
        except FileNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
780
            msg = "NApp module not found, assuming it's a meta napp: %s"
781
            self.log.warning(msg, err.filename)
782
            return
783
784
        napp = napp_module.Main(controller=self)
785
786
        self.napps[(username, napp_name)] = napp
787
788
        napp.start()
789
        self.api_server.register_napp_endpoints(napp)
790
791
        # pylint: disable=protected-access
792
        for event, listeners in napp._listeners.items():
793
            self.events_listeners.setdefault(event, []).extend(listeners)
794
        # pylint: enable=protected-access
795
796 1
    def pre_install_napps(self, napps, enable=True):
797
        """Pre install and enable NApps.
798
799
        Before installing, it'll check if it's installed yet.
800
801
        Args:
802
            napps ([str]): List of NApps to be pre-installed and enabled.
803
        """
804
        all_napps = self.napps_manager.get_installed_napps()
805
        installed = [str(napp) for napp in all_napps]
806
        napps_diff = [napp for napp in napps if napp not in installed]
807
        for napp in napps_diff:
808
            self.napps_manager.install(napp, enable=enable)
809
810 1
    def load_napps(self):
811
        """Load all NApps enabled on the NApps dir."""
812
        for napp in self.napps_manager.get_enabled_napps():
813
            try:
814
                self.log.info("Loading NApp %s", napp.id)
815
                self.load_napp(napp.username, napp.name)
816
            except FileNotFoundError as exception:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
817
                self.log.error("Could not load NApp %s: %s",
818
                               napp.id, exception)
819
820 1
    def unload_napp(self, username, napp_name):
821
        """Unload a specific NApp.
822
823
        Args:
824
            username (str): NApp username.
825
            napp_name (str): Name of the NApp to be unloaded.
826
        """
827 1
        napp = self.napps.pop((username, napp_name), None)
828
829 1
        if napp is None:
830
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
831
        else:
832 1
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
833 1
            napp_id = NApp(username, napp_name).id
834 1
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
835 1
            napp_shutdown_fn = self.events_listeners[event.name][0]
836
            # Call listener before removing it from events_listeners
837 1
            napp_shutdown_fn(event)
838
839
            # Remove rest endpoints from that napp
840 1
            self.api_server.remove_napp_endpoints(napp)
841
842
            # Removing listeners from that napp
843
            # pylint: disable=protected-access
844 1
            for event_type, napp_listeners in napp._listeners.items():
845
                event_listeners = self.events_listeners[event_type]
846
                for listener in napp_listeners:
847
                    event_listeners.remove(listener)
848
                if not event_listeners:
849
                    del self.events_listeners[event_type]
850
            # pylint: enable=protected-access
851
852 1
    def unload_napps(self):
853
        """Unload all loaded NApps that are not core NApps."""
854
        # list() is used here to avoid the error:
855
        # 'RuntimeError: dictionary changed size during iteration'
856
        # This is caused by looping over an dictionary while removing
857
        # items from it.
858
        for (username, napp_name) in list(self.napps.keys()):  # noqa
859
            self.unload_napp(username, napp_name)
860
861 1
    def reload_napp_module(self, username, napp_name, napp_file):
862
        """Reload a NApp Module."""
863
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
864
        try:
865
            napp_module = import_module(mod_name)
866
        except ModuleNotFoundError as err:
867
            self.log.error("Module '%s' not found", mod_name)
868
            raise
869
        try:
870
            napp_module = reload_module(napp_module)
871
        except ImportError as err:
872
            self.log.error("Error reloading NApp '%s/%s': %s",
873
                           username, napp_name, err)
874
            raise
875
876 1
    def reload_napp(self, username, napp_name):
877
        """Reload a NApp."""
878
        self.unload_napp(username, napp_name)
879
        try:
880
            self.reload_napp_module(username, napp_name, 'settings')
881
            self.reload_napp_module(username, napp_name, 'main')
882
        except (ModuleNotFoundError, ImportError):
883
            return 400
884
        self.log.info("NApp '%s/%s' successfully reloaded",
885
                      username, napp_name)
886
        self.load_napp(username, napp_name)
887
        return 200
888
889 1
    def rest_reload_napp(self, username, napp_name):
890
        """Request reload a NApp."""
891
        res = self.reload_napp(username, napp_name)
892
        return 'reloaded', res
893
894 1
    def rest_reload_all_napps(self):
895
        """Request reload all NApps."""
896
        for napp in self.napps:
897
            self.reload_napp(*napp)
898
        return 'reloaded', 200
899