Passed
Push — master ( ca41ce...934b33 )
by Humberto
02:43 queued 13s
created

Controller.set_switch_options()   B

Complexity

Conditions 8

Size

Total Lines 30
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 8.0069

Importance

Changes 0
Metric Value
cc 8
eloc 22
nop 2
dl 0
loc 30
ccs 20
cts 21
cp 0.9524
crap 8.0069
rs 7.3333
c 0
b 0
f 0
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29
30 1
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 1
from kytos.core.auth import Auth
34 1
from kytos.core.buffers import KytosBuffers
35 1
from kytos.core.config import KytosConfig
36 1
from kytos.core.connection import ConnectionState
37 1
from kytos.core.events import KytosEvent
38 1
from kytos.core.helpers import now
39 1
from kytos.core.interface import Interface
40 1
from kytos.core.logs import LogManager
41 1
from kytos.core.napps.base import NApp
42 1
from kytos.core.napps.manager import NAppsManager
43 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
44 1
from kytos.core.switch import Switch
45
46 1
__all__ = ('Controller',)
47
48
49 1
class Controller:
50
    """Main class of Kytos.
51
52
    The main responsibilities of this class are:
53
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
54
        - manage KytosNApps (install, load and unload);
55
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
56
        - manage which event should be sent to NApps methods;
57
        - manage the buffers handlers, considering one thread per handler.
58
    """
59
60
    # Created issue #568 for the disabled checks.
61
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
62 1
    def __init__(self, options=None, loop=None):
63
        """Init method of Controller class takes the parameters below.
64
65
        Args:
66
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
67
                instance of :class:`~kytos.core.config.KytosConfig` class.
68
        """
69 1
        if options is None:
70
            options = KytosConfig().options['daemon']
71
72 1
        self._loop = loop or asyncio.get_event_loop()
73 1
        self._pool = ThreadPoolExecutor(max_workers=1)
74
75
        #: dict: keep the main threads of the controller (buffers and handler)
76 1
        self._threads = {}
77
        #: KytosBuffers: KytosBuffer object with Controller buffers
78 1
        self.buffers = KytosBuffers(loop=self._loop)
79
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
80
        #:
81
        #: This dict stores all connections between the controller and the
82
        #: switches. The key for this dict is a tuple (ip, port). The content
83
        #: is another dict with the connection information.
84 1
        self.connections = {}
85
        #: dict: mapping of events and event listeners.
86
        #:
87
        #: The key of the dict is a KytosEvent (or a string that represent a
88
        #: regex to match against KytosEvents) and the value is a list of
89
        #: methods that will receive the referenced event
90 1
        self.events_listeners = {'kytos/core.connection.new':
91
                                 [self.new_connection]}
92
93
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
94
        #:
95
        #: The key is the napp name (string), while the value is the napp
96
        #: instance itself.
97 1
        self.napps = {}
98
        #: Object generated by ParseArgs on config.py file
99 1
        self.options = options
100
        #: KytosServer: Instance of KytosServer that will be listening to TCP
101
        #: connections.
102 1
        self.server = None
103
        #: dict: Current existing switches.
104
        #:
105
        #: The key is the switch dpid, while the value is a Switch object.
106 1
        self.switches = {}  # dpid: Switch()
107
108
        #: datetime.datetime: Time when the controller finished starting.
109 1
        self.started_at = None
110
111
        #: logging.Logger: Logger instance used by Kytos.
112 1
        self.log = None
113
114
        #: Observer that handle NApps when they are enabled or disabled.
115 1
        self.napp_dir_listener = NAppDirListener(self)
116
117 1
        self.napps_manager = NAppsManager(self)
118
119
        #: API Server used to expose rest endpoints.
120 1
        self.api_server = APIServer(__name__, self.options.listen,
121
                                    self.options.api_port,
122
                                    self.napps_manager, self.options.napps)
123
124 1
        self.auth = Auth(self)
