Passed
Pull Request — master (#957)
by Rogerio
02:39
created

kytos.core.controller.Controller.loggers()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 7
nop 0
dl 0
loc 12
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29
30 1
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 1
from kytos.core.buffers import KytosBuffers
34 1
from kytos.core.config import KytosConfig
35 1
from kytos.core.connection import ConnectionState
36 1
from kytos.core.events import KytosEvent
37 1
from kytos.core.helpers import now
38 1
from kytos.core.interface import Interface
39 1
from kytos.core.logs import LogManager
40 1
from kytos.core.napps.base import NApp
41 1
from kytos.core.napps.manager import NAppsManager
42 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
43 1
from kytos.core.switch import Switch
44
45 1
__all__ = ('Controller',)
46
47
48 1
class Controller:
49
    """Main class of Kytos.
50
51
    The main responsibilities of this class are:
52
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
53
        - manage KytosNApps (install, load and unload);
54
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
55
        - manage which event should be sent to NApps methods;
56
        - manage the buffers handlers, considering one thread per handler.
57
    """
58
59
    # Created issue #568 for the disabled checks.
60
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
61 1
    def __init__(self, options=None, loop=None):
62
        """Init method of Controller class takes the parameters below.
63
64
        Args:
65
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
66
                instance of :class:`~kytos.core.config.KytosConfig` class.
67
        """
68 1
        if options is None:
69
            options = KytosConfig().options['daemon']
70
71 1
        self._loop = loop or asyncio.get_event_loop()
72 1
        self._pool = ThreadPoolExecutor(max_workers=1)
73
74
        #: dict: keep the main threads of the controller (buffers and handler)
75 1
        self._threads = {}
76
        #: KytosBuffers: KytosBuffer object with Controller buffers
77 1
        self.buffers = KytosBuffers(loop=self._loop)
78
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
79
        #:
80
        #: This dict stores all connections between the controller and the
81
        #: switches. The key for this dict is a tuple (ip, port). The content
82
        #: is another dict with the connection information.
83 1
        self.connections = {}
84
        #: dict: mapping of events and event listeners.
85
        #:
86
        #: The key of the dict is a KytosEvent (or a string that represent a
87
        #: regex to match agains KytosEvents) and the value is a list of
88
        #: methods that will receive the referenced event
89 1
        self.events_listeners = {'kytos/core.connection.new':
90
                                 [self.new_connection]}
91
92
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
93
        #:
94
        #: The key is the napp name (string), while the value is the napp
95
        #: instance itself.
96 1
        self.napps = {}
97
        #: Object generated by ParseArgs on config.py file
98 1
        self.options = options
99
        #: KytosServer: Instance of KytosServer that will be listening to TCP
100
        #: connections.
101 1
        self.server = None
102
        #: dict: Current existing switches.
103
        #:
104
        #: The key is the switch dpid, while the value is a Switch object.
105 1
        self.switches = {}  # dpid: Switch()
106
107
        #: datetime.datetime: Time when the controller finished starting.
108 1
        self.started_at = None
109
110
        #: logging.Logger: Logger instance used by Kytos.
111 1
        self.log = None
112
113
        #: Observer that handle NApps when they are enabled or disabled.
114 1
        self.napp_dir_listener = NAppDirListener(self)
115
116 1
        self.napps_manager = NAppsManager(self)
117
118
        #: API Server used to expose rest endpoints.
119 1
        self.api_server = APIServer(__name__, self.options.listen,
120
                                    self.options.api_port,
121
                                    self.napps_manager, self.options.napps)
122
123 1
        self._register_endpoints()
124
        #: Adding the napps 'enabled' directory into the PATH
125
        #: Now you can access the enabled napps with:
126
        #: from napps.<username>.<napp_name> import ?....
127 1
        sys.path.append(os.path.join(self.options.napps, os.pardir))
128
129 1
    def enable_logs(self):
130
        """Register kytos log and enable the logs."""
131 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
132 1
        LogManager.enable_websocket(self.api_server.server)
133 1
        self.log = logging.getLogger(__name__)
134
135 1
    @staticmethod
136
    def loggers():
137
        """List all logging Loggers.
138
139
        Return a list of Logger objects, with name and logging level.
140
        """
141 1
        loggers = [
142
            logging.getLogger(name)
143
            for name in logging.root.manager.loggerDict
144
            if "kytos" in name
145
        ]
146 1
        return loggers
147
148 1
    def toggle_debug(self, name=None):
149
        """Enable/disable logging debug messages to given logger name.
150
151
        If the name parameter is not specified the debug is activated to all
152
        loggers, including the root Logger.
153
        To disable the debug the logging will be set to NOTSET
154
155
        Args:
156
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
157
        """
158 1
        if not name:
159 1
            level = logging.getLogger('kytos').getEffectiveLevel()
160 1
            activate_debug = level != logging.DEBUG
161
162
            # Activating all default Loggers
163 1
            LogManager.load_config_file(self.options.logging, activate_debug)
164 1
            return
165
166 1
        if name not in logging.root.manager.loggerDict:
167
            # Logger name not specified.
168
            # It will raise an error otherwise logging would create a new
169
            # Logger with the specified name.
170 1
            raise ValueError("Invalid logger name.")
171
172
        # Change effective logger level
173 1
        level = logging.getLogger(name).getEffectiveLevel()
174
175 1
        if level == logging.DEBUG:
176 1
            self._debug_off(name)
177
        else:
178 1
            self._debug_on(name)
179
180 1
    @classmethod
181
    def _debug_on(cls, name):
182
        """Turn on the debug.
183
184
        Turn on the logging debug to a specific Logger name, or to all
185
        Loggers, including the root Logger.
186
187
        Args:
188
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
189
        """
190 1
        if name:
191
            # If logger name do not exist, raise an error.
192
            # otherwise the getLogger will create another Logger
193 1
            logger = logging.getLogger(name)
194 1
            logger.setLevel(logging.DEBUG)
195
196 1
    @classmethod
197
    def _debug_off(cls, name):
198
        """Turn on the debug.
199
200
        Turn off the logging debug to a specific Logger name, or to all
201
        Loggers, reloading the logging configuration file defaults.
202
203
        Args:
204
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
205
        """
206 1
        if name:
207
            # If logger name do not exist, raise an error.
208
            # otherwise the getLogger will create another Logger
209 1
            logger = logging.getLogger(name)
210 1
            logger.setLevel(logging.NOTSET)
211
212 1
    def start(self, restart=False):
213
        """Create pidfile and call start_controller method."""
214
        self.enable_logs()
215
        if not restart:
216
            self.create_pidfile()
217
        self.start_controller()
218
219 1
    def create_pidfile(self):
220
        """Create a pidfile."""
221
        pid = os.getpid()
222
223
        # Creates directory if it doesn't exist
224
        # System can erase /var/run's content
225
        pid_folder = Path(self.options.pidfile).parent
226
        self.log.info(pid_folder)
227
228
        # Pylint incorrectly infers Path objects
229
        # https://github.com/PyCQA/pylint/issues/224
230
        # pylint: disable=no-member
231
        if not pid_folder.exists():
232
            pid_folder.mkdir()
233
            pid_folder.chmod(0o1777)
234
        # pylint: enable=no-member
235
236
        # Make sure the file is deleted when controller stops
237
        atexit.register(Path(self.options.pidfile).unlink)
238
239
        # Checks if a pidfile exists. Creates a new file.
240
        try:
241
            pidfile = open(self.options.pidfile, mode='x')
242
        except OSError:
243
            # This happens if there is a pidfile already.
244
            # We shall check if the process that created the pidfile is still
245
            # running.
246
            try:
247
                existing_file = open(self.options.pidfile, mode='r')
248
                old_pid = int(existing_file.read())
249
                os.kill(old_pid, 0)
250
                # If kill() doesn't return an error, there's a process running
251
                # with the same PID. We assume it is Kytos and quit.
252
                # Otherwise, overwrite the file and proceed.
253
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
254
                             "running. Aborting.")
