Passed
Push — master ( 2d6805...e1e3a4 )
by Humberto
02:14 queued 13s
created

kytos.core.controller   F

Complexity

Total Complexity 96

Size/Duplication

Total Lines 843
Duplicated Lines 0 %

Test Coverage

Coverage 36.73%

Importance

Changes 0
Metric Value
eloc 404
dl 0
loc 843
ccs 137
cts 373
cp 0.3673
rs 2
c 0
b 0
f 0
wmc 96

39 Methods

Rating   Name   Duplication   Size   Complexity  
A Controller.start() 0 6 2
B Controller.create_pidfile() 0 47 5
A Controller.start_controller() 0 59 1
A Controller.enable_logs() 0 5 1
A Controller.__init__() 0 67 2
A Controller.register_rest_endpoint() 0 3 1
A Controller._register_endpoints() 0 18 1
A Controller.rest_reload_napp() 0 4 1
B Controller.notify_listeners() 0 22 6
A Controller.restart() 0 11 2
A Controller.create_or_update_connection() 0 8 1
A Controller.get_switch_by_dpid() 0 11 1
A Controller.add_new_switch() 0 7 1
A Controller.load_napps() 0 9 3
A Controller._import_napp() 0 16 1
A Controller.get_connection_by_id() 0 12 1
A Controller.unload_napps() 0 8 2
A Controller.get_switch_or_create() 0 36 3
A Controller.raw_event_handler() 0 15 3
A Controller.configuration_endpoint() 0 8 1
A Controller.msg_in_event_handler() 0 15 3
A Controller.get_interface_by_id() 0 22 3
A Controller.uptime() 0 12 2
A Controller.rest_reload_all_napps() 0 5 2
A Controller.new_connection() 0 25 2
C Controller.set_switch_options() 0 33 9
A Controller.metadata_endpoint() 0 12 1
A Controller.remove_switch() 0 12 2
B Controller.msg_out_event_handler() 0 31 5
A Controller.stop_controller() 0 47 1
A Controller.reload_napp_module() 0 14 3
A Controller.remove_connection() 0 16 3
A Controller.stop() 0 10 2
A Controller.pre_install_napps() 0 13 2
A Controller.status() 0 13 2
A Controller.app_event_handler() 0 15 3
A Controller.reload_napp() 0 12 2
B Controller.load_napp() 0 33 5
A Controller.unload_napp() 0 30 5

How to fix: Complexity

Complex classes like kytos.core.controller often do many different things. To break such a class down, first identify a cohesive component within it. A common way to find one is to look for fields and methods that share the same prefixes or suffixes.
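As a rough illustration of the prefix/suffix heuristic, the sketch below groups a subset of the method names from the table above by the name tokens they share. The helper `shared_tokens`, its `min_methods` threshold, and the crude plural handling are assumptions made for this example; they are not part of any analysis tool.

```python
from collections import defaultdict

# Subset of the Controller method names listed in the methods table.
METHODS = [
    "create_or_update_connection", "get_connection_by_id",
    "remove_connection", "new_connection",
    "raw_event_handler", "msg_in_event_handler",
    "msg_out_event_handler", "app_event_handler",
    "load_napp", "unload_napp", "reload_napp",
    "load_napps", "unload_napps", "pre_install_napps",
    "get_switch_by_dpid", "add_new_switch", "remove_switch",
]


def shared_tokens(method_names, min_methods=3):
    """Map each name token to the methods containing it (hypothetical helper).

    Only tokens shared by at least ``min_methods`` methods are kept.
    """
    groups = defaultdict(list)
    for name in method_names:
        # Split snake_case names; drop trailing "s" so "napps" joins "napp".
        tokens = {tok.rstrip("s") for tok in name.strip("_").split("_")}
        tokens.discard("")
        for tok in tokens:
            groups[tok].append(name)
    return {tok: names for tok, names in groups.items()
            if len(names) >= min_methods}


for token, names in sorted(shared_tokens(METHODS).items()):
    print(token, "->", ", ".join(names))
```

Each resulting group (for example the methods sharing `connection` or `napp`) is a candidate for a component of its own; whether it is actually cohesive still needs a human look at the fields those methods touch.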

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
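A minimal sketch of what Extract Class could look like for the connection-related methods in this listing. `ConnectionRegistry`, `SlimController`, and `FakeConnection` are hypothetical names invented for this example; the real Controller has many more responsibilities, and this is not how the project is actually structured.

```python
class ConnectionRegistry:
    """Hypothetical extracted class: owns the connections dictionary."""

    def __init__(self):
        self._connections = {}

    def create_or_update(self, connection):
        """Store the connection under its id, replacing any previous one."""
        self._connections[connection.id] = connection

    def get_by_id(self, conn_id):
        """Return the stored connection, or None if the id is unknown."""
        return self._connections.get(conn_id)

    def remove(self, connection):
        """Close the connection and forget it; return True if it was stored."""
        if connection is None:
            return False
        connection.close()
        return self._connections.pop(connection.id, None) is not None


class SlimController:
    """Stand-in for Controller that delegates connection bookkeeping."""

    def __init__(self):
        self.connections = ConnectionRegistry()

    def create_or_update_connection(self, connection):
        self.connections.create_or_update(connection)

    def get_connection_by_id(self, conn_id):
        return self.connections.get_by_id(conn_id)

    def remove_connection(self, connection):
        return self.connections.remove(connection)


class FakeConnection:
    """Test double with the two members the registry relies on."""

    def __init__(self, conn_id):
        self.id = conn_id
        self.closed = False

    def close(self):
        self.closed = True


controller = SlimController()
conn = FakeConnection(("10.0.0.1", 6653))
controller.create_or_update_connection(conn)
print(controller.get_connection_by_id(conn.id) is conn)
```

Keeping the thin delegating methods on the controller preserves its public interface while the complexity and state move into the extracted class, which can then be tested on its own.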

"""Kytos SDN Platform main class.

This module contains the main class of Kytos, which is
:class:`~.core.Controller`.

Basic usage:

.. code-block:: python3

    from kytos.core.config import KytosConfig
    from kytos.core import Controller
    config = KytosConfig()
    controller = Controller(config.options['daemon'])
    controller.start()
"""
import asyncio
import atexit
import json
import logging
import os
import re
import sys
import threading
from concurrent.futures import ThreadPoolExecutor
from importlib import import_module
from importlib import reload as reload_module
from importlib.util import module_from_spec, spec_from_file_location
from pathlib import Path

from kytos.core.api_server import APIServer
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
from kytos.core.buffers import KytosBuffers
from kytos.core.config import KytosConfig
from kytos.core.connection import ConnectionState
from kytos.core.events import KytosEvent
from kytos.core.helpers import now
from kytos.core.interface import Interface
from kytos.core.logs import LogManager
from kytos.core.napps.base import NApp
from kytos.core.napps.manager import NAppsManager
from kytos.core.napps.napp_dir_listener import NAppDirListener
from kytos.core.switch import Switch

__all__ = ('Controller',)


class Controller:
    """Main class of Kytos.

    The main responsibilities of this class are:
        - start the :class:`~.core.atcp_server.KytosServer` TCP server;
        - manage KytosNApps (install, load and unload);
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
        - manage which event should be sent to NApps methods;
        - manage the buffers handlers, considering one asyncio task per
          handler.
    """

    # Created issue #568 for the disabled checks.
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
    def __init__(self, options=None, loop=None):
        """Init method of Controller class takes the parameters below.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
            loop: asyncio event loop to use. Defaults to the loop returned by
                :func:`asyncio.get_event_loop`.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self._loop = loop or asyncio.get_event_loop()
        self._pool = ThreadPoolExecutor(max_workers=1)

        #: dict: keep the main threads of the controller (buffers and handler)
        self._threads = {}
        #: KytosBuffers: KytosBuffer object with Controller buffers
        self.buffers = KytosBuffers(loop=self._loop)
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
        #:
        #: This dict stores all connections between the controller and the
        #: switches. The key for this dict is a tuple (ip, port). The content
        #: is another dict with the connection information.
        self.connections = {}
        #: dict: mapping of events and event listeners.
        #:
        #: The key of the dict is a KytosEvent (or a string that represents a
        #: regex to match against KytosEvents) and the value is a list of
        #: methods that will receive the referenced event
        self.events_listeners = {'kytos/core.connection.new':
                                 [self.new_connection]}

        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
        #:
        #: The key is the napp name (string), while the value is the napp
        #: instance itself.
        self.napps = {}
        #: Object generated by ParseArgs on config.py file
        self.options = options
        #: KytosServer: Instance of KytosServer that will be listening to TCP
        #: connections.
        self.server = None
        #: dict: Current existing switches.
        #:
        #: The key is the switch dpid, while the value is a Switch object.
        self.switches = {}  # dpid: Switch()

        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: Logger instance used by Kytos.
        self.log = None

        #: Observer that handles NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(__name__, self.options.listen,
                                    self.options.api_port,
                                    self.napps_manager, self.options.napps)

        self._register_endpoints()
        #: Adding the napps 'enabled' directory into the PATH
        #: Now you can access the enabled napps with:
        #: from napps.<username>.<napp_name> import ?....
        sys.path.append(os.path.join(self.options.napps, os.pardir))

    def enable_logs(self):
        """Register kytos log and enable the logs."""
        LogManager.load_config_file(self.options.logging, self.options.debug)
        LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)

    def start(self, restart=False):
        """Create pidfile and call start_controller method."""
        self.enable_logs()
        if not restart:
            self.create_pidfile()
        self.start_controller()

    def create_pidfile(self):
        """Create a pidfile."""
        pid = os.getpid()

        # Creates directory if it doesn't exist
        # System can erase /var/run's content
        pid_folder = Path(self.options.pidfile).parent
        self.log.info(pid_folder)

        # Pylint incorrectly infers Path objects
        # https://github.com/PyCQA/pylint/issues/224
        # pylint: disable=no-member
        if not pid_folder.exists():
            pid_folder.mkdir()
            pid_folder.chmod(0o1777)
        # pylint: enable=no-member

        # Checks if a pidfile exists. Creates a new file.
        try:
            pidfile = open(self.options.pidfile, mode='x')
        except OSError:
            # This happens if there is a pidfile already.
            # We shall check if the process that created the pidfile is still
            # running.
            try:
                with open(self.options.pidfile, mode='r') as existing_file:
                    old_pid = int(existing_file.read())
                os.kill(old_pid, 0)
                # If kill() doesn't return an error, there's a process running
                # with the same PID. We assume it is Kytos and quit.
                # Otherwise, overwrite the file and proceed.
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
                             "running. Aborting.")
                sys.exit(error_msg.format(self.options.pidfile))
            except OSError:
                try:
                    pidfile = open(self.options.pidfile, mode='w')
                except OSError as exception:
                    error_msg = "Failed to create pidfile {}: {}."
                    sys.exit(error_msg.format(self.options.pidfile, exception))

        # Identifies the process that created the pidfile.
        pidfile.write(str(pid))
        pidfile.close()

        # Make sure the file is deleted when controller stops. This is
        # registered only once this process owns the pidfile, so aborting
        # because another instance is running does not delete that
        # instance's pidfile.
        atexit.register(Path(self.options.pidfile).unlink)

    def start_controller(self):
        """Start the controller.

        Starts the KytosServer (TCP Server) coroutine.
        Schedules an asyncio task for each buffer handler.
        Loads the installed apps.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        self.server = KytosServer((self.options.listen,
                                   int(self.options.port)),
                                  KytosServerProtocol,
                                  self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            loop = asyncio.get_event_loop()
            # print(_.result())
            threads = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(threads), threads)
            loop.stop()

        async def _run_api_server_thread(executor):
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            # log.debug('creating tasks')
            loop = asyncio.get_event_loop()
            blocking_tasks = [
                loop.run_in_executor(executor, self.api_server.run)
            ]
            # log.debug('waiting for tasks')
            completed, pending = await asyncio.wait(blocking_tasks)
            # results = [t.result() for t in completed]
            # log.debug('results: {!r}'.format(results))
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        task = self._loop.create_task(self.raw_event_handler())
        task = self._loop.create_task(self.msg_in_event_handler())
        task = self._loop.create_task(self.msg_out_event_handler())
        task = self._loop.create_task(self.app_event_handler())
        task = self._loop.create_task(_run_api_server_thread(self._pool))
        # Only the API server task stops the loop when it finishes.
        task.add_done_callback(_stop_loop)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.
        # sys.exit(error_msg.format(thread, exception))

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()

    def _register_endpoints(self):
        """Register all REST endpoints served by Kytos.

        -   Register APIServer endpoints
        -   Register WebUI endpoints
        -   Register ``/api/kytos/core/config`` endpoint
        """
        self.api_server.start_api()
        # Register controller endpoints as /api/kytos/core/...
        self.api_server.register_core_endpoint('config/',
                                               self.configuration_endpoint)
        self.api_server.register_core_endpoint('metadata/',
                                               Controller.metadata_endpoint)
        self.api_server.register_core_endpoint(
            'reload/<username>/<napp_name>/',
            self.rest_reload_napp)
        self.api_server.register_core_endpoint('reload/all',
                                               self.rest_reload_all_napps)

    def register_rest_endpoint(self, url, function, methods):
        """Deprecated in favor of the @rest decorator."""
        self.api_server.register_rest_endpoint(url, function, methods)

    @classmethod
    def metadata_endpoint(cls):
        """Return the Kytos metadata.

        Returns:
            string: Json with current kytos metadata.

        """
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
        # Use a context manager so the file handle is always closed.
        with open(meta_path) as meta_file:
            meta_content = meta_file.read()
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'",
                                   meta_content))
        return json.dumps(metadata)

    def configuration_endpoint(self):
        """Return the configuration options used by Kytos.

        Returns:
            string: Json with current configurations used by kytos.

        """
        return json.dumps(self.options.__dict__)

    def restart(self, graceful=True):
        """Restart Kytos SDN Controller.

        Args:
            graceful(bool): Represents the way that Kytos will restart.
        """
        if self.started_at is not None:
            self.stop(graceful)
            self.__init__(self.options)

        self.start(restart=True)

    def stop(self, graceful=True):
        """Shutdown all services used by kytos.

        This method should:
            - stop all Websockets
            - stop the API Server
            - stop the Controller
        """
        if self.started_at:
            self.stop_controller(graceful)

    def stop_controller(self, graceful=True):
        """Stop the controller.

        This method should:
            - announce on the network that the controller will shutdown;
            - stop receiving incoming packets;
            - call the 'shutdown' method of each KytosNApp that is running;
            - finish reading the events on all buffers;
            - stop each running handler;
            - stop all running threads;
            - stop the KytosServer;
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.api_server.stop_api_server()
        self.napp_dir_listener.stop()

        self.log.info("Stopping threadpool: %s", self._pool)

        threads = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(threads), threads)

        self._pool.shutdown(wait=graceful)

        # self.server.socket.shutdown()
        # self.server.socket.close()

        # for thread in self._threads.values():
        #     self.log.info("Stopping thread: %s", thread.name)
        #     thread.join()

        # for thread in self._threads.values():
        #     while thread.is_alive():
        #         self.log.info("Thread is alive: %s", thread.name)
        #         pass

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # ASYNC TODO: close connections
        # self.server.server_close()

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()

    def status(self):
        """Return status of Kytos Server.

        If the controller is running, this method returns
        "Running since <started_at>"; otherwise it returns "Stopped".

        Returns:
            string: String with kytos status.

        """
        if self.started_at:
            return "Running since %s" % self.started_at
        return "Stopped"

    def uptime(self):
        """Return the uptime of kytos server.

        This method should return:
            - 0 if Kytos Server is stopped.
            - (now - started_at) if Kytos Server is running.

        Returns:
           datetime.timedelta: The uptime interval (or 0 when stopped).

        """
        return now() - self.started_at if self.started_at else 0

    def notify_listeners(self, event):
        """Send the event to the specified listeners.

        Loops over self.events_listeners matching (by regexp) the attribute
        name of the event with the keys of events_listeners. If a match occurs,
        then send the event to each registered listener.

        Args:
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
        """
        self.log.debug("looking for listeners for %s", event)
        for event_regex, listeners in dict(self.events_listeners).items():
            # self.log.debug("listeners found for %s: %r => %s", event,
            #                event_regex, [l.__qualname__ for l in listeners])
            # Do not match if the event has more characters
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
                event_regex += '$'
            if re.match(event_regex, event.name):
                # self.log.debug('Calling listeners for %s', event)
                for listener in listeners:
                    listener(event)

    async def raw_event_handler(self):
        """Handle raw events.

        Listen to the raw_buffer and send all its events to the
        corresponding listeners.
        """
        self.log.info("Raw Event Handler started")
        while True:
            event = await self.buffers.raw.aget()
            self.notify_listeners(event)
            self.log.debug("Raw Event handler called")

            if event.name == "kytos/core.shutdown":
                self.log.debug("Raw Event handler stopped")
                break

    async def msg_in_event_handler(self):
        """Handle msg_in events.

        Listen to the msg_in buffer and send all its events to the
        corresponding listeners.
        """
        self.log.info("Message In Event Handler started")
        while True:
            event = await self.buffers.msg_in.aget()
            self.notify_listeners(event)
            self.log.debug("Message In Event handler called")

            if event.name == "kytos/core.shutdown":
                self.log.debug("Message In Event handler stopped")
                break

    async def msg_out_event_handler(self):
        """Handle msg_out events.

        Listen to the msg_out buffer and send all its events to the
        corresponding listeners.
        """
        self.log.info("Message Out Event Handler started")
        while True:
            triggered_event = await self.buffers.msg_out.aget()

            if triggered_event.name == "kytos/core.shutdown":
                self.log.debug("Message Out Event handler stopped")
                break

            message = triggered_event.content['message']
            destination = triggered_event.destination
            if (destination and
                    not destination.state == ConnectionState.FINISHED):
                packet = message.pack()
                destination.send(packet)
                self.log.debug('Connection %s: OUT OFP, '
                               'version: %s, type: %s, xid: %s - %s',
                               destination.id,
                               message.header.version,
                               message.header.message_type,
                               message.header.xid,
                               packet.hex())
                self.notify_listeners(triggered_event)
                self.log.debug("Message Out Event handler called")
            else:
                self.log.info("connection closed. Cannot send message")

    async def app_event_handler(self):
        """Handle app events.

        Listen to the app buffer and send all its events to the
        corresponding listeners.
        """
        self.log.info("App Event Handler started")
        while True:
            event = await self.buffers.app.aget()
            self.notify_listeners(event)
            self.log.debug("App Event handler called")

            if event.name == "kytos/core.shutdown":
                self.log.debug("App Event handler stopped")
                break

    def get_interface_by_id(self, interface_id):
        """Find an Interface by interface_id.

        Args:
            interface_id(str): Interface Identifier.

        Returns:
            Interface: Instance of Interface with the id given.

        """
        if interface_id is None:
            return None

        switch_id = ":".join(interface_id.split(":")[:-1])
        interface_number = int(interface_id.split(":")[-1])

        switch = self.switches.get(switch_id)

        if not switch:
            return None

        return switch.interfaces.get(interface_number, None)

    def get_switch_by_dpid(self, dpid):
        """Return a specific switch by dpid.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.

        Returns:
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.

        """
        return self.switches.get(dpid)

    def get_switch_or_create(self, dpid, connection):
        """Return switch or create it if necessary.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by the switch. If the switch already has a
                connection, it will be updated.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existing switch.

        """
        self.create_or_update_connection(connection)
        switch = self.get_switch_by_dpid(dpid)
        event_name = 'kytos/core.switch.'

        if switch is None:
            switch = Switch(dpid=dpid)
            self.add_new_switch(switch)
            event_name += 'new'
        else:
            event_name += 'reconnected'

        self.set_switch_options(dpid=dpid)
        event = KytosEvent(name=event_name, content={'switch': switch})

        old_connection = switch.connection
        switch.update_connection(connection)

        if old_connection is not connection:
            self.remove_connection(old_connection)

        self.buffers.app.put(event)

        return switch

    def set_switch_options(self, dpid):
        """Update the switch settings based on kytos.conf options.

        Args:
            dpid (str): dpid used to identify a switch.

        """
        switch = self.switches.get(dpid)
        if not switch:
            return

        vlan_pool = {}
        try:
            vlan_pool = json.loads(self.options.vlan_pool)
            if not vlan_pool:
                return
        except (TypeError, json.JSONDecodeError) as err:
            self.log.error("Invalid vlan_pool settings: %s", err)

        if vlan_pool.get(dpid):
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
            for intf_num, port_list in vlan_pool[dpid].items():
                # JSON object keys are strings; convert before the lookup,
                # since get_interface_by_id() looks interfaces up by integer.
                intf_num = int(intf_num)
                if not switch.interfaces.get(intf_num):
                    vlan_ids = set()
                    for vlan_range in port_list:
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
                        # Reported issue (Comprehensibility, Best Practice):
                        # "The variable range does not seem to be defined."
                        # Note: range is a Python builtin.
                        for vlan_id in range(vlan_begin, vlan_end):
                            vlan_ids.add(vlan_id)
                    intf = Interface(name=intf_num, port_number=intf_num,
                                     switch=switch)
                    intf.set_available_tags(vlan_ids)
                    switch.update_interface(intf)

    def create_or_update_connection(self, connection):
        """Update a connection.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be updated.
        """
        self.connections[connection.id] = connection

    def get_connection_by_id(self, conn_id):
        """Return an existing connection by id.

        Args:
            conn_id (int): id from a connection.

        Returns:
            :class:`~kytos.core.connection.Connection`: Instance of connection
                or None Type.

        """
        return self.connections.get(conn_id)

    def remove_connection(self, connection):
        """Close an existing connection and remove it.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be removed.
        """
        if connection is None:
            return False

        try:
            connection.close()
            del self.connections[connection.id]
        except KeyError:
            return False
        return True

    def remove_switch(self, switch):
        """Remove an existing switch.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Instance of switch that will be removed.
        """
        try:
            del self.switches[switch.dpid]
        except KeyError:
            return False
        return True

    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method will read new connection event and store the connection
        (socket) into the connections attribute on the controller.

        It also clears all references to the connection since it is a new
        connection on the same ip:port.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() expects a connection object, not its id.
        old_connection = self.get_connection_by_id(connection.id)
        if old_connection:
            self.remove_connection(old_connection)

        # Update connections with the new connection
        self.create_or_update_connection(connection)

    def add_new_switch(self, switch):
        """Add a new switch on the controller.

        Args:
            switch (Switch): A Switch object
        """
        self.switches[switch.dpid] = switch

688 1
    def _import_napp(self, username, napp_name):
689
        """Import a NApp module.
690
691
        Raises:
692
            FileNotFoundError: if NApp's main.py is not found.
693
            ModuleNotFoundError: if any NApp requirement is not installed.
694
695
        """
696
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
697
        path = os.path.join(self.options.napps, username, napp_name,
698
                            'main.py')
699
        napp_spec = spec_from_file_location(mod_name, path)
700
        napp_module = module_from_spec(napp_spec)
701
        sys.modules[napp_spec.name] = napp_module
702
        napp_spec.loader.exec_module(napp_module)
703
        return napp_module
704
    def load_napp(self, username, napp_name):
        """Load a single NApp.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        if (username, napp_name) in self.napps:
            message = 'NApp %s/%s was already loaded'
            self.log.warning(message, username, napp_name)
            return

        try:
            napp_module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return
        except FileNotFoundError as err:
            msg = "NApp module not found, assuming it's a meta napp: %s"
            self.log.warning(msg, err.filename)
            return

        napp = napp_module.Main(controller=self)

        self.napps[(username, napp_name)] = napp

        napp.start()
        self.api_server.register_napp_endpoints(napp)

        # pylint: disable=protected-access
        for event, listeners in napp._listeners.items():
            self.events_listeners.setdefault(event, []).extend(listeners)
        # pylint: enable=protected-access

    def pre_install_napps(self, napps, enable=True):
        """Pre-install and enable NApps.

        Only NApps that are not already installed are installed.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): Passed through to the NApps manager's install().
        """
        all_napps = self.napps_manager.get_installed_napps()
        installed = [str(napp) for napp in all_napps]
        napps_diff = [napp for napp in napps if napp not in installed]
        for napp in napps_diff:
            self.napps_manager.install(napp, enable=enable)

    def load_napps(self):
        """Load all NApps enabled on the NApps dir."""
        for napp in self.napps_manager.get_enabled_napps():
            try:
                self.log.info("Loading NApp %s", napp.id)
                self.load_napp(napp.username, napp.name)
            except FileNotFoundError as exception:
                self.log.error("Could not load NApp %s: %s",
                               napp.id, exception)

    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)

        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
        else:
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
            napp_id = NApp(username, napp_name).id
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
            napp_shutdown_fn = self.events_listeners[event.name][0]
            # Call listener before removing it from events_listeners
            napp_shutdown_fn(event)

            # Remove rest endpoints from that napp
            self.api_server.remove_napp_endpoints(napp)

            # Removing listeners from that napp
            # pylint: disable=protected-access
            for event_type, napp_listeners in napp._listeners.items():
                event_listeners = self.events_listeners[event_type]
                for listener in napp_listeners:
                    event_listeners.remove(listener)
                if not event_listeners:
                    del self.events_listeners[event_type]
            # pylint: enable=protected-access

    def unload_napps(self):
        """Unload all loaded NApps that are not core NApps."""
        # list() is used here to avoid the error:
        # 'RuntimeError: dictionary changed size during iteration'
        # This is caused by looping over a dictionary while removing
        # items from it.
        for (username, napp_name) in list(self.napps.keys()):  # noqa
            self.unload_napp(username, napp_name)

    def reload_napp_module(self, username, napp_name, napp_file):
        """Reload a NApp Module."""
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
        # Import first so that reload_module() has a module object to act on.
        try:
            napp_module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise
        # Re-execute the module's code; the return value is not needed here.
        try:
            reload_module(napp_module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise

    def reload_napp(self, username, napp_name):
        """Reload a NApp."""
        self.unload_napp(username, napp_name)
        try:
            self.reload_napp_module(username, napp_name, 'settings')
            self.reload_napp_module(username, napp_name, 'main')
        except (ModuleNotFoundError, ImportError):
            return 400
        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200

    def rest_reload_napp(self, username, napp_name):
        """Request reload a NApp."""
        res = self.reload_napp(username, napp_name)
        return 'reloaded', res

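    def rest_reload_all_napps(self):
        """Request reload all NApps."""
        # reload_napp() pops and re-adds entries in self.napps, so iterate
        # over a snapshot of the keys rather than the live dictionary.
        for napp in list(self.napps):
            self.reload_napp(*napp)
        return 'reloaded', 200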
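
The file-based import used by `_import_napp()` can be tried outside the controller. The following is a minimal standalone sketch, not the Kytos implementation: `import_from_path`, `demo` and the `napps.demo_user.demo_napp.main` module name are hypothetical names chosen for illustration. It also drops the `sys.modules` entry when execution fails, which is an assumption about desirable cleanup and not something the listing above does.

```python
import os
import sys
import tempfile
from importlib.util import module_from_spec, spec_from_file_location


def import_from_path(mod_name, path):
    """Load the module stored at ``path`` and register it as ``mod_name``."""
    spec = spec_from_file_location(mod_name, path)
    module = module_from_spec(spec)
    # Register before executing, mirroring the order used in the listing.
    sys.modules[spec.name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Assumption: do not leave a half-initialised module registered.
        sys.modules.pop(spec.name, None)
        raise
    return module


def demo():
    """Write a tiny stand-in for a NApp's main.py and import it by path."""
    with tempfile.TemporaryDirectory() as napps_dir:
        path = os.path.join(napps_dir, 'main.py')
        with open(path, 'w') as source:
            source.write('class Main:\n    name = "demo"\n')
        module = import_from_path('napps.demo_user.demo_napp.main', path)
        return module.Main.name
```

With a missing `main.py`, the failure surfaces from `exec_module()` as `FileNotFoundError`, which is the case `load_napp()` treats as a meta NApp.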