125
126 1
        self._register_endpoints()
127
        #: Adding the napps 'enabled' directory into the PATH
128
        #: Now you can access the enabled napps with:
129
        #: from napps.<username>.<napp_name> import ?....
130 1
        sys.path.append(os.path.join(self.options.napps, os.pardir))
131
132 1
    def enable_logs(self):
133
        """Register kytos log and enable the logs."""
134 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
135 1
        LogManager.enable_websocket(self.api_server.server)
136 1
        self.log = logging.getLogger(__name__)
137
138 1
    @staticmethod
139
    def loggers():
140
        """List all logging Loggers.
141
142
        Return a list of Logger objects, with name and logging level.
143
        """
144 1
        return [logging.getLogger(name)
145
                for name in logging.root.manager.loggerDict
146
                if "kytos" in name]
147
148 1
    def toggle_debug(self, name=None):
149
        """Enable/disable logging debug messages to a given logger name.
150
151
        If the name parameter is not specified the debug will be
152
        enabled/disabled following the initial config file. It will decide
153
        to enable/disable using the 'kytos' name to find the current
154
        logger level.
155
        Obs: To disable the debug the logging will be set to NOTSET
156
157
        Args:
158
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
159
        """
160 1
        if name and name not in logging.root.manager.loggerDict:
161
            # A Logger name that is not declared in logging will raise an error
162
            # otherwise logging would create a new Logger.
163 1
            raise ValueError(f"Invalid logger name: {name}")
164
165 1
        if not name:
166
            # Logger name not specified.
167 1
            level = logging.getLogger('kytos').getEffectiveLevel()
168 1
            enable_debug = level != logging.DEBUG
169
170
            # Enable/disable default Loggers
171 1
            LogManager.load_config_file(self.options.logging, enable_debug)
172 1
            return
173
174
        # Get effective logger level for the name
175 1
        level = logging.getLogger(name).getEffectiveLevel()
176 1
        logger = logging.getLogger(name)
177
178 1
        if level == logging.DEBUG:
179
            # disable debug
180 1
            logger.setLevel(logging.NOTSET)
181
        else:
182
            # enable debug
183 1
            logger.setLevel(logging.DEBUG)
184
185 1
    def start(self, restart=False):
186
        """Create pidfile and call start_controller method."""
187 1
        self.enable_logs()
188 1
        if not restart:
189 1
            self.create_pidfile()
190 1
        self.start_controller()
191
192 1
    def create_pidfile(self):
193
        """Create a pidfile."""
194 1
        pid = os.getpid()
195
196
        # Creates directory if it doesn't exist
197
        # System can erase /var/run's content
198 1
        pid_folder = Path(self.options.pidfile).parent
199 1
        self.log.info(pid_folder)
200
201
        # Pylint incorrectly infers Path objects
202
        # https://github.com/PyCQA/pylint/issues/224
203
        # pylint: disable=no-member
204 1
        if not pid_folder.exists():
205
            pid_folder.mkdir()
206
            pid_folder.chmod(0o1777)
207
        # pylint: enable=no-member
208
209
        # Make sure the file is deleted when controller stops
210 1
        atexit.register(Path(self.options.pidfile).unlink)
211
212
        # Checks if a pidfile exists. Creates a new file.
213 1
        try:
214 1
            pidfile = open(self.options.pidfile, mode='x')
215 1
        except OSError:
216
            # This happens if there is a pidfile already.
217
            # We shall check if the process that created the pidfile is still
218
            # running.
219 1
            try:
220 1
                existing_file = open(self.options.pidfile, mode='r')
221 1
                old_pid = int(existing_file.read())
222 1
                os.kill(old_pid, 0)
223
                # If kill() doesn't return an error, there's a process running
224
                # with the same PID. We assume it is Kytos and quit.
225
                # Otherwise, overwrite the file and proceed.
226
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
227
                             "running. Aborting.")
228
                sys.exit(error_msg.format(self.options.pidfile))