255
                sys.exit(error_msg.format(self.options.pidfile))
256
            except OSError:
257
                try:
258
                    pidfile = open(self.options.pidfile, mode='w')
259
                except OSError as exception:
260
                    error_msg = "Failed to create pidfile {}: {}."
261
                    sys.exit(error_msg.format(self.options.pidfile, exception))
262
263
        # Identifies the process that created the pidfile.
264
        pidfile.write(str(pid))
265
        pidfile.close()
266
267 1
    def start_controller(self):
268
        """Start the controller.
269
270
        Starts the KytosServer (TCP Server) coroutine.
271
        Starts a thread for each buffer handler.
272
        Load the installed apps.
273
        """
274
        self.log.info("Starting Kytos - Kytos Controller")
275
        self.server = KytosServer((self.options.listen,
276
                                   int(self.options.port)),
277
                                  KytosServerProtocol,
278
                                  self,
279
                                  self.options.protocol_name)
280
281
        self.log.info("Starting TCP server: %s", self.server)
282
        self.server.serve_forever()
283
284
        def _stop_loop(_):
285
            loop = asyncio.get_event_loop()
286
            # print(_.result())
287
            threads = threading.enumerate()
288
            self.log.debug("%s threads before loop.stop: %s",
289
                           len(threads), threads)
