Passed
Push — master ( 925b90...ee7e16 )
by Humberto
02:25 queued 17s
created

kytos.core.controller   F

Complexity

Total Complexity 96

Size/Duplication

Total Lines 847
Duplicated Lines 0 %

Test Coverage

Coverage 37.23%

Importance

Changes 0
Metric Value
eloc 407
dl 0
loc 847
ccs 140
cts 376
cp 0.3723
rs 2
c 0
b 0
f 0
wmc 96

39 Methods

Rating   Name   Duplication   Size   Complexity  
A Controller.start() 0 6 2
B Controller.create_pidfile() 0 47 5
A Controller.start_controller() 0 59 1
A Controller.__init__() 0 69 2
A Controller.enable_logs() 0 5 1
A Controller.rest_reload_napp() 0 4 1
B Controller.notify_listeners() 0 22 6
A Controller.restart() 0 11 2
A Controller.create_or_update_connection() 0 8 1
A Controller.get_switch_by_dpid() 0 11 1
A Controller.add_new_switch() 0 7 1
A Controller.load_napps() 0 9 3
A Controller._import_napp() 0 16 1
A Controller.get_connection_by_id() 0 12 1
A Controller.unload_napps() 0 8 2
A Controller.get_switch_or_create() 0 36 3
A Controller.raw_event_handler() 0 15 3
A Controller.configuration_endpoint() 0 8 1
A Controller.msg_in_event_handler() 0 15 3
A Controller.get_interface_by_id() 0 22 3
A Controller.uptime() 0 12 2
A Controller.register_rest_endpoint() 0 3 1
A Controller.rest_reload_all_napps() 0 5 2
A Controller.new_connection() 0 25 2
C Controller.set_switch_options() 0 33 9
A Controller.metadata_endpoint() 0 12 1
A Controller.remove_switch() 0 12 2
B Controller.msg_out_event_handler() 0 31 5
A Controller.stop_controller() 0 47 1
A Controller.reload_napp_module() 0 14 3
A Controller.remove_connection() 0 16 3
A Controller.stop() 0 10 2
A Controller.pre_install_napps() 0 13 2
A Controller.status() 0 13 2
A Controller.app_event_handler() 0 15 3
A Controller._register_endpoints() 0 19 1
A Controller.reload_napp() 0 12 2
B Controller.load_napp() 0 33 5
A Controller.unload_napp() 0 30 5

How to fix   Complexity   

Complexity

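Complex classes like kytos.core.controller often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields and methods that share the same prefixes or suffixes. In the method list above, for example, the four *_event_handler methods form one such group, and the methods whose names contain connection form another.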

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
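
As a minimal sketch of Extract Class, assume the connection-related methods listed in the table (create_or_update_connection, get_connection_by_id, remove_connection) form one cohesive component. The names ConnectionRegistry, SlimController and FakeConnection are hypothetical and are not part of Kytos; the sketch only shows the shape of the refactoring, not how the project would necessarily do it.

```python
class FakeConnection:
    """Hypothetical stand-in for a connection; only ``id`` and ``close()`` matter."""

    def __init__(self, conn_id):
        self.id = conn_id
        self.closed = False

    def close(self):
        self.closed = True


class ConnectionRegistry:
    """Hypothetical extracted class that owns the connections dict."""

    def __init__(self):
        self._connections = {}

    def add_or_update(self, connection):
        """Store the connection under its id, replacing any previous entry."""
        self._connections[connection.id] = connection

    def get(self, conn_id):
        """Return the stored connection, or None if the id is unknown."""
        return self._connections.get(conn_id)

    def remove(self, connection):
        """Close and forget a connection; return False if it was unknown."""
        if connection is None or connection.id not in self._connections:
            return False
        connection.close()
        del self._connections[connection.id]
        return True


class SlimController:
    """Toy controller that delegates connection bookkeeping to the registry."""

    def __init__(self):
        self.connection_registry = ConnectionRegistry()

    # Thin wrappers keep the old public method names working for callers.
    def create_or_update_connection(self, connection):
        self.connection_registry.add_or_update(connection)

    def get_connection_by_id(self, conn_id):
        return self.connection_registry.get(conn_id)

    def remove_connection(self, connection):
        return self.connection_registry.remove(connection)
```

Keeping the original method names as thin wrappers lets callers migrate gradually: the controller loses three methods' worth of logic, while its public interface stays the same.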

"""Kytos SDN Platform main class.

This module contains the main class of Kytos, which is
:class:`~.core.Controller`.

Basic usage:

.. code-block:: python3

    from kytos.core.config import KytosConfig
    from kytos.core import Controller
    config = KytosConfig()
    controller = Controller(config.options['daemon'])
    controller.start()
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29
30 1
from kytos.core.api_server import APIServer
31
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
32 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
33 1
from kytos.core.auth import Auth
34 1
from kytos.core.buffers import KytosBuffers
35 1
from kytos.core.config import KytosConfig
36 1
from kytos.core.connection import ConnectionState
37 1
from kytos.core.events import KytosEvent
38 1
from kytos.core.helpers import now
39 1
from kytos.core.interface import Interface
40 1
from kytos.core.logs import LogManager
41 1
from kytos.core.napps.base import NApp
42 1
from kytos.core.napps.manager import NAppsManager
43 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
44 1
from kytos.core.switch import Switch
45
46 1
__all__ = ('Controller',)
47
48
49 1
class Controller:
50
    """Main class of Kytos.
51
52
    The main responsibilities of this class are:
53
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
54
        - manage KytosNApps (install, load and unload);
55
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
56
        - manage which event should be sent to NApps methods;
57
        - manage the buffers handlers, considering one thread per handler.
58
    """
59
60
    # Created issue #568 for the disabled checks.
61
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
62 1
    def __init__(self, options=None, loop=None):
63
        """Init method of Controller class takes the parameters below.
64
65
        Args:
66
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
67
                instance of :class:`~kytos.core.config.KytosConfig` class.
68
        """
69 1
        if options is None:
70
            options = KytosConfig().options['daemon']
71
72 1
        self._loop = loop or asyncio.get_event_loop()
73 1
        self._pool = ThreadPoolExecutor(max_workers=1)
74
75
        #: dict: keep the main threads of the controller (buffers and handler)
76 1
        self._threads = {}
77
        #: KytosBuffers: KytosBuffer object with Controller buffers
78 1
        self.buffers = KytosBuffers(loop=self._loop)
79
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
80
        #:
81
        #: This dict stores all connections between the controller and the
82
        #: switches. The key for this dict is a tuple (ip, port). The content
83
        #: is another dict with the connection information.
84 1
        self.connections = {}
85
        #: dict: mapping of events and event listeners.
86
        #:
87
        #: The key of the dict is a KytosEvent (or a string that represents
        #: a regex to match against KytosEvents) and the value is a list of
        #: methods that will receive the referenced event.
90 1
        self.events_listeners = {'kytos/core.connection.new':
91
                                 [self.new_connection]}
92
93
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
94
        #:
95
        #: The key is the napp name (string), while the value is the napp
96
        #: instance itself.
97 1
        self.napps = {}
98
        #: Object generated by ParseArgs on config.py file
99 1
        self.options = options
100
        #: KytosServer: Instance of KytosServer that will be listening to TCP
101
        #: connections.
102 1
        self.server = None
103
        #: dict: Current existing switches.
104
        #:
105
        #: The key is the switch dpid, while the value is a Switch object.
106 1
        self.switches = {}  # dpid: Switch()
107
108
        #: datetime.datetime: Time when the controller finished starting.
109 1
        self.started_at = None
110
111
        #: logging.Logger: Logger instance used by Kytos.
112 1
        self.log = None
113
114
        #: Observer that handles NApps when they are enabled or disabled.
115 1
        self.napp_dir_listener = NAppDirListener(self)
116
117 1
        self.napps_manager = NAppsManager(self)
118
119
        #: API Server used to expose rest endpoints.
120 1
        self.api_server = APIServer(__name__, self.options.listen,
121
                                    self.options.api_port,
122
                                    self.napps_manager, self.options.napps)
123
124 1
        self.auth = Auth(self)
125
126 1
        self._register_endpoints()
127
        #: Adding the napps 'enabled' directory into the PATH
128
        #: Now you can access the enabled napps with:
129
        #: from napps.<username>.<napp_name> import ?....
130 1
        sys.path.append(os.path.join(self.options.napps, os.pardir))
131
132 1
    def enable_logs(self):
133
        """Register kytos log and enable the logs."""
134 1
        LogManager.load_config_file(self.options.logging, self.options.debug)
135 1
        LogManager.enable_websocket(self.api_server.server)
136 1
        self.log = logging.getLogger(__name__)
137
138 1
    def start(self, restart=False):
139
        """Create pidfile and call start_controller method."""
140
        self.enable_logs()
141
        if not restart:
142
            self.create_pidfile()
143
        self.start_controller()
144
145 1
    def create_pidfile(self):
146
        """Create a pidfile."""
147
        pid = os.getpid()
148
149
        # Creates directory if it doesn't exist
150
        # System can erase /var/run's content
151
        pid_folder = Path(self.options.pidfile).parent
152
        self.log.info(pid_folder)
153
154
        # Pylint incorrectly infers Path objects
155
        # https://github.com/PyCQA/pylint/issues/224
156
        # pylint: disable=no-member
157
        if not pid_folder.exists():
158
            pid_folder.mkdir()
159
            pid_folder.chmod(0o1777)
160
        # pylint: enable=no-member
161
162
        # Make sure the file is deleted when controller stops
163
        atexit.register(Path(self.options.pidfile).unlink)
164
165
        # Checks if a pidfile exists. Creates a new file.
        try:
            pidfile = open(self.options.pidfile, mode='x')
        except OSError:
            # This happens if there is a pidfile already.
            # We shall check if the process that created the pidfile is still
            # running.
            try:
                # Context manager closes the existing pidfile after reading.
                with open(self.options.pidfile, mode='r') as existing_file:
                    old_pid = int(existing_file.read())
                os.kill(old_pid, 0)
                # If kill() doesn't return an error, there's a process running
                # with the same PID. We assume it is Kytos and quit.
                # Otherwise, overwrite the file and proceed.
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
                             "running. Aborting.")
                sys.exit(error_msg.format(self.options.pidfile))
            except OSError:
                try:
                    pidfile = open(self.options.pidfile, mode='w')
                except OSError as exception:
                    error_msg = "Failed to create pidfile {}: {}."
                    sys.exit(error_msg.format(self.options.pidfile, exception))

        # Identifies the process that created the pidfile.
        pidfile.write(str(pid))
        pidfile.close()
192
193 1
    def start_controller(self):
194
        """Start the controller.
195
196
        Starts the KytosServer (TCP Server) coroutine.
197
        Starts a thread for each buffer handler.
198
        Load the installed apps.
199
        """
200
        self.log.info("Starting Kytos - Kytos Controller")
201
        self.server = KytosServer((self.options.listen,
202
                                   int(self.options.port)),
203
                                  KytosServerProtocol,
204
                                  self,
205
                                  self.options.protocol_name)
206
207
        self.log.info("Starting TCP server: %s", self.server)
208
        self.server.serve_forever()
209
210
        def _stop_loop(_):
211
            loop = asyncio.get_event_loop()
212
            # print(_.result())
213
            threads = threading.enumerate()
214
            self.log.debug("%s threads before loop.stop: %s",
215
                           len(threads), threads)
216
            loop.stop()
217
218
        async def _run_api_server_thread(executor):
219
            log = logging.getLogger('kytos.core.controller.api_server_thread')
220
            log.debug('starting')
221
            # log.debug('creating tasks')
222
            loop = asyncio.get_event_loop()
223
            blocking_tasks = [
224
                loop.run_in_executor(executor, self.api_server.run)
225
            ]
226
            # log.debug('waiting for tasks')
227
            completed, pending = await asyncio.wait(blocking_tasks)
228
            # results = [t.result() for t in completed]
229
            # log.debug('results: {!r}'.format(results))
230
            log.debug('completed: %d, pending: %d',
231
                      len(completed), len(pending))
232
233
        task = self._loop.create_task(self.raw_event_handler())
234
        task = self._loop.create_task(self.msg_in_event_handler())
235
        task = self._loop.create_task(self.msg_out_event_handler())
236
        task = self._loop.create_task(self.app_event_handler())
237
        task = self._loop.create_task(_run_api_server_thread(self._pool))
238
        task.add_done_callback(_stop_loop)
239
240
        self.log.info("ThreadPool started: %s", self._pool)
241
242
        # ASYNC TODO: ensure all threads started correctly
243
        # This is critical, if any of them failed starting we should exit.
244
        # sys.exit(error_msg.format(thread, exception))
245
246
        self.log.info("Loading Kytos NApps...")
247
        self.napp_dir_listener.start()
248
        self.pre_install_napps(self.options.napps_pre_installed)
249
        self.load_napps()
250
251
        self.started_at = now()
252
253 1
    def _register_endpoints(self):
254
        """Register all REST endpoints served by Kytos.
255
256
        -   Register APIServer endpoints
257
        -   Register WebUI endpoints
258
        -   Register ``/api/kytos/core/config`` endpoint
259
        """
260 1
        self.api_server.start_api()
261
        # Register controller endpoints as /api/kytos/core/...
262 1
        self.api_server.register_core_endpoint('config/',
263
                                               self.configuration_endpoint)
264 1
        self.api_server.register_core_endpoint('metadata/',
265
                                               Controller.metadata_endpoint)
266 1
        self.api_server.register_core_endpoint(
267
            'reload/<username>/<napp_name>/',
268
            self.rest_reload_napp)
269 1
        self.api_server.register_core_endpoint('reload/all',
270
                                               self.rest_reload_all_napps)
271 1
        self.auth.register_core_auth_services()
272
273 1
    def register_rest_endpoint(self, url, function, methods):
274
        """Deprecated in favor of the @rest decorator."""
275 1
        self.api_server.register_rest_endpoint(url, function, methods)
276
277 1
    @classmethod
278
    def metadata_endpoint(cls):
279
        """Return the Kytos metadata.
280
281
        Returns:
282
            string: Json with current kytos metadata.
283
284
        """
285
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
        # Use a context manager so the file handle is closed promptly.
        with open(meta_path) as meta_file:
            meta_content = meta_file.read()
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'",
                                   meta_content))
        return json.dumps(metadata)
289
290 1
    def configuration_endpoint(self):
291
        """Return the configuration options used by Kytos.
292
293
        Returns:
294
            string: Json with current configurations used by kytos.
295
296
        """
297 1
        return json.dumps(self.options.__dict__)
298
299 1
    def restart(self, graceful=True):
300
        """Restart Kytos SDN Controller.
301
302
        Args:
303
            graceful(bool): Represents the way that Kytos will restart.
304
        """
305
        if self.started_at is not None:
306
            self.stop(graceful)
307
            self.__init__(self.options)
308
309
        self.start(restart=True)
310
311 1
    def stop(self, graceful=True):
312
        """Shutdown all services used by kytos.
313
314
        This method should:
315
            - stop all Websockets
316
            - stop the API Server
317
            - stop the Controller
318
        """
319
        if self.started_at:
320
            self.stop_controller(graceful)
321
322 1
    def stop_controller(self, graceful=True):
323
        """Stop the controller.
324
325
        This method should:
326
            - announce on the network that the controller will shutdown;
327
            - stop receiving incoming packets;
328
            - call the 'shutdown' method of each KytosNApp that is running;
329
            - finish reading the events on all buffers;
330
            - stop each running handler;
331
            - stop all running threads;
332
            - stop the KytosServer;
333
        """
334
        self.log.info("Stopping Kytos")
335
336
        self.buffers.send_stop_signal()
337
        self.api_server.stop_api_server()
338
        self.napp_dir_listener.stop()
339
340
        self.log.info("Stopping threadpool: %s", self._pool)
341
342
        threads = threading.enumerate()
343
        self.log.debug("%s threads before threadpool shutdown: %s",
344
                       len(threads), threads)
345
346
        self._pool.shutdown(wait=graceful)
347
348
        # self.server.socket.shutdown()
349
        # self.server.socket.close()
350
351
        # for thread in self._threads.values():
352
        #     self.log.info("Stopping thread: %s", thread.name)
353
        #     thread.join()
354
355
        # for thread in self._threads.values():
356
        #     while thread.is_alive():
357
        #         self.log.info("Thread is alive: %s", thread.name)
358
        #         pass
359
360
        self.started_at = None
361
        self.unload_napps()
362
        self.buffers = KytosBuffers()
363
364
        # ASYNC TODO: close connections
365
        # self.server.server_close()
366
367
        # Shutdown the TCP server and the main asyncio loop
368
        self.server.shutdown()
369
370 1
    def status(self):
371
        """Return status of Kytos Server.
372
373
        If the Kytos controller is running, this method returns
        "Running since <started_at>"; otherwise it returns "Stopped".
375
376
        Returns:
377
            string: String with kytos status.
378
379
        """
380
        if self.started_at:
381
            return "Running since %s" % self.started_at
382
        return "Stopped"
383
384 1
    def uptime(self):
385
        """Return the uptime of kytos server.
386
387
        This method should return:
388
            - 0 if Kytos Server is stopped.
389
            - (now - started_at) if Kytos Server is running.
390
391
        Returns:
392
           datetime.timedelta: The uptime interval, or the integer 0 if stopped.
393
394
        """
395
        return now() - self.started_at if self.started_at else 0
396
397 1
    def notify_listeners(self, event):
398
        """Send the event to the specified listeners.
399
400
        Loops over self.events_listeners matching (by regexp) the attribute
401
        name of the event with the keys of events_listeners. If a match occurs,
402
        then send the event to each registered listener.
403
404
        Args:
405
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
406
        """
407
        self.log.debug("looking for listeners for %s", event)
408
        for event_regex, listeners in dict(self.events_listeners).items():
409
            # self.log.debug("listeners found for %s: %r => %s", event,
410
            #                event_regex, [l.__qualname__ for l in listeners])
411
            # Do not match if the event has more characters
412
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
413
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
414
                event_regex += '$'
415
            if re.match(event_regex, event.name):
416
                # self.log.debug('Calling listeners for %s', event)
417
                for listener in listeners:
418
                    listener(event)
419
420 1
    async def raw_event_handler(self):
421
        """Handle raw events.
422
423
        Listen to the raw_buffer and send all its events to the
424
        corresponding listeners.
425
        """
426
        self.log.info("Raw Event Handler started")
427
        while True:
428
            event = await self.buffers.raw.aget()
429
            self.notify_listeners(event)
430
            self.log.debug("Raw Event handler called")
431
432
            if event.name == "kytos/core.shutdown":
433
                self.log.debug("Raw Event handler stopped")
434
                break
435
436 1
    async def msg_in_event_handler(self):
437
        """Handle msg_in events.
438
439
        Listen to the msg_in buffer and send all its events to the
440
        corresponding listeners.
441
        """
442
        self.log.info("Message In Event Handler started")
443
        while True:
444
            event = await self.buffers.msg_in.aget()
445
            self.notify_listeners(event)
446
            self.log.debug("Message In Event handler called")
447
448
            if event.name == "kytos/core.shutdown":
449
                self.log.debug("Message In Event handler stopped")
450
                break
451
452 1
    async def msg_out_event_handler(self):
453
        """Handle msg_out events.
454
455
        Listen to the msg_out buffer and send all its events to the
456
        corresponding listeners.
457
        """
458
        self.log.info("Message Out Event Handler started")
459
        while True:
460
            triggered_event = await self.buffers.msg_out.aget()
461
462
            if triggered_event.name == "kytos/core.shutdown":
463
                self.log.debug("Message Out Event handler stopped")
464
                break
465
466
            message = triggered_event.content['message']
467
            destination = triggered_event.destination
468
            if (destination and
469
                    not destination.state == ConnectionState.FINISHED):
470
                packet = message.pack()
471
                destination.send(packet)
472
                self.log.debug('Connection %s: OUT OFP, '
473
                               'version: %s, type: %s, xid: %s - %s',
474
                               destination.id,
475
                               message.header.version,
476
                               message.header.message_type,
477
                               message.header.xid,
478
                               packet.hex())
479
                self.notify_listeners(triggered_event)
480
                self.log.debug("Message Out Event handler called")
481
            else:
482
                self.log.info("connection closed. Cannot send message")
483
484 1
    async def app_event_handler(self):
485
        """Handle app events.
486
487
        Listen to the app buffer and send all its events to the
488
        corresponding listeners.
489
        """
490
        self.log.info("App Event Handler started")
491
        while True:
492
            event = await self.buffers.app.aget()
493
            self.notify_listeners(event)
494
            self.log.debug("App Event handler called")
495
496
            if event.name == "kytos/core.shutdown":
497
                self.log.debug("App Event handler stopped")
498
                break
499
500 1
    def get_interface_by_id(self, interface_id):
501
        """Find an Interface with interface_id.
502
503
        Args:
504
            interface_id(str): Interface Identifier.
505
506
        Returns:
507
            Interface: Instance of Interface with the id given.
508
509
        """
510
        if interface_id is None:
511
            return None
512
513
        switch_id = ":".join(interface_id.split(":")[:-1])
514
        interface_number = int(interface_id.split(":")[-1])
515
516
        switch = self.switches.get(switch_id)
517
518
        if not switch:
519
            return None
520
521
        return switch.interfaces.get(interface_number, None)
522
523 1
    def get_switch_by_dpid(self, dpid):
524
        """Return a specific switch by dpid.
525
526
        Args:
527
            dpid (|DPID|): dpid object used to identify a switch.
528
529
        Returns:
530
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
531
532
        """
533 1
        return self.switches.get(dpid)
534
535 1
    def get_switch_or_create(self, dpid, connection):
536
        """Return switch or create it if necessary.
537
538
        Args:
539
            dpid (|DPID|): dpid object used to identify a switch.
540
            connection (:class:`~kytos.core.connection.Connection`):
541
                connection used by the switch. If the switch already has a
                connection, it will be updated.
543
544
        Returns:
545
            :class:`~kytos.core.switch.Switch`: new or existing switch.
546
547
        """
548 1
        self.create_or_update_connection(connection)
549 1
        switch = self.get_switch_by_dpid(dpid)
550 1
        event_name = 'kytos/core.switch.'
551
552 1
        if switch is None:
553
            switch = Switch(dpid=dpid)
554
            self.add_new_switch(switch)
555
            event_name += 'new'
556
        else:
557 1
            event_name += 'reconnected'
558
559 1
        self.set_switch_options(dpid=dpid)
560 1
        event = KytosEvent(name=event_name, content={'switch': switch})
561
562 1
        old_connection = switch.connection
563 1
        switch.update_connection(connection)
564
565 1
        if old_connection is not connection:
566
            self.remove_connection(old_connection)
567
568 1
        self.buffers.app.put(event)
569
570 1
        return switch
571
572 1
    def set_switch_options(self, dpid):
573
        """Update the switch settings based on kytos.conf options.
574
575
        Args:
576
            dpid (str): dpid used to identify a switch.
577
578
        """
579 1
        switch = self.switches.get(dpid)
580 1
        if not switch:
581
            return
582
583 1
        vlan_pool = {}
584 1
        try:
585 1
            vlan_pool = json.loads(self.options.vlan_pool)
586 1
            if not vlan_pool:
587
                return
588
        except (TypeError, json.JSONDecodeError) as err:
589
            self.log.error("Invalid vlan_pool settings: %s", err)
590
591 1
        if vlan_pool.get(dpid):
592 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
593 1
            for intf_num, port_list in vlan_pool[dpid].items():
                # JSON object keys are strings, but switch.interfaces is
                # looked up by integer port number elsewhere in this class
                # (see get_interface_by_id), so convert before the lookup.
                intf_num = int(intf_num)
                if not switch.interfaces.get(intf_num):
                    vlan_ids = set()
                    for vlan_range in port_list:
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
                        # Reported issue (Comprehensibility, Best Practice):
                        # "The variable range does not seem to be defined."
                        for vlan_id in range(vlan_begin, vlan_end):
                            vlan_ids.add(vlan_id)
                    intf = Interface(name=intf_num, port_number=intf_num,
                                     switch=switch)
                    intf.set_available_tags(vlan_ids)
                    switch.update_interface(intf)
605
606 1
    def create_or_update_connection(self, connection):
607
        """Update a connection.
608
609
        Args:
610
            connection (:class:`~kytos.core.connection.Connection`):
611
                Instance of connection that will be updated.
612
        """
613 1
        self.connections[connection.id] = connection
614
615 1
    def get_connection_by_id(self, conn_id):
616
        """Return an existing connection by id.

        Args:
            conn_id (int): id from a connection.
620
621
        Returns:
622
            :class:`~kytos.core.connection.Connection`: Instance of connection
623
                or None Type.
624
625
        """
626
        return self.connections.get(conn_id)
627
628 1
    def remove_connection(self, connection):
629
        """Close an existing connection and remove it.
630
631
        Args:
632
            connection (:class:`~kytos.core.connection.Connection`):
633
                Instance of connection that will be removed.
634
        """
635
        if connection is None:
636
            return False
637
638
        try:
639
            connection.close()
640
            del self.connections[connection.id]
641
        except KeyError:
642
            return False
643
        return True
644
645 1
    def remove_switch(self, switch):
646
        """Remove an existing switch.
647
648
        Args:
649
            switch (:class:`~kytos.core.switch.Switch`):
650
                Instance of switch that will be removed.
651
        """
652
        try:
653
            del self.switches[switch.dpid]
654
        except KeyError:
655
            return False
656
        return True
657
658 1
    def new_connection(self, event):
659
        """Handle a kytos/core.connection.new event.
660
661
        This method will read new connection event and store the connection
662
        (socket) into the connections attribute on the controller.
663
664
        It also clears all references to the connection since it is a new
665
        connection on the same ip:port.
666
667
        Args:
668
            event (~kytos.core.KytosEvent):
669
                The received event (``kytos/core.connection.new``) with the
670
                needed info.
671
        """
672
        self.log.info("Handling %s...", event)
673
674
        connection = event.source
675
        self.log.debug("Event source: %s", event.source)
676
677
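        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() expects a Connection object, not its id.
        old_connection = self.get_connection_by_id(connection.id)
        if old_connection and old_connection is not connection:
            self.remove_connection(old_connection)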
680
681
        # Update connections with the new connection
682
        self.create_or_update_connection(connection)
683
684 1
    def add_new_switch(self, switch):
685
        """Add a new switch on the controller.
686
687
        Args:
688
            switch (Switch): A Switch object
689
        """
690
        self.switches[switch.dpid] = switch
691
692 1
    def _import_napp(self, username, napp_name):
693
        """Import a NApp module.
694
695
        Raises:
696
            FileNotFoundError: if NApp's main.py is not found.
697
            ModuleNotFoundError: if any NApp requirement is not installed.
698
699
        """
700
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
701
        path = os.path.join(self.options.napps, username, napp_name,
702
                            'main.py')
703
        napp_spec = spec_from_file_location(mod_name, path)
704
        napp_module = module_from_spec(napp_spec)
705
        sys.modules[napp_spec.name] = napp_module
706
        napp_spec.loader.exec_module(napp_module)
707
        return napp_module
708
709 1
    def load_napp(self, username, napp_name):
710
        """Load a single NApp.
711
712
        Args:
713
            username (str): NApp username (makes up NApp's path).
714
            napp_name (str): Name of the NApp to be loaded.
715
        """
716
        if (username, napp_name) in self.napps:
717
            message = 'NApp %s/%s was already loaded'
718
            self.log.warning(message, username, napp_name)
719
            return
720
721
        try:
722
            napp_module = self._import_napp(username, napp_name)
723
        except ModuleNotFoundError as err:
724
            self.log.error("Error loading NApp '%s/%s': %s",
725
                           username, napp_name, err)
726
            return
727
        except FileNotFoundError as err:
            # Reported issue (Comprehensibility, Best Practice):
            # "The variable FileNotFoundError does not seem to be defined."
728
            msg = "NApp module not found, assuming it's a meta napp: %s"
729
            self.log.warning(msg, err.filename)
730
            return
731
732
        napp = napp_module.Main(controller=self)
733
734
        self.napps[(username, napp_name)] = napp
735
736
        napp.start()
737
        self.api_server.register_napp_endpoints(napp)
738
739
        # pylint: disable=protected-access
740
        for event, listeners in napp._listeners.items():
741
            self.events_listeners.setdefault(event, []).extend(listeners)
742
        # pylint: enable=protected-access
743
    def pre_install_napps(self, napps, enable=True):
        """Pre-install NApps and, by default, enable them.

        NApps that are already installed are skipped.

        Args:
            napps ([str]): List of NApps to be pre-installed.
            enable (bool): Passed on to the NApps manager's install();
                defaults to True.
        """
        all_napps = self.napps_manager.get_installed_napps()
        installed = [str(napp) for napp in all_napps]
        napps_diff = [napp for napp in napps if napp not in installed]
        for napp in napps_diff:
            self.napps_manager.install(napp, enable=enable)

    def load_napps(self):
        """Load all NApps that are enabled in the NApps directory."""
        for napp in self.napps_manager.get_enabled_napps():
            try:
                self.log.info("Loading NApp %s", napp.id)
                self.load_napp(napp.username, napp.name)
            # Scrutinizer flags the next line (Best Practice,
            # Comprehensibility): "The variable FileNotFoundError does not
            # seem to be defined." It is a built-in exception in Python 3.
            except FileNotFoundError as exception:
                self.log.error("Could not load NApp %s: %s",
                               napp.id, exception)

    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)

        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
        else:
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
            napp_id = NApp(username, napp_name).id
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
            napp_shutdown_fn = self.events_listeners[event.name][0]
            # Call the shutdown listener before removing it from
            # events_listeners.
            napp_shutdown_fn(event)

            # Remove the REST endpoints registered by that NApp.
            self.api_server.remove_napp_endpoints(napp)

            # Remove the listeners registered by that NApp.
            # pylint: disable=protected-access
            for event_type, napp_listeners in napp._listeners.items():
                event_listeners = self.events_listeners[event_type]
                for listener in napp_listeners:
                    event_listeners.remove(listener)
                if not event_listeners:
                    del self.events_listeners[event_type]
            # pylint: enable=protected-access

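The listener bookkeeping shared by `load_napp` and `unload_napp` can be illustrated in isolation. This is a simplified sketch under stated assumptions: the registry is a plain dict mapping event names to lists of callables, and `register`, `unregister`, and the `demo/...` event names are hypothetical, not part of the controller's API.

```python
def register(registry, listeners_by_event):
    """Append each listener to the list stored under its event name."""
    for event, listeners in listeners_by_event.items():
        registry.setdefault(event, []).extend(listeners)


def unregister(registry, listeners_by_event):
    """Remove the given listeners and drop event names left empty."""
    for event, listeners in listeners_by_event.items():
        registered = registry[event]
        for listener in listeners:
            registered.remove(listener)
        if not registered:
            del registry[event]


def listener_a(event):
    return 'a handled ' + event


def listener_b(event):
    return 'b handled ' + event


registry = {}
napp_a_listeners = {'demo/shutdown': [listener_a], 'demo/a.only': [listener_a]}
napp_b_listeners = {'demo/shutdown': [listener_b]}

register(registry, napp_a_listeners)
register(registry, napp_b_listeners)
events_after_load = sorted(registry)

unregister(registry, napp_a_listeners)
```

After the last call only `listener_b` remains under `demo/shutdown`, and the `demo/a.only` key is gone because its list became empty.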
    def unload_napps(self):
        """Unload all loaded NApps that are not core NApps."""
        # list() is used here to avoid the error:
        # 'RuntimeError: dictionary changed size during iteration'
        # This is caused by looping over a dictionary while removing
        # items from it.
        for (username, napp_name) in list(self.napps.keys()):  # noqa
            self.unload_napp(username, napp_name)

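The reason for the `list()` snapshot in `unload_napps` can be reproduced with a plain dict. This is a small sketch with hypothetical names (`loaded`, `unload_all_unsafe`, `unload_all`) that only mirrors the shape of `self.napps`.

```python
loaded = {('user', 'napp_a'): 'A', ('user', 'napp_b'): 'B'}


def unload_all_unsafe(napps):
    """Pop entries while iterating over the dict itself."""
    for key in napps:
        napps.pop(key)


def unload_all(napps):
    """Pop entries while iterating over a snapshot of the keys."""
    for key in list(napps):
        napps.pop(key)


try:
    unload_all_unsafe(dict(loaded))
    unsafe_error = None
except RuntimeError as err:
    # Raised as soon as the loop asks for the next key after a pop.
    unsafe_error = err

remaining = dict(loaded)
unload_all(remaining)
```

The snapshot costs one extra list of keys, which is negligible for the number of NApps a controller typically loads.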
    def reload_napp_module(self, username, napp_name, napp_file):
        """Reload a NApp module.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp.
            napp_file (str): Module name inside the NApp, without the
                '.py' extension (e.g. 'main' or 'settings').

        Raises:
            ModuleNotFoundError: if the module cannot be found.
            ImportError: if the module fails to reload.
        """
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
        try:
            napp_module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise
        try:
            reload_module(napp_module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise

    def reload_napp(self, username, napp_name):
        """Reload a NApp.

        Returns:
            int: 200 if the NApp was reloaded, 400 if one of its modules
                could not be reloaded. In the latter case the NApp stays
                unloaded.
        """
        self.unload_napp(username, napp_name)
        try:
            self.reload_napp_module(username, napp_name, 'settings')
            self.reload_napp_module(username, napp_name, 'main')
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError, so this
            # clause covers both.
            return 400
        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200

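The import-then-reload sequence in `reload_napp_module` can be sketched with the standard library alone. This assumes that `import_module` and `reload_module` behave like `importlib.import_module` and `importlib.reload`, which the listing does not confirm. The module name `demo_reload_settings` and the temporary directory are hypothetical.

```python
import importlib
import os
import sys
import tempfile

# ModuleNotFoundError derives from ImportError, so a handler for
# ImportError also covers a missing module.
missing_is_import_error = issubclass(ModuleNotFoundError, ImportError)

with tempfile.TemporaryDirectory() as tmp_dir:
    module_path = os.path.join(tmp_dir, 'demo_reload_settings.py')
    with open(module_path, 'w') as module_file:
        module_file.write('VALUE = 1\n')

    sys.path.insert(0, tmp_dir)
    importlib.invalidate_caches()  # the file was created after startup
    try:
        settings = importlib.import_module('demo_reload_settings')
        value_before = settings.VALUE

        # Rewrite the source with a different size so that cached
        # bytecode is treated as stale, then re-execute it in place.
        with open(module_path, 'w') as module_file:
            module_file.write('VALUE = 22\n')
        settings = importlib.reload(settings)
        value_after = settings.VALUE
    finally:
        # Undo the changes made to the interpreter state by this sketch.
        sys.path.remove(tmp_dir)
        sys.modules.pop('demo_reload_settings', None)
```

Reloading re-executes the module in its existing namespace, so objects created from the old code (for example, a running `Main` instance) are not updated. That would be consistent with `reload_napp` unloading the NApp first and loading it again afterwards.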
    def rest_reload_napp(self, username, napp_name):
        """Handle a REST request to reload a NApp."""
        res = self.reload_napp(username, napp_name)
        return 'reloaded', res

    def rest_reload_all_napps(self):
        """Handle a REST request to reload all NApps."""
        # reload_napp() pops and re-inserts entries in self.napps, so
        # iterate over a snapshot of the keys.
        for napp in list(self.napps):
            self.reload_napp(*napp)
        return 'reloaded', 200