229 1
            except OSError:
230 1
                try:
231 1
                    pidfile = open(self.options.pidfile, mode='w')
232
                except OSError as exception:
233
                    error_msg = "Failed to create pidfile {}: {}."
234
                    sys.exit(error_msg.format(self.options.pidfile, exception))
235
236
        # Identifies the process that created the pidfile.
237 1
        pidfile.write(str(pid))
238 1
        pidfile.close()
239
240 1
    def start_controller(self):
241
        """Start the controller.
242
243
        Starts the KytosServer (TCP Server) coroutine.
244
        Starts a thread for each buffer handler.
245
        Load the installed apps.
246
        """
247 1
        self.log.info("Starting Kytos - Kytos Controller")
248 1
        self.server = KytosServer((self.options.listen,
249
                                   int(self.options.port)),
250
                                  KytosServerProtocol,
251
                                  self,
252
                                  self.options.protocol_name)
253
254 1
        self.log.info("Starting TCP server: %s", self.server)
255 1
        self.server.serve_forever()
256
257 1
        def _stop_loop(_):
258
            loop = asyncio.get_event_loop()
259
            # print(_.result())
260
            threads = threading.enumerate()
261
            self.log.debug("%s threads before loop.stop: %s",
262
                           len(threads), threads)
263
            loop.stop()
264
265 1
        async def _run_api_server_thread(executor):
266
            log = logging.getLogger('kytos.core.controller.api_server_thread')
267
            log.debug('starting')
268
            # log.debug('creating tasks')
269
            loop = asyncio.get_event_loop()
270
            blocking_tasks = [
271
                loop.run_in_executor(executor, self.api_server.run)
272
            ]
273
            # log.debug('waiting for tasks')
274
            completed, pending = await asyncio.wait(blocking_tasks)
275
            # results = [t.result() for t in completed]
276
            # log.debug('results: {!r}'.format(results))
277
            log.debug('completed: %d, pending: %d',
278
                      len(completed), len(pending))
279
280 1
        task = self._loop.create_task(self.raw_event_handler())
281 1
        task = self._loop.create_task(self.msg_in_event_handler())
282 1
        task = self._loop.create_task(self.msg_out_event_handler())
283 1
        task = self._loop.create_task(self.app_event_handler())
284 1
        task = self._loop.create_task(_run_api_server_thread(self._pool))
285 1
        task.add_done_callback(_stop_loop)
286
287 1
        self.log.info("ThreadPool started: %s", self._pool)
288
289
        # ASYNC TODO: ensure all threads started correctly
290
        # This is critical, if any of them failed starting we should exit.
291
        # sys.exit(error_msg.format(thread, exception))
292
293 1
        self.log.info("Loading Kytos NApps...")
294 1
        self.napp_dir_listener.start()
295 1
        self.pre_install_napps(self.options.napps_pre_installed)
296 1
        self.load_napps()
297
298 1
        self.started_at = now()
299
300 1
    def _register_endpoints(self):
301
        """Register all rest endpoint served by kytos.
302
303
        -   Register APIServer endpoints
304
        -   Register WebUI endpoints
305
        -   Register ``/api/kytos/core/config`` endpoint
306
        """
307 1
        self.api_server.start_api()
308
        # Register controller endpoints as /api/kytos/core/...
309 1
        self.api_server.register_core_endpoint('config/',
310
                                               self.configuration_endpoint)
311 1
        self.api_server.register_core_endpoint('metadata/',
312
                                               Controller.metadata_endpoint)
313 1
        self.api_server.register_core_endpoint(
314
            'reload/<username>/<napp_name>/',
315
            self.rest_reload_napp)
316 1
        self.api_server.register_core_endpoint('reload/all',
317
                                               self.rest_reload_all_napps)
318 1
        self.auth.register_core_auth_services()
319
320 1
    def register_rest_endpoint(self, url, function, methods):