290
            loop.stop()
291
292
        async def _run_api_server_thread(executor):
293
            log = logging.getLogger('kytos.core.controller.api_server_thread')
294
            log.debug('starting')
295
            # log.debug('creating tasks')
296
            loop = asyncio.get_event_loop()
297
            blocking_tasks = [
298
                loop.run_in_executor(executor, self.api_server.run)
299
            ]
300
            # log.debug('waiting for tasks')
301
            completed, pending = await asyncio.wait(blocking_tasks)
302
            # results = [t.result() for t in completed]
303
            # log.debug('results: {!r}'.format(results))
304
            log.debug('completed: %d, pending: %d',
305
                      len(completed), len(pending))
306
307
        task = self._loop.create_task(self.raw_event_handler())
308
        task = self._loop.create_task(self.msg_in_event_handler())
309
        task = self._loop.create_task(self.msg_out_event_handler())
310
        task = self._loop.create_task(self.app_event_handler())
311
        task = self._loop.create_task(_run_api_server_thread(self._pool))
312
        task.add_done_callback(_stop_loop)
313
314
        self.log.info("ThreadPool started: %s", self._pool)
315
316
        # ASYNC TODO: ensure all threads started correctly
317
        # This is critical, if any of them failed starting we should exit.
318
        # sys.exit(error_msg.format(thread, exception))
319
320
        self.log.info("Loading Kytos NApps...")
321
        self.napp_dir_listener.start()
322
        self.pre_install_napps(self.options.napps_pre_installed)
323
        self.load_napps()
324
325
        self.started_at = now()
326
327 1
    def _register_endpoints(self):
328
        """Register all rest endpoint served by kytos.
329
330
        -   Register APIServer endpoints
331
        -   Register WebUI endpoints
332
        -   Register ``/api/kytos/core/config`` endpoint
333
        """
334 1
        self.api_server.start_api()
335
        # Register controller endpoints as /api/kytos/core/...
336 1
        self.api_server.register_core_endpoint('config/',
337
                                               self.configuration_endpoint)
338
339 1
        self.api_server.register_core_endpoint(
340
            'reload/<username>/<napp_name>/',
341
            self.rest_reload_napp)
342 1
        self.api_server.register_core_endpoint('reload/all',
343
                                               self.rest_reload_all_napps)
344
345 1
    def register_rest_endpoint(self, url, function, methods):
346
        """Deprecate in favor of @rest decorator."""
347 1
        self.api_server.register_rest_endpoint(url, function, methods)
348
349 1
    def configuration_endpoint(self):
350
        """Return the configuration options used by Kytos.
351
352
        Returns:
353
            string: Json with current configurations used by kytos.
354
355
        """
356 1
        return json.dumps(self.options.__dict__)
357
358 1
    def restart(self, graceful=True):
359
        """Restart Kytos SDN Controller.
360
361
        Args:
362
            graceful(bool): Represents the way that Kytos will restart.
363
        """
364
        if self.started_at is not None:
365
            self.stop(graceful)
366
            self.__init__(self.options)
367
368
        self.start(restart=True)
369
370 1
    def stop(self, graceful=True):
371
        """Shutdown all services used by kytos.
372
373
        This method should:
374
            - stop all Websockets
375
            - stop the API Server
376
            - stop the Controller
377
        """
378
        if self.started_at:
379
            self.stop_controller(graceful)
380
381 1
    def stop_controller(self, graceful=True):