321
        """Deprecate in favor of @rest decorator."""
322 1
        self.api_server.register_rest_endpoint(url, function, methods)
323
324 1
    @classmethod
325
    def metadata_endpoint(cls):
326
        """Return the Kytos metadata.
327
328
        Returns:
329
            string: Json with current kytos metadata.
330
331
        """
332 1
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
333 1
        meta_file = open(meta_path).read()
334 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
335 1
        return json.dumps(metadata)
336
337 1
    def configuration_endpoint(self):
338
        """Return the configuration options used by Kytos.
339
340
        Returns:
341
            string: Json with current configurations used by kytos.
342
343
        """
344 1
        return json.dumps(self.options.__dict__)
345
346 1
    def restart(self, graceful=True):
347
        """Restart Kytos SDN Controller.
348
349
        Args:
350
            graceful(bool): Represents the way that Kytos will restart.
351
        """
352 1
        if self.started_at is not None:
353 1
            self.stop(graceful)
354 1
            self.__init__(self.options)
355
356 1
        self.start(restart=True)
357
358 1
    def stop(self, graceful=True):
359
        """Shutdown all services used by kytos.
360
361
        This method should:
362
            - stop all Websockets
363
            - stop the API Server
364
            - stop the Controller
365
        """
366 1
        if self.started_at:
367 1
            self.stop_controller(graceful)
368
369 1
    def stop_controller(self, graceful=True):
370
        """Stop the controller.
371
372
        This method should:
373
            - announce on the network that the controller will shutdown;
374
            - stop receiving incoming packages;
375
            - call the 'shutdown' method of each KytosNApp that is running;
376
            - finish reading the events on all buffers;
377
            - stop each running handler;
378
            - stop all running threads;
379
            - stop the KytosServer;
380
        """
381 1
        self.log.info("Stopping Kytos")
382
383 1
        self.buffers.send_stop_signal()
384 1
        self.api_server.stop_api_server()
385 1
        self.napp_dir_listener.stop()
386
387 1
        self.log.info("Stopping threadpool: %s", self._pool)
388
389 1
        threads = threading.enumerate()
390 1
        self.log.debug("%s threads before threadpool shutdown: %s",
391
                       len(threads), threads)
392
393 1
        self._pool.shutdown(wait=graceful)
394
395
        # self.server.socket.shutdown()
396
        # self.server.socket.close()
397
398
        # for thread in self._threads.values():
399
        #     self.log.info("Stopping thread: %s", thread.name)
400
        #     thread.join()
401
402
        # for thread in self._threads.values():
403
        #     while thread.is_alive():
404
        #         self.log.info("Thread is alive: %s", thread.name)
405
        #         pass
406
407 1
        self.started_at = None
408 1
        self.unload_napps()
409 1
        self.buffers = KytosBuffers()
410
411
        # ASYNC TODO: close connections
412
        # self.server.server_close()
413
414
        # Shutdown the TCP server and the main asyncio loop
415 1
        self.server.shutdown()
416
417 1
    def status(self):
418
        """Return status of Kytos Server.
419
420
        If the controller kytos is running this method will be returned
421
        "Running since 'Started_At'", otherwise "Stopped".
422
423
        Returns:
424
            string: String with kytos status.
425
426
        """
427 1
        if self.started_at:
428 1
            return "Running since %s" % self.started_at
429 1
        return "Stopped"
430
431 1
    def uptime(self):
432
        """Return the uptime of kytos server.
433
434
        This method should return:
435
            - 0 if Kytos Server is stopped.
436
            - (kytos.start_at - datetime.now) if Kytos Server is running.
437
438
        Returns:
439
           datetime.timedelta: The uptime interval.
440
441
        """
442 1
        return now() - self.started_at if self.started_at else 0
443
444 1
    def notify_listeners(self, event):