382
        """Stop the controller.
383
384
        This method should:
385
            - announce on the network that the controller will shutdown;
386
            - stop receiving incoming packages;
387
            - call the 'shutdown' method of each KytosNApp that is running;
388
            - finish reading the events on all buffers;
389
            - stop each running handler;
390
            - stop all running threads;
391
            - stop the KytosServer;
392
        """
393
        self.log.info("Stopping Kytos")
394
395
        self.buffers.send_stop_signal()
396
        self.api_server.stop_api_server()
397
        self.napp_dir_listener.stop()
398
399
        self.log.info("Stopping threadpool: %s", self._pool)
400
401
        threads = threading.enumerate()
402
        self.log.debug("%s threads before threadpool shutdown: %s",
403
                       len(threads), threads)
404
405
        self._pool.shutdown(wait=graceful)
406
407
        # self.server.socket.shutdown()
408
        # self.server.socket.close()
409
410
        # for thread in self._threads.values():
411
        #     self.log.info("Stopping thread: %s", thread.name)
412
        #     thread.join()
413
414
        # for thread in self._threads.values():
415
        #     while thread.is_alive():
416
        #         self.log.info("Thread is alive: %s", thread.name)
417
        #         pass
418
419
        self.started_at = None
420
        self.unload_napps()
421
        self.buffers = KytosBuffers()
422
423
        # ASYNC TODO: close connections
424
        # self.server.server_close()
425
426
        # Shutdown the TCP server and the main asyncio loop
427
        self.server.shutdown()
428
429 1
    def status(self):
430
        """Return status of Kytos Server.
431
432
        If the controller kytos is running this method will be returned
433
        "Running since 'Started_At'", otherwise "Stopped".
434
435
        Returns:
436
            string: String with kytos status.
437
438
        """
439
        if self.started_at:
440
            return "Running since %s" % self.started_at
441
        return "Stopped"
442
443 1
    def uptime(self):
444
        """Return the uptime of kytos server.
445
446
        This method should return:
447
            - 0 if Kytos Server is stopped.
448
            - (kytos.start_at - datetime.now) if Kytos Server is running.
449
450
        Returns:
451
           datetime.timedelta: The uptime interval.
452
453
        """
454
        return now() - self.started_at if self.started_at else 0
455
456 1
    def notify_listeners(self, event):
457
        """Send the event to the specified listeners.
458
459
        Loops over self.events_listeners matching (by regexp) the attribute
460
        name of the event with the keys of events_listeners. If a match occurs,
461
        then send the event to each registered listener.
462
463
        Args:
464
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
465
        """
466
        self.log.debug("looking for listeners for %s", event)
467
        for event_regex, listeners in dict(self.events_listeners).items():
468
            # self.log.debug("listeners found for %s: %r => %s", event,
469
            #                event_regex, [l.__qualname__ for l in listeners])
470
            # Do not match if the event has more characters
471
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
472
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
473
                event_regex += '$'
474
            if re.match(event_regex, event.name):
475
                # self.log.debug('Calling listeners for %s', event)
476
                for listener in listeners:
477
                    listener(event)
478
479 1
    async def raw_event_handler(self):
480
        """Handle raw events.
481
482
        Listen to the raw_buffer and send all its events to the
483
        corresponding listeners.
484
        """
485
        self.log.info("Raw Event Handler started")
486
        while True:
487
            event = await self.buffers.raw.aget()
488
            self.notify_listeners(event)
489
            self.log.debug("Raw Event handler called")
490
491
            if event.name == "kytos/core.shutdown":
492
                self.log.debug("Raw Event handler stopped")
493
                break
494
495 1
    async def msg_in_event_handler(self):
496
        """Handle msg_in events.
497
498
        Listen to the msg_in buffer and send all its events to the
499
        corresponding listeners.
500
        """
501
        self.log.info("Message In Event Handler started")
502
        while True:
503
            event = await self.buffers.msg_in.aget()
504
            self.notify_listeners(event)
505
            self.log.debug("Message In Event handler called")
506
507
            if event.name == "kytos/core.shutdown":
508
                self.log.debug("Message In Event handler stopped")
509
                break
510
511 1
    async def msg_out_event_handler(self):
512
        """Handle msg_out events.
513
514
        Listen to the msg_out buffer and send all its events to the
515
        corresponding listeners.
516
        """
517
        self.log.info("Message Out Event Handler started")
518
        while True:
519
            triggered_event = await self.buffers.msg_out.aget()
520
521
            if triggered_event.name == "kytos/core.shutdown":
522
                self.log.debug("Message Out Event handler stopped")
523
                break
524
525
            message = triggered_event.content['message']
526
            destination = triggered_event.destination
527
            if (destination and
528
                    not destination.state == ConnectionState.FINISHED):
529
                packet = message.pack()
530
                destination.send(packet)
531
                self.log.debug('Connection %s: OUT OFP, '
532
                               'version: %s, type: %s, xid: %s - %s',
533
                               destination.id,
534
                               message.header.version,
535
                               message.header.message_type,
536
                               message.header.xid,
537
                               packet.hex())
538
                self.notify_listeners(triggered_event)
539
                self.log.debug("Message Out Event handler called")
540
            else:
541
                self.log.info("connection closed. Cannot send message")
542
543 1
    async def app_event_handler(self):
544
        """Handle app events.
545
546
        Listen to the app buffer and send all its events to the
547
        corresponding listeners.
548
        """
549
        self.log.info("App Event Handler started")
550
        while True:
551
            event = await self.buffers.app.aget()
552
            self.notify_listeners(event)
553
            self.log.debug("App Event handler called")
554
555
            if event.name == "kytos/core.shutdown":
556
                self.log.debug("App Event handler stopped")
557
                break
558
559 1
    def get_interface_by_id(self, interface_id):
560
        """Find a Interface  with interface_id.
561
562
        Args:
563
            interface_id(str): Interface Identifier.
564
565
        Returns:
566
            Interface: Instance of Interface with the id given.
567
568
        """
569
        if interface_id is None:
570
            return None
571
572
        switch_id = ":".join(interface_id.split(":")[:-1])
573
        interface_number = int(interface_id.split(":")[-1])
574
575
        switch = self.switches.get(switch_id)
576
577
        if not switch:
578
            return None
579
580
        return switch.interfaces.get(interface_number, None)
581
582 1
    def get_switch_by_dpid(self, dpid):
583
        """Return a specific switch by dpid.
584
585
        Args:
586
            dpid (|DPID|): dpid object used to identify a switch.
587
588
        Returns:
589
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
590
591
        """
592 1
        return self.switches.get(dpid)
593
594 1
    def get_switch_or_create(self, dpid, connection):
595
        """Return switch or create it if necessary.
596
597
        Args:
598
            dpid (|DPID|): dpid object used to identify a switch.
599
            connection (:class:`~kytos.core.connection.Connection`):
600
                connection used by switch. If a switch has a connection that
601
                will be updated.
602
603
        Returns:
604
            :class:`~kytos.core.switch.Switch`: new or existent switch.
605
606
        """
607 1
        self.create_or_update_connection(connection)
608 1
        switch = self.get_switch_by_dpid(dpid)
609 1
        event_name = 'kytos/core.switch.'
610
611 1
        if switch is None:
612
            switch = Switch(dpid=dpid)
613
            self.add_new_switch(switch)
614
            event_name += 'new'
615
        else:
616 1
            event_name += 'reconnected'
617
618 1
        self.set_switch_options(dpid=dpid)
619 1
        event = KytosEvent(name=event_name, content={'switch': switch})
620
621 1
        old_connection = switch.connection
622 1
        switch.update_connection(connection)
623
624 1
        if old_connection is not connection:
625
            self.remove_connection(old_connection)
626
627 1
        self.buffers.app.put(event)
628
629 1
        return switch
630
631 1
    def set_switch_options(self, dpid):
632
        """Update the switch settings based on kytos.conf options.
633
634
        Args:
635
            dpid (str): dpid used to identify a switch.
636
637
        """
638 1
        switch = self.switches.get(dpid)
639 1
        if not switch:
640
            return
641
642 1
        vlan_pool = {}
643 1
        try:
644 1
            vlan_pool = json.loads(self.options.vlan_pool)
645 1
            if not vlan_pool:
646
                return
647
        except (TypeError, json.JSONDecodeError) as err:
648
            self.log.error("Invalid vlan_pool settings: %s", err)
649
650 1
        if vlan_pool.get(dpid):
651 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
652 1
            for intf_num, port_list in vlan_pool[dpid].items():
653 1
                if not switch.interfaces.get((intf_num)):
654 1
                    vlan_ids = set()