445
        """Send the event to the specified listeners.
446
447
        Loops over self.events_listeners matching (by regexp) the attribute
448
        name of the event with the keys of events_listeners. If a match occurs,
449
        then send the event to each registered listener.
450
451
        Args:
452
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
453
        """
454 1
        self.log.debug("looking for listeners for %s", event)
455 1
        for event_regex, listeners in dict(self.events_listeners).items():
456
            # self.log.debug("listeners found for %s: %r => %s", event,
457
            #                event_regex, [l.__qualname__ for l in listeners])
458
            # Do not match if the event has more characters
459
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
460 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
461 1
                event_regex += '$'
462 1
            if re.match(event_regex, event.name):
463
                # self.log.debug('Calling listeners for %s', event)
464 1
                for listener in listeners:
465 1
                    listener(event)
466
467 1
    async def raw_event_handler(self):
468
        """Handle raw events.
469
470
        Listen to the raw_buffer and send all its events to the
471
        corresponding listeners.
472
        """
473 1
        self.log.info("Raw Event Handler started")
474 1
        while True:
475 1
            event = await self.buffers.raw.aget()
476 1
            self.notify_listeners(event)
477 1
            self.log.debug("Raw Event handler called")
478
479 1
            if event.name == "kytos/core.shutdown":
480 1
                self.log.debug("Raw Event handler stopped")
481 1
                break
482
483 1
    async def msg_in_event_handler(self):
484
        """Handle msg_in events.
485
486
        Listen to the msg_in buffer and send all its events to the
487
        corresponding listeners.
488
        """
489 1
        self.log.info("Message In Event Handler started")
490 1
        while True:
491 1
            event = await self.buffers.msg_in.aget()
492 1
            self.notify_listeners(event)
493 1
            self.log.debug("Message In Event handler called")
494
495 1
            if event.name == "kytos/core.shutdown":
496 1
                self.log.debug("Message In Event handler stopped")
497 1
                break
498
499 1
    async def msg_out_event_handler(self):
500
        """Handle msg_out events.
501
502
        Listen to the msg_out buffer and send all its events to the
503
        corresponding listeners.
504
        """
505 1
        self.log.info("Message Out Event Handler started")
506 1
        while True:
507 1
            triggered_event = await self.buffers.msg_out.aget()
508
509 1
            if triggered_event.name == "kytos/core.shutdown":
510 1
                self.log.debug("Message Out Event handler stopped")
511 1
                break
512
513 1
            message = triggered_event.content['message']
514 1
            destination = triggered_event.destination
515 1
            if (destination and
516
                    not destination.state == ConnectionState.FINISHED):
517 1
                packet = message.pack()
518 1
                destination.send(packet)
519 1
                self.log.debug('Connection %s: OUT OFP, '
520
                               'version: %s, type: %s, xid: %s - %s',
521
                               destination.id,
522
                               message.header.version,
523
                               message.header.message_type,
524
                               message.header.xid,
525
                               packet.hex())
526 1
                self.notify_listeners(triggered_event)
527 1
                self.log.debug("Message Out Event handler called")
528
            else:
529
                self.log.info("connection closed. Cannot send message")
530
531 1
    async def app_event_handler(self):
532
        """Handle app events.
533
534
        Listen to the app buffer and send all its events to the
535
        corresponding listeners.
536
        """
537 1
        self.log.info("App Event Handler started")
538 1
        while True:
539 1
            event = await self.buffers.app.aget()
540 1
            self.notify_listeners(event)
541 1
            self.log.debug("App Event handler called")
542
543 1
            if event.name == "kytos/core.shutdown":
544 1
                self.log.debug("App Event handler stopped")
545 1
                break
546
547 1
    def get_interface_by_id(self, interface_id):
548
        """Find a Interface  with interface_id.
549
550
        Args:
551
            interface_id(str): Interface Identifier.
552
553
        Returns:
554
            Interface: Instance of Interface with the id given.
555
556
        """
557 1
        if interface_id is None:
558 1
            return None
559
560 1
        switch_id = ":".join(interface_id.split(":")[:-1])
561 1
        interface_number = int(interface_id.split(":")[-1])
562
563 1
        switch = self.switches.get(switch_id)
564
565 1
        if not switch:
566 1
            return None
567
568 1
        return switch.interfaces.get(interface_number, None)
569
570 1
    def get_switch_by_dpid(self, dpid):
571
        """Return a specific switch by dpid.
572
573
        Args:
574
            dpid (|DPID|): dpid object used to identify a switch.
575
576
        Returns:
577
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
578
579
        """
580 1
        return self.switches.get(dpid)
581
582 1
    def get_switch_or_create(self, dpid, connection):
583
        """Return switch or create it if necessary.
584
585
        Args:
586
            dpid (|DPID|): dpid object used to identify a switch.
587
            connection (:class:`~kytos.core.connection.Connection`):
588
                connection used by switch. If a switch has a connection that
589
                will be updated.
590
591
        Returns:
592
            :class:`~kytos.core.switch.Switch`: new or existent switch.
593
594
        """
595 1
        self.create_or_update_connection(connection)
596 1
        switch = self.get_switch_by_dpid(dpid)
597 1
        event_name = 'kytos/core.switch.'
598
599 1
        if switch is None:
600 1
            switch = Switch(dpid=dpid)
601 1
            self.add_new_switch(switch)
602 1
            event_name += 'new'
603
        else:
604 1
            event_name += 'reconnected'
605
606 1
        self.set_switch_options(dpid=dpid)
607 1
        event = KytosEvent(name=event_name, content={'switch': switch})
608
609 1
        old_connection = switch.connection
610 1
        switch.update_connection(connection)
611
612 1
        if old_connection is not connection:
613 1
            self.remove_connection(old_connection)
614
615 1
        self.buffers.app.put(event)
616
617 1
        return switch
618
619 1
    def set_switch_options(self, dpid):
620
        """Update the switch settings based on kytos.conf options.
621
622
        Args:
623
            dpid (str): dpid used to identify a switch.
624
625
        """
626 1
        switch = self.switches.get(dpid)
627 1
        if not switch:
628
            return
629
630 1
        vlan_pool = {}
631 1
        vlan_pool = self.options.vlan_pool
632 1
        if not vlan_pool:
633 1
            return
634
635 1
        if vlan_pool.get(dpid):
636 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
637 1
            for intf_num, port_list in vlan_pool[dpid].items():
638 1
                if not switch.interfaces.get((intf_num)):
639 1
                    vlan_ids = set()
640 1
                    for vlan_range in port_list:
641 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
642 1
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
643 1
                            vlan_ids.add(vlan_id)
644 1
                    intf_num = int(intf_num)
645 1
                    intf = Interface(name=intf_num, port_number=intf_num,
646
                                     switch=switch)
647 1
                    intf.set_available_tags(vlan_ids)
648 1
                    switch.update_interface(intf)
649
650 1
    def create_or_update_connection(self, connection):
651
        """Update a connection.
652
653
        Args:
654
            connection (:class:`~kytos.core.connection.Connection`):
655
                Instance of connection that will be updated.
656
        """
657 1
        self.connections[connection.id] = connection
658
659 1
    def get_connection_by_id(self, conn_id):
660
        """Return a existent connection by id.
661
662
        Args:
663
            id (int): id from a connection.
664
665
        Returns:
666
            :class:`~kytos.core.connection.Connection`: Instance of connection
667
                or None Type.
668
669
        """
670 1
        return self.connections.get(conn_id)
671
672 1
    def remove_connection(self, connection):
673
        """Close a existent connection and remove it.
674
675
        Args:
676
            connection (:class:`~kytos.core.connection.Connection`):
677
                Instance of connection that will be removed.
678
        """
679 1
        if connection is None:
680 1
            return False
681
682 1
        try:
683 1
            connection.close()
684 1
            del self.connections[connection.id]