655 1
                    for vlan_range in port_list:
656 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
657 1
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
658 1
                            vlan_ids.add(vlan_id)
659 1
                    intf_num = int(intf_num)
660 1
                    intf = Interface(name=intf_num, port_number=intf_num,
661
                                     switch=switch)
662 1
                    intf.set_available_tags(vlan_ids)
663 1
                    switch.update_interface(intf)
664
665 1
    def create_or_update_connection(self, connection):
666
        """Update a connection.
667
668
        Args:
669
            connection (:class:`~kytos.core.connection.Connection`):
670
                Instance of connection that will be updated.
671
        """
672 1
        self.connections[connection.id] = connection
673
674 1
    def get_connection_by_id(self, conn_id):
675
        """Return a existent connection by id.
676
677
        Args:
678
            id (int): id from a connection.
679
680
        Returns:
681
            :class:`~kytos.core.connection.Connection`: Instance of connection
682
                or None Type.
683
684
        """
685
        return self.connections.get(conn_id)
686
687 1
    def remove_connection(self, connection):
688
        """Close a existent connection and remove it.
689
690
        Args:
691
            connection (:class:`~kytos.core.connection.Connection`):
692
                Instance of connection that will be removed.
693
        """
694
        if connection is None:
695
            return False
696
697
        try:
698
            connection.close()
699
            del self.connections[connection.id]
700
        except KeyError:
701
            return False
702
        return True
703
704 1
    def remove_switch(self, switch):
705
        """Remove an existent switch.
706
707
        Args:
708
            switch (:class:`~kytos.core.switch.Switch`):
709
                Instance of switch that will be removed.
710
        """
711
        try:
712
            del self.switches[switch.dpid]
713
        except KeyError:
714
            return False
715
        return True
716
717 1
    def new_connection(self, event):
718
        """Handle a kytos/core.connection.new event.
719
720
        This method will read new connection event and store the connection
721
        (socket) into the connections attribute on the controller.
722
723
        It also clear all references to the connection since it is a new
724
        connection on the same ip:port.
725
726
        Args:
727
            event (~kytos.core.KytosEvent):
728
                The received event (``kytos/core.connection.new``) with the
729
                needed info.
730
        """
731
        self.log.info("Handling %s...", event)
732
733
        connection = event.source
734
        self.log.debug("Event source: %s", event.source)
735
736
        # Remove old connection (aka cleanup) if it exists
737
        if self.get_connection_by_id(connection.id):
738
            self.remove_connection(connection.id)
739
740
        # Update connections with the new connection
741
        self.create_or_update_connection(connection)
742
743 1
    def add_new_switch(self, switch):
744
        """Add a new switch on the controller.
745
746
        Args:
747
            switch (Switch): A Switch object
748
        """
749
        self.switches[switch.dpid] = switch
750
751 1
    def _import_napp(self, username, napp_name):
752
        """Import a NApp module.
753
754
        Raises:
755
            FileNotFoundError: if NApp's main.py is not found.
756
            ModuleNotFoundError: if any NApp requirement is not installed.
757
758
        """
759
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
760
        path = os.path.join(self.options.napps, username, napp_name,
761
                            'main.py')
762
        napp_spec = spec_from_file_location(mod_name, path)
763
        napp_module = module_from_spec(napp_spec)
764
        sys.modules[napp_spec.name] = napp_module
765
        napp_spec.loader.exec_module(napp_module)
766
        return napp_module
767
768 1
    def load_napp(self, username, napp_name):
769
        """Load a single NApp.
770
771
        Args:
772
            username (str): NApp username (makes up NApp's path).
773
            napp_name (str): Name of the NApp to be loaded.
774
        """
775
        if (username, napp_name) in self.napps:
776
            message = 'NApp %s/%s was already loaded'
777
            self.log.warning(message, username, napp_name)
778
            return
779
780
        try:
781
            napp_module = self._import_napp(username, napp_name)
782
        except ModuleNotFoundError as err:
783
            self.log.error("Error loading NApp '%s/%s': %s",
784
                           username, napp_name, err)
785
            return
786
        except FileNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
787
            msg = "NApp module not found, assuming it's a meta napp: %s"
788
            self.log.warning(msg, err.filename)
789
            return
790
791
        napp = napp_module.Main(controller=self)