685 1
        except KeyError:
686 1
            return False
687 1
        return True
688
689 1
    def remove_switch(self, switch):
690
        """Remove an existent switch.
691
692
        Args:
693
            switch (:class:`~kytos.core.switch.Switch`):
694
                Instance of switch that will be removed.
695
        """
696 1
        try:
697 1
            del self.switches[switch.dpid]
698 1
        except KeyError:
699 1
            return False
700 1
        return True
701
702 1
    def new_connection(self, event):
703
        """Handle a kytos/core.connection.new event.
704
705
        This method will read new connection event and store the connection
706
        (socket) into the connections attribute on the controller.
707
708
        It also clear all references to the connection since it is a new
709
        connection on the same ip:port.
710
711
        Args:
712
            event (~kytos.core.KytosEvent):
713
                The received event (``kytos/core.connection.new``) with the
714
                needed info.
715
        """
716 1
        self.log.info("Handling %s...", event)
717
718 1
        connection = event.source
719 1
        self.log.debug("Event source: %s", event.source)
720
721
        # Remove old connection (aka cleanup) if it exists
722 1
        if self.get_connection_by_id(connection.id):
723
            self.remove_connection(connection.id)
724
725
        # Update connections with the new connection
726 1
        self.create_or_update_connection(connection)
727
728 1
    def add_new_switch(self, switch):
729
        """Add a new switch on the controller.
730
731
        Args:
732
            switch (Switch): A Switch object
733
        """
734 1
        self.switches[switch.dpid] = switch
735
736 1
    def _import_napp(self, username, napp_name):
737
        """Import a NApp module.
738
739
        Raises:
740
            FileNotFoundError: if NApp's main.py is not found.
741
            ModuleNotFoundError: if any NApp requirement is not installed.
742
743
        """
744 1
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
745 1
        path = os.path.join(self.options.napps, username, napp_name,
746
                            'main.py')
747 1
        napp_spec = spec_from_file_location(mod_name, path)
748 1
        napp_module = module_from_spec(napp_spec)
749 1
        sys.modules[napp_spec.name] = napp_module
750 1
        napp_spec.loader.exec_module(napp_module)
751 1
        return napp_module
752
753 1
    def load_napp(self, username, napp_name):
754
        """Load a single NApp.
755
756
        Args:
757
            username (str): NApp username (makes up NApp's path).
758
            napp_name (str): Name of the NApp to be loaded.
759
        """
760 1
        if (username, napp_name) in self.napps:
761 1
            message = 'NApp %s/%s was already loaded'
762 1
            self.log.warning(message, username, napp_name)
763 1
            return
764
765 1
        try:
766 1
            napp_module = self._import_napp(username, napp_name)
767 1
        except ModuleNotFoundError as err:
768 1
            self.log.error("Error loading NApp '%s/%s': %s",
769
                           username, napp_name, err)
770 1
            return
771 1
        except FileNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
772 1
            msg = "NApp module not found, assuming it's a meta-napp: %s"
773 1
            self.log.warning(msg, err.filename)
774 1
            return
775
776 1
        try:
777 1
            napp = napp_module.Main(controller=self)
778 1
        except:  # noqa pylint: disable=bare-except
779 1
            self.log.critical("NApp initialization failed: %s/%s",
780
                              username, napp_name, exc_info=True)
781 1
            return
782
783 1
        self.napps[(username, napp_name)] = napp
784
785 1
        napp.start()
786 1
        self.api_server.register_napp_endpoints(napp)
787
788
        # pylint: disable=protected-access
789 1
        for event, listeners in napp._listeners.items():
790
            self.events_listeners.setdefault(event, []).extend(listeners)
791
        # pylint: enable=protected-access
792
793 1
    def pre_install_napps(self, napps, enable=True):
794
        """Pre install and enable NApps.
795
796
        Before installing, it'll check if it's installed yet.
797
798
        Args:
799
            napps ([str]): List of NApps to be pre-installed and enabled.
800
        """