792
793
        self.napps[(username, napp_name)] = napp
794
795
        napp.start()
796
        self.api_server.register_napp_endpoints(napp)
797
798
        # pylint: disable=protected-access
799
        for event, listeners in napp._listeners.items():
800
            self.events_listeners.setdefault(event, []).extend(listeners)
801
        # pylint: enable=protected-access
802
803 1
    def pre_install_napps(self, napps, enable=True):
804
        """Pre install and enable NApps.
805
806
        Before installing, it'll check if it's installed yet.
807
808
        Args:
809
            napps ([str]): List of NApps to be pre-installed and enabled.
810
        """
811
        all_napps = self.napps_manager.get_installed_napps()
812
        installed = [str(napp) for napp in all_napps]
813
        napps_diff = [napp for napp in napps if napp not in installed]
814
        for napp in napps_diff:
815
            self.napps_manager.install(napp, enable=enable)
816
817 1
    def load_napps(self):
818
        """Load all NApps enabled on the NApps dir."""
819
        for napp in self.napps_manager.get_enabled_napps():
820
            try:
821
                self.log.info("Loading NApp %s", napp.id)
822
                self.load_napp(napp.username, napp.name)
823
            except FileNotFoundError as exception:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
824
                self.log.error("Could not load NApp %s: %s",
825
                               napp.id, exception)
826
827 1
    def unload_napp(self, username, napp_name):
828
        """Unload a specific NApp.
829
830
        Args:
831
            username (str): NApp username.
832
            napp_name (str): Name of the NApp to be unloaded.
833
        """
834 1
        napp = self.napps.pop((username, napp_name), None)
835
836 1
        if napp is None:
837
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
838
        else:
839 1
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
840 1
            napp_id = NApp(username, napp_name).id
841 1
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
842 1
            napp_shutdown_fn = self.events_listeners[event.name][0]
843
            # Call listener before removing it from events_listeners
844 1
            napp_shutdown_fn(event)
845
846
            # Remove rest endpoints from that napp
847 1
            self.api_server.remove_napp_endpoints(napp)
848
849
            # Removing listeners from that napp
850
            # pylint: disable=protected-access
851 1
            for event_type, napp_listeners in napp._listeners.items():
852
                event_listeners = self.events_listeners[event_type]
853
                for listener in napp_listeners:
854
                    event_listeners.remove(listener)
855
                if not event_listeners:
856
                    del self.events_listeners[event_type]
857
            # pylint: enable=protected-access
858
859 1
    def unload_napps(self):
860
        """Unload all loaded NApps that are not core NApps."""
861
        # list() is used here to avoid the error:
862
        # 'RuntimeError: dictionary changed size during iteration'
863
        # This is caused by looping over an dictionary while removing
864
        # items from it.
865
        for (username, napp_name) in list(self.napps.keys()):  # noqa
866
            self.unload_napp(username, napp_name)
867
868 1
    def reload_napp_module(self, username, napp_name, napp_file):
869
        """Reload a NApp Module."""
870
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
871
        try:
872
            napp_module = import_module(mod_name)
873
        except ModuleNotFoundError as err:
874
            self.log.error("Module '%s' not found", mod_name)
875
            raise
876
        try:
877
            napp_module = reload_module(napp_module)
878
        except ImportError as err:
879
            self.log.error("Error reloading NApp '%s/%s': %s",
880
                           username, napp_name, err)
881
            raise
882
883 1
    def reload_napp(self, username, napp_name):
884
        """Reload a NApp."""
885
        self.unload_napp(username, napp_name)
886
        try:
887
            self.reload_napp_module(username, napp_name, 'settings')
888
            self.reload_napp_module(username, napp_name, 'main')
889
        except (ModuleNotFoundError, ImportError):
890
            return 400
891
        self.log.info("NApp '%s/%s' successfully reloaded",
892
                      username, napp_name)
893
        self.load_napp(username, napp_name)
894
        return 200
895
896 1
    def rest_reload_napp(self, username, napp_name):
897
        """Request reload a NApp."""
898
        res = self.reload_napp(username, napp_name)
899
        return 'reloaded', res
900
901 1
    def rest_reload_all_napps(self):
902
        """Request reload all NApps."""
903
        for napp in self.napps:
904
            self.reload_napp(*napp)
905
        return 'reloaded', 200
906