801 1
        all_napps = self.napps_manager.get_installed_napps()
802 1
        installed = [str(napp) for napp in all_napps]
803 1
        napps_diff = [napp for napp in napps if napp not in installed]
804 1
        for napp in napps_diff:
805 1
            self.napps_manager.install(napp, enable=enable)
806
807 1
    def load_napps(self):
808
        """Load all NApps enabled on the NApps dir."""
809 1
        for napp in self.napps_manager.get_enabled_napps():
810 1
            try:
811 1
                self.log.info("Loading NApp %s", napp.id)
812 1
                self.load_napp(napp.username, napp.name)
813
            except FileNotFoundError as exception:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
814
                self.log.error("Could not load NApp %s: %s",
815
                               napp.id, exception)
816
817 1
    def unload_napp(self, username, napp_name):
818
        """Unload a specific NApp.
819
820
        Args:
821
            username (str): NApp username.
822
            napp_name (str): Name of the NApp to be unloaded.
823
        """
824 1
        napp = self.napps.pop((username, napp_name), None)
825
826 1
        if napp is None:
827
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
828
        else:
829 1
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
830 1
            napp_id = NApp(username, napp_name).id
831 1
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
832 1
            napp_shutdown_fn = self.events_listeners[event.name][0]
833
            # Call listener before removing it from events_listeners
834 1
            napp_shutdown_fn(event)
835
836
            # Remove rest endpoints from that napp
837 1
            self.api_server.remove_napp_endpoints(napp)
838
839
            # Removing listeners from that napp
840
            # pylint: disable=protected-access
841 1
            for event_type, napp_listeners in napp._listeners.items():
842
                event_listeners = self.events_listeners[event_type]
843
                for listener in napp_listeners:
844
                    event_listeners.remove(listener)
845
                if not event_listeners:
846
                    del self.events_listeners[event_type]
847
            # pylint: enable=protected-access
848
849 1
    def unload_napps(self):
850
        """Unload all loaded NApps that are not core NApps."""
851
        # list() is used here to avoid the error:
852
        # 'RuntimeError: dictionary changed size during iteration'
853
        # This is caused by looping over an dictionary while removing
854
        # items from it.
855
        for (username, napp_name) in list(self.napps.keys()):  # noqa
856
            self.unload_napp(username, napp_name)
857
858 1
    def reload_napp_module(self, username, napp_name, napp_file):
859
        """Reload a NApp Module."""
860 1
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
861 1
        try:
862 1
            napp_module = import_module(mod_name)
863 1
        except ModuleNotFoundError as err:
864 1
            self.log.error("Module '%s' not found", mod_name)
865 1
            raise
866 1
        try:
867 1
            napp_module = reload_module(napp_module)
868 1
        except ImportError as err:
869 1
            self.log.error("Error reloading NApp '%s/%s': %s",
870
                           username, napp_name, err)
871 1
            raise
872
873 1
    def reload_napp(self, username, napp_name):
874
        """Reload a NApp."""
875 1
        self.unload_napp(username, napp_name)
876 1
        try:
877 1
            self.reload_napp_module(username, napp_name, 'settings')
878 1
            self.reload_napp_module(username, napp_name, 'main')
879 1
        except (ModuleNotFoundError, ImportError):
880 1
            return 400
881 1
        self.log.info("NApp '%s/%s' successfully reloaded",
882
                      username, napp_name)
883 1
        self.load_napp(username, napp_name)
884 1
        return 200
885
886 1
    def rest_reload_napp(self, username, napp_name):
887
        """Request reload a NApp."""
888 1
        res = self.reload_napp(username, napp_name)
889 1
        return 'reloaded', res
890
891 1
    def rest_reload_all_napps(self):
892
        """Request reload all NApps."""
893 1
        for napp in self.napps:
894 1
            self.reload_napp(*napp)
895
        return 'reloaded', 200
896