Passed
Push — master ( b7641a...50d42e )
by Vinicius
15:54 queued 08:37
created

kytos.core.controller.Controller.reload_napp()   A

Complexity

Conditions 2

Size

Total Lines 12
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 11
nop 3
dl 0
loc 12
rs 9.85
c 0
b 0
f 0
ccs 10
cts 10
cp 1
crap 2
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 2
import asyncio
17 2
import atexit
18 2
import json
19 2
import logging
20 2
import os
21 2
import re
22 2
import sys
23 2
import threading
24 2
from concurrent.futures import ThreadPoolExecutor
25 2
from importlib import import_module
26 2
from importlib import reload as reload_module
27 2
from importlib.util import module_from_spec, spec_from_file_location
28 2
from pathlib import Path
29 2
from socket import error as SocketError
30
31 2
from kytos.core.api_server import APIServer
32
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
33 2
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
34 2
from kytos.core.auth import Auth
35 2
from kytos.core.buffers import KytosBuffers
36 2
from kytos.core.config import KytosConfig
37 2
from kytos.core.connection import ConnectionState
38 2
from kytos.core.db import db_conn_wait
39 2
from kytos.core.dead_letter import DeadLetter
40 2
from kytos.core.events import KytosEvent
41 2
from kytos.core.exceptions import KytosDBInitException
42 2
from kytos.core.helpers import executor as executor_pool
43 2
from kytos.core.helpers import now
44 2
from kytos.core.interface import Interface
45 2
from kytos.core.logs import LogManager
46 2
from kytos.core.napps.base import NApp
47 2
from kytos.core.napps.manager import NAppsManager
48 2
from kytos.core.napps.napp_dir_listener import NAppDirListener
49 2
from kytos.core.switch import Switch
50
51 2
__all__ = ('Controller',)
52
53
54 2
def exc_handler(_, exc, __):
55
    """Log uncaught exceptions.
56
57
    Args:
58
        exc_type (ignored): exception type
59
        exc: exception instance
60
        tb (ignored): traceback
61
    """
62
    logging.basicConfig(filename='errlog.log',
63
                        format='%(asctime)s:%(pathname)'
64
                        's:%(levelname)s:%(message)s')
65
    logging.exception('Uncaught Exception: %s', exc)
66
67
68 2
class Controller:
69
    """Main class of Kytos.
70
71
    The main responsibilities of this class are:
72
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
73
        - manage KytosNApps (install, load and unload);
74
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
75
        - manage which event should be sent to NApps methods;
76
        - manage the buffers handlers, considering one thread per handler.
77
    """
78
79
    # Created issue #568 for the disabled checks.
80
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
81 2
    def __init__(self, options=None, loop=None):
82
        """Init method of Controller class takes the parameters below.
83
84
        Args:
85
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
86
                instance of :class:`~kytos.core.config.KytosConfig` class.
87
        """
88 2
        if options is None:
89
            options = KytosConfig().options['daemon']
90
91 2
        self._loop = loop or asyncio.get_event_loop()
92 2
        self._pool = ThreadPoolExecutor(max_workers=1)
93
94
        # asyncio tasks
95 2
        self._tasks = []
96
97
        #: dict: keep the main threads of the controller (buffers and handler)
98 2
        self._threads = {}
99
        #: KytosBuffers: KytosBuffer object with Controller buffers
100 2
        self.buffers = KytosBuffers(loop=self._loop)
101
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
102
        #:
103
        #: This dict stores all connections between the controller and the
104
        #: switches. The key for this dict is a tuple (ip, port). The content
105
        #: is a Connection
106 2
        self.connections = {}
107
        #: dict: mapping of events and event listeners.
108
        #:
109
        #: The key of the dict is a KytosEvent (or a string that represent a
110
        #: regex to match against KytosEvents) and the value is a list of
111
        #: methods that will receive the referenced event
112 2
        self.events_listeners = {'kytos/core.connection.new':
113
                                 [self.new_connection]}
114
115
        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
116
        #:
117
        #: The key is the napp name (string), while the value is the napp
118
        #: instance itself.
119 2
        self.napps = {}
120
        #: Object generated by ParseArgs on config.py file
121 2
        self.options = options
122
        #: KytosServer: Instance of KytosServer that will be listening to TCP
123
        #: connections.
124 2
        self.server = None
125
        #: dict: Current existing switches.
126
        #:
127
        #: The key is the switch dpid, while the value is a Switch object.
128 2
        self.switches = {}  # dpid: Switch()
129 2
        self._switches_lock = threading.Lock()
130
131
        #: datetime.datetime: Time when the controller finished starting.
132 2
        self.started_at = None
133
134
        #: logging.Logger: Logger instance used by Kytos.
135 2
        self.log = None
136
137
        #: Observer that handle NApps when they are enabled or disabled.
138 2
        self.napp_dir_listener = NAppDirListener(self)
139
140 2
        self.napps_manager = NAppsManager(self)
141
142
        #: API Server used to expose rest endpoints.
143 2
        self.api_server = APIServer(__name__, self.options.listen,
144
                                    self.options.api_port,
145
                                    self.napps_manager, self.options.napps)
146
147 2
        self.auth = Auth(self)
148 2
        self.dead_letter = DeadLetter(self)
149
150 2
        self._register_endpoints()
151
        #: Adding the napps 'enabled' directory into the PATH
152
        #: Now you can access the enabled napps with:
153
        #: from napps.<username>.<napp_name> import ?....
154 2
        sys.path.append(os.path.join(self.options.napps, os.pardir))
155 2
        sys.excepthook = exc_handler
156
157 2
    def enable_logs(self):
158
        """Register kytos log and enable the logs."""
159 2
        LogManager.load_config_file(self.options.logging, self.options.debug)
160 2
        LogManager.enable_websocket(self.api_server.server)
161 2
        self.log = logging.getLogger(__name__)
162
163 2
    @staticmethod
164
    def loggers():
165
        """List all logging Loggers.
166
167
        Return a list of Logger objects, with name and logging level.
168
        """
169 2
        return [logging.getLogger(name)
170
                for name in logging.root.manager.loggerDict
171
                if "kytos" in name]
172
173 2
    def toggle_debug(self, name=None):
174
        """Enable/disable logging debug messages to a given logger name.
175
176
        If the name parameter is not specified the debug will be
177
        enabled/disabled following the initial config file. It will decide
178
        to enable/disable using the 'kytos' name to find the current
179
        logger level.
180
        Obs: To disable the debug the logging will be set to NOTSET
181
182
        Args:
183
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
184
        """
185 2
        if name and name not in logging.root.manager.loggerDict:
186
            # A Logger name that is not declared in logging will raise an error
187
            # otherwise logging would create a new Logger.
188 2
            raise ValueError(f"Invalid logger name: {name}")
189
190 2
        if not name:
191
            # Logger name not specified.
192 2
            level = logging.getLogger('kytos').getEffectiveLevel()
193 2
            enable_debug = level != logging.DEBUG
194
195
            # Enable/disable default Loggers
196 2
            LogManager.load_config_file(self.options.logging, enable_debug)
197 2
            return
198
199
        # Get effective logger level for the name
200 2
        level = logging.getLogger(name).getEffectiveLevel()
201 2
        logger = logging.getLogger(name)
202
203 2
        if level == logging.DEBUG:
204
            # disable debug
205 2
            logger.setLevel(logging.NOTSET)
206
        else:
207
            # enable debug
208 2
            logger.setLevel(logging.DEBUG)
209
210 2
    def start(self, restart=False):
211
        """Create pidfile and call start_controller method."""
212 2
        self.enable_logs()
213 2
        if self.options.database:
214 2
            self.db_conn_or_core_shutdown()
215 2
        if not restart:
216 2
            self.create_pidfile()
217 2
        self.start_controller()
218
219 2
    def create_pidfile(self):
220
        """Create a pidfile."""
221 2
        pid = os.getpid()
222
223
        # Creates directory if it doesn't exist
224
        # System can erase /var/run's content
225 2
        pid_folder = Path(self.options.pidfile).parent
226 2
        self.log.info(pid_folder)
227
228
        # Pylint incorrectly infers Path objects
229
        # https://github.com/PyCQA/pylint/issues/224
230
        # pylint: disable=no-member
231 2
        if not pid_folder.exists():
232
            pid_folder.mkdir()
233
            pid_folder.chmod(0o1777)
234
        # pylint: enable=no-member
235
236
        # Make sure the file is deleted when controller stops
237 2
        atexit.register(Path(self.options.pidfile).unlink)
238
239
        # Checks if a pidfile exists. Creates a new file.
240 2
        try:
241 2
            pidfile = open(self.options.pidfile, mode='x')
242 2
        except OSError:
243
            # This happens if there is a pidfile already.
244
            # We shall check if the process that created the pidfile is still
245
            # running.
246 2
            try:
247 2
                existing_file = open(self.options.pidfile, mode='r')
248 2
                old_pid = int(existing_file.read())
249 2
                os.kill(old_pid, 0)
250
                # If kill() doesn't return an error, there's a process running
251
                # with the same PID. We assume it is Kytos and quit.
252
                # Otherwise, overwrite the file and proceed.
253
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
254
                             "running. Aborting.")
255
                sys.exit(error_msg.format(self.options.pidfile))
256 2
            except OSError:
257 2
                try:
258 2
                    pidfile = open(self.options.pidfile, mode='w')
259
                except OSError as exception:
260
                    error_msg = "Failed to create pidfile {}: {}."
261
                    sys.exit(error_msg.format(self.options.pidfile, exception))
262
263
        # Identifies the process that created the pidfile.
264 2
        pidfile.write(str(pid))
265 2
        pidfile.close()
266
267 2
    def start_controller(self):
268
        """Start the controller.
269
270
        Starts the KytosServer (TCP Server) coroutine.
271
        Starts a thread for each buffer handler.
272
        Load the installed apps.
273
        """
274 2
        self.log.info("Starting Kytos - Kytos Controller")
275 2
        self.server = KytosServer((self.options.listen,
276
                                   int(self.options.port)),
277
                                  KytosServerProtocol,
278
                                  self,
279
                                  self.options.protocol_name)
280
281 2
        self.log.info("Starting TCP server: %s", self.server)
282 2
        self.server.serve_forever()
283
284 2
        def _stop_loop(_):
285
            loop = asyncio.get_event_loop()
286
            # print(_.result())
287
            threads = threading.enumerate()
288
            self.log.debug("%s threads before loop.stop: %s",
289
                           len(threads), threads)
290
            loop.stop()
291
292 2
        async def _run_api_server_thread(executor):
293
            log = logging.getLogger('kytos.core.controller.api_server_thread')
294
            log.debug('starting')
295
            # log.debug('creating tasks')
296
            loop = asyncio.get_event_loop()
297
            blocking_tasks = [
298
                loop.run_in_executor(executor, self.api_server.run)
299
            ]
300
            # log.debug('waiting for tasks')
301
            completed, pending = await asyncio.wait(blocking_tasks)
302
            # results = [t.result() for t in completed]
303
            # log.debug('results: {!r}'.format(results))
304
            log.debug('completed: %d, pending: %d',
305
                      len(completed), len(pending))
306
307 2
        task = self._loop.create_task(self.raw_event_handler())
308 2
        self._tasks.append(task)
309 2
        task = self._loop.create_task(self.msg_in_event_handler())
310 2
        self._tasks.append(task)
311 2
        task = self._loop.create_task(self.msg_out_event_handler())
312 2
        self._tasks.append(task)
313 2
        task = self._loop.create_task(self.app_event_handler())
314 2
        self._tasks.append(task)
315 2
        task = self._loop.create_task(_run_api_server_thread(self._pool))
316 2
        task.add_done_callback(_stop_loop)
317 2
        self._tasks.append(task)
318
319 2
        self.log.info("ThreadPool started: %s", self._pool)
320
321
        # ASYNC TODO: ensure all threads started correctly
322
        # This is critical, if any of them failed starting we should exit.
323
        # sys.exit(error_msg.format(thread, exception))
324
325 2
        self.log.info("Loading Kytos NApps...")
326 2
        self.napp_dir_listener.start()
327 2
        self.pre_install_napps(self.options.napps_pre_installed)
328 2
        self.load_napps()
329
330 2
        self.started_at = now()
331
332 2
    def db_conn_or_core_shutdown(self):
333
        """Ensure db connection or core shutdown."""
334 2
        try:
335 2
            db_conn_wait(db_backend=self.options.database)
336 2
        except KytosDBInitException as exc:
337 2
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
338
339 2
    def _register_endpoints(self):
340
        """Register all rest endpoint served by kytos.
341
342
        -   Register APIServer endpoints
343
        -   Register WebUI endpoints
344
        -   Register ``/api/kytos/core/config`` endpoint
345
        """
346 2
        self.api_server.start_api()
347
        # Register controller endpoints as /api/kytos/core/...
348 2
        self.api_server.register_core_endpoint('config/',
349
                                               self.configuration_endpoint)
350 2
        self.api_server.register_core_endpoint('metadata/',
351
                                               Controller.metadata_endpoint)
352 2
        self.api_server.register_core_endpoint(
353
            'reload/<username>/<napp_name>/',
354
            self.rest_reload_napp)
355 2
        self.api_server.register_core_endpoint('reload/all',
356
                                               self.rest_reload_all_napps)
357 2
        self.auth.register_core_auth_services()
358 2
        self.dead_letter.register_endpoints()
359
360 2
    def register_rest_endpoint(self, url, function, methods):
361
        """Deprecate in favor of @rest decorator."""
362 2
        self.api_server.register_rest_endpoint(url, function, methods)
363
364 2
    @classmethod
365
    def metadata_endpoint(cls):
366
        """Return the Kytos metadata.
367
368
        Returns:
369
            string: Json with current kytos metadata.
370
371
        """
372 2
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
373 2
        meta_file = open(meta_path).read()
374 2
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
375 2
        return json.dumps(metadata)
376
377 2
    def configuration_endpoint(self):
378
        """Return the configuration options used by Kytos.
379
380
        Returns:
381
            string: Json with current configurations used by kytos.
382
383
        """
384 2
        return json.dumps(self.options.__dict__)
385
386 2
    def restart(self, graceful=True):
387
        """Restart Kytos SDN Controller.
388
389
        Args:
390
            graceful(bool): Represents the way that Kytos will restart.
391
        """
392 2
        if self.started_at is not None:
393 2
            self.stop(graceful)
394 2
            self.__init__(self.options)
395
396 2
        self.start(restart=True)
397
398 2
    def stop(self, graceful=True):
399
        """Shutdown all services used by kytos.
400
401
        This method should:
402
            - stop all Websockets
403
            - stop the API Server
404
            - stop the Controller
405
        """
406 2
        if self.started_at:
407 2
            self.stop_controller(graceful)
408
409 2
    def stop_controller(self, graceful=True):
410
        """Stop the controller.
411
412
        This method should:
413
            - announce on the network that the controller will shutdown;
414
            - stop receiving incoming packages;
415
            - call the 'shutdown' method of each KytosNApp that is running;
416
            - finish reading the events on all buffers;
417
            - stop each running handler;
418
            - stop all running threads;
419
            - stop the KytosServer;
420
        """
421 2
        self.log.info("Stopping Kytos")
422
423 2
        self.buffers.send_stop_signal()
424 2
        self.api_server.stop_api_server()
425 2
        self.napp_dir_listener.stop()
426
427 2
        self.log.info("Stopping threadpool: %s", self._pool)
428 2
        if executor_pool:
429 2
            self.log.info("Stopping threadpool: %s", executor_pool)
430
431 2
        threads = threading.enumerate()
432 2
        self.log.debug("%s threads before threadpool shutdown: %s",
433
                       len(threads), threads)
434
435 2
        try:
436
            # Python >= 3.9
437
            # pylint: disable=unexpected-keyword-arg
438 2
            self._pool.shutdown(wait=graceful, cancel_futures=True)
439 2
            if executor_pool:
440 2
                executor_pool.shutdown(wait=graceful, cancel_futures=True)
441 2
        except TypeError:
442 2
            self._pool.shutdown(wait=graceful)
443 2
            if executor_pool:
444 2
                executor_pool.shutdown(wait=graceful)
445
446
        # self.server.socket.shutdown()
447
        # self.server.socket.close()
448
449
        # for thread in self._threads.values():
450
        #     self.log.info("Stopping thread: %s", thread.name)
451
        #     thread.join()
452
453
        # for thread in self._threads.values():
454
        #     while thread.is_alive():
455
        #         self.log.info("Thread is alive: %s", thread.name)
456
        #         pass
457
458 2
        self.started_at = None
459 2
        self.unload_napps()
460 2
        self.buffers = KytosBuffers()
461
462
        # Cancel all async tasks (event handlers and servers)
463 2
        for task in self._tasks:
464
            task.cancel()
465
466
        # ASYNC TODO: close connections
467
        # self.server.server_close()
468
469
        # Shutdown the TCP server and the main asyncio loop
470 2
        self.server.shutdown()
471
472 2
    def status(self):
473
        """Return status of Kytos Server.
474
475
        If the controller kytos is running this method will be returned
476
        "Running since 'Started_At'", otherwise "Stopped".
477
478
        Returns:
479
            string: String with kytos status.
480
481
        """
482 2
        if self.started_at:
483 2
            return "Running since %s" % self.started_at
484 2
        return "Stopped"
485
486 2
    def uptime(self):
487
        """Return the uptime of kytos server.
488
489
        This method should return:
490
            - 0 if Kytos Server is stopped.
491
            - (kytos.start_at - datetime.now) if Kytos Server is running.
492
493
        Returns:
494
           datetime.timedelta: The uptime interval.
495
496
        """
497 2
        return now() - self.started_at if self.started_at else 0
498
499 2
    def notify_listeners(self, event):
500
        """Send the event to the specified listeners.
501
502
        Loops over self.events_listeners matching (by regexp) the attribute
503
        name of the event with the keys of events_listeners. If a match occurs,
504
        then send the event to each registered listener.
505
506
        Args:
507
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
508
        """
509 2
        self.log.debug("looking for listeners for %s", event)
510 2
        for event_regex, listeners in dict(self.events_listeners).items():
511
            # self.log.debug("listeners found for %s: %r => %s", event,
512
            #                event_regex, [l.__qualname__ for l in listeners])
513
            # Do not match if the event has more characters
514
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
515 2
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
516 2
                event_regex += '$'
517 2
            if re.match(event_regex, event.name):
518
                # self.log.debug('Calling listeners for %s', event)
519 2
                for listener in listeners:
520 2
                    listener(event)
521
522 2
    async def raw_event_handler(self):
523
        """Handle raw events.
524
525
        Listen to the raw_buffer and send all its events to the
526
        corresponding listeners.
527
        """
528 2
        self.log.info("Raw Event Handler started")
529 2
        while True:
530 2
            event = await self.buffers.raw.aget()
531 2
            self.notify_listeners(event)
532 2
            self.log.debug("Raw Event handler called")
533
534 2
            if event.name == "kytos/core.shutdown":
535 2
                self.log.debug("Raw Event handler stopped")
536 2
                break
537
538 2
    async def msg_in_event_handler(self):
539
        """Handle msg_in events.
540
541
        Listen to the msg_in buffer and send all its events to the
542
        corresponding listeners.
543
        """
544 2
        self.log.info("Message In Event Handler started")
545 2
        while True:
546 2
            event = await self.buffers.msg_in.aget()
547 2
            self.notify_listeners(event)
548 2
            self.log.debug("Message In Event handler called")
549
550 2
            if event.name == "kytos/core.shutdown":
551 2
                self.log.debug("Message In Event handler stopped")
552 2
                break
553
554 2
    async def publish_connection_error(self, event):
555
        """Publish connection error event.
556
557
        Args:
558
            event (KytosEvent): Event that triggered for error propagation.
559
        """
560
        event.name = \
561
            f"kytos/core.{event.destination.protocol.name}.connection.error"
562
        error_msg = f"Connection state: {event.destination.state}"
563
        event.content["exception"] = error_msg
564
        await self.buffers.app.aput(event)
565
566 2
    async def msg_out_event_handler(self):
567
        """Handle msg_out events.
568
569
        Listen to the msg_out buffer and send all its events to the
570
        corresponding listeners.
571
        """
572 2
        self.log.info("Message Out Event Handler started")
573 2
        while True:
574 2
            triggered_event = await self.buffers.msg_out.aget()
575
576 2
            if triggered_event.name == "kytos/core.shutdown":
577 2
                self.log.debug("Message Out Event handler stopped")
578 2
                break
579
580 2
            message = triggered_event.content['message']
581 2
            destination = triggered_event.destination
582 2
            try:
583 2
                if (destination and
584
                        not destination.state == ConnectionState.FINISHED):
585 2
                    packet = message.pack()
586 2
                    destination.send(packet)
587 2
                    self.log.debug('Connection %s: OUT OFP, '
588
                                   'version: %s, type: %s, xid: %s - %s',
589
                                   destination.id,
590
                                   message.header.version,
591
                                   message.header.message_type,
592
                                   message.header.xid,
593
                                   packet.hex())
594 2
                    self.notify_listeners(triggered_event)
595 2
                    self.log.debug("Message Out Event handler called")
596 2
                    continue
597
598
            except (OSError, SocketError):
599
                pass
600
601
            await self.publish_connection_error(triggered_event)
602
            self.log.info("connection closed. Cannot send message")
603
604 2
    async def app_event_handler(self):
605
        """Handle app events.
606
607
        Listen to the app buffer and send all its events to the
608
        corresponding listeners.
609
        """
610 2
        self.log.info("App Event Handler started")
611 2
        while True:
612 2
            event = await self.buffers.app.aget()
613 2
            self.notify_listeners(event)
614 2
            self.log.debug("App Event handler called")
615
616 2
            if event.name == "kytos/core.shutdown":
617 2
                self.log.debug("App Event handler stopped")
618 2
                break
619
620 2
    def get_interface_by_id(self, interface_id):
621
        """Find a Interface  with interface_id.
622
623
        Args:
624
            interface_id(str): Interface Identifier.
625
626
        Returns:
627
            Interface: Instance of Interface with the id given.
628
629
        """
630 2
        if interface_id is None:
631 2
            return None
632
633 2
        switch_id = ":".join(interface_id.split(":")[:-1])
634 2
        interface_number = int(interface_id.split(":")[-1])
635
636 2
        switch = self.switches.get(switch_id)
637
638 2
        if not switch:
639 2
            return None
640
641 2
        return switch.interfaces.get(interface_number, None)
642
643 2
    def get_switch_by_dpid(self, dpid):
644
        """Return a specific switch by dpid.
645
646
        Args:
647
            dpid (|DPID|): dpid object used to identify a switch.
648
649
        Returns:
650
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
651
652
        """
653 2
        return self.switches.get(dpid)
654
655 2
    def get_switch_or_create(self, dpid, connection=None):
656
        """Return switch or create it if necessary.
657
658
        Args:
659
            dpid (|DPID|): dpid object used to identify a switch.
660
            connection (:class:`~kytos.core.connection.Connection`):
661
                connection used by switch. If a switch has a connection that
662
                will be updated.
663
664
        Returns:
665
            :class:`~kytos.core.switch.Switch`: new or existent switch.
666
667
        """
668 2
        with self._switches_lock:
669 2
            if connection:
670 2
                self.create_or_update_connection(connection)
671
672 2
            switch = self.get_switch_by_dpid(dpid)
673 2
            event_name = 'kytos/core.switch.'
674
675 2
            if switch is None:
676 2
                switch = Switch(dpid=dpid)
677 2
                self.add_new_switch(switch)
678 2
                event_name += 'new'
679
            else:
680 2
                event_name += 'reconnected'
681
682 2
            self.set_switch_options(dpid=dpid)
683 2
            event = KytosEvent(name=event_name, content={'switch': switch})
684
685 2
            if connection:
686 2
                old_connection = switch.connection
687 2
                switch.update_connection(connection)
688
689 2
                if old_connection is not connection:
690 2
                    self.remove_connection(old_connection)
691
692 2
            self.buffers.app.put(event)
693
694 2
            return switch
695
696 2
    def set_switch_options(self, dpid):
697
        """Update the switch settings based on kytos.conf options.
698
699
        Args:
700
            dpid (str): dpid used to identify a switch.
701
702
        """
703 2
        switch = self.switches.get(dpid)
704 2
        if not switch:
705
            return
706
707 2
        vlan_pool = {}
708 2
        vlan_pool = self.options.vlan_pool
709 2
        if not vlan_pool:
710 2
            return
711
712 2
        if vlan_pool.get(dpid):
713 2
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
714 2
            for intf_num, port_list in vlan_pool[dpid].items():
715 2
                if not switch.interfaces.get((intf_num)):
716 2
                    vlan_ids = set()
717 2
                    for vlan_range in port_list:
718 2
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
719 2
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
720 2
                            vlan_ids.add(vlan_id)
721 2
                    intf_num = int(intf_num)
722 2
                    intf = Interface(name=intf_num, port_number=intf_num,
723
                                     switch=switch)
724 2
                    intf.set_available_tags(vlan_ids)
725 2
                    switch.update_interface(intf)
726
727 2
    def create_or_update_connection(self, connection):
728
        """Update a connection.
729
730
        Args:
731
            connection (:class:`~kytos.core.connection.Connection`):
732
                Instance of connection that will be updated.
733
        """
734 2
        self.connections[connection.id] = connection
735
736 2
    def get_connection_by_id(self, conn_id):
737
        """Return a existent connection by id.
738
739
        Args:
740
            id (int): id from a connection.
741
742
        Returns:
743
            :class:`~kytos.core.connection.Connection`: Instance of connection
744
                or None Type.
745
746
        """
747 2
        return self.connections.get(conn_id)
748
749 2
    def remove_connection(self, connection):
750
        """Close a existent connection and remove it.
751
752
        Args:
753
            connection (:class:`~kytos.core.connection.Connection`):
754
                Instance of connection that will be removed.
755
        """
756 2
        if connection is None:
757 2
            return False
758
759 2
        try:
760 2
            connection.close()
761 2
            del self.connections[connection.id]
762 2
        except KeyError:
763 2
            return False
764 2
        return True
765
766 2
    def remove_switch(self, switch):
767
        """Remove an existent switch.
768
769
        Args:
770
            switch (:class:`~kytos.core.switch.Switch`):
771
                Instance of switch that will be removed.
772
        """
773 2
        try:
774 2
            del self.switches[switch.dpid]
775 2
        except KeyError:
776 2
            return False
777 2
        return True
778
779 2
    def new_connection(self, event):
780
        """Handle a kytos/core.connection.new event.
781
782
        This method will read new connection event and store the connection
783
        (socket) into the connections attribute on the controller.
784
785
        It also clear all references to the connection since it is a new
786
        connection on the same ip:port.
787
788
        Args:
789
            event (~kytos.core.KytosEvent):
790
                The received event (``kytos/core.connection.new``) with the
791
                needed info.
792
        """
793 2
        self.log.info("Handling %s...", event)
794
795 2
        connection = event.source
796 2
        self.log.debug("Event source: %s", event.source)
797
798
        # Remove old connection (aka cleanup) if it exists
799 2
        if self.get_connection_by_id(connection.id):
800
            self.remove_connection(connection.id)
801
802
        # Update connections with the new connection
803 2
        self.create_or_update_connection(connection)
804
805 2
    def add_new_switch(self, switch):
806
        """Add a new switch on the controller.
807
808
        Args:
809
            switch (Switch): A Switch object
810
        """
811 2
        self.switches[switch.dpid] = switch
812
813 2
    def _import_napp(self, username, napp_name):
814
        """Import a NApp module.
815
816
        Raises:
817
            FileNotFoundError: if NApp's main.py is not found.
818
            ModuleNotFoundError: if any NApp requirement is not installed.
819
820
        """
821 2
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
822 2
        path = os.path.join(self.options.napps, username, napp_name,
823
                            'main.py')
824 2
        napp_spec = spec_from_file_location(mod_name, path)
825 2
        napp_module = module_from_spec(napp_spec)
826 2
        sys.modules[napp_spec.name] = napp_module
827 2
        napp_spec.loader.exec_module(napp_module)
828 2
        return napp_module
829
830 2
    def load_napp(self, username, napp_name):
831
        """Load a single NApp.
832
833
        Args:
834
            username (str): NApp username (makes up NApp's path).
835
            napp_name (str): Name of the NApp to be loaded.
836
        """
837 2
        if (username, napp_name) in self.napps:
838 2
            message = 'NApp %s/%s was already loaded'
839 2
            self.log.warning(message, username, napp_name)
840 2
            return
841
842 2
        try:
843 2
            napp_module = self._import_napp(username, napp_name)
844 2
        except ModuleNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
845 2
            self.log.error("Error loading NApp '%s/%s': %s",
846
                           username, napp_name, err)
847 2
            return
848 2
        except FileNotFoundError as err:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
849 2
            msg = "NApp module not found, assuming it's a meta-napp: %s"
850 2
            self.log.warning(msg, err.filename)
851 2
            return
852
853 2
        try:
854 2
            napp = napp_module.Main(controller=self)
855 2
        except:  # noqa pylint: disable=bare-except
856 2
            self.log.critical("NApp initialization failed: %s/%s",
857
                              username, napp_name, exc_info=True)
858 2
            return
859
860 2
        self.napps[(username, napp_name)] = napp
861
862 2
        napp.start()
863 2
        self.api_server.authenticate_endpoints(napp)
864 2
        self.api_server.register_napp_endpoints(napp)
865
866
        # pylint: disable=protected-access
867 2
        for event, listeners in napp._listeners.items():
868
            self.events_listeners.setdefault(event, []).extend(listeners)
869
        # pylint: enable=protected-access
870
871 2
    def pre_install_napps(self, napps, enable=True):
872
        """Pre install and enable NApps.
873
874
        Before installing, it'll check if it's installed yet.
875
876
        Args:
877
            napps ([str]): List of NApps to be pre-installed and enabled.
878
        """
879 2
        all_napps = self.napps_manager.get_installed_napps()
880 2
        installed = [str(napp) for napp in all_napps]
881 2
        napps_diff = [napp for napp in napps if napp not in installed]
882 2
        for napp in napps_diff:
883 2
            self.napps_manager.install(napp, enable=enable)
884
885 2
    def load_napps(self):
886
        """Load all NApps enabled on the NApps dir."""
887 2
        for napp in self.napps_manager.get_enabled_napps():
888 2
            try:
889 2
                self.log.info("Loading NApp %s", napp.id)
890 2
                self.load_napp(napp.username, napp.name)
891
            except FileNotFoundError as exception:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable FileNotFoundError does not seem to be defined.
Loading history...
892
                self.log.error("Could not load NApp %s: %s",
893
                               napp.id, exception)
894
895 2
    def unload_napp(self, username, napp_name):
896
        """Unload a specific NApp.
897
898
        Args:
899
            username (str): NApp username.
900
            napp_name (str): Name of the NApp to be unloaded.
901
        """
902 2
        napp = self.napps.pop((username, napp_name), None)
903
904 2
        if napp is None:
905
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
906
        else:
907 2
            self.log.info("Shutting down NApp %s/%s...", username, napp_name)
908 2
            napp_id = NApp(username, napp_name).id
909 2
            event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
910 2
            napp_shutdown_fn = self.events_listeners[event.name][0]
911
            # Call listener before removing it from events_listeners
912 2
            napp_shutdown_fn(event)
913
914
            # Remove rest endpoints from that napp
915 2
            self.api_server.remove_napp_endpoints(napp)
916
917
            # Removing listeners from that napp
918
            # pylint: disable=protected-access
919 2
            for event_type, napp_listeners in napp._listeners.items():
920
                event_listeners = self.events_listeners[event_type]
921
                for listener in napp_listeners:
922
                    event_listeners.remove(listener)
923
                if not event_listeners:
924
                    del self.events_listeners[event_type]
925
            # pylint: enable=protected-access
926
927 2
    def unload_napps(self):
928
        """Unload all loaded NApps that are not core NApps."""
929
        # list() is used here to avoid the error:
930
        # 'RuntimeError: dictionary changed size during iteration'
931
        # This is caused by looping over an dictionary while removing
932
        # items from it.
933
        for (username, napp_name) in list(self.napps.keys()):  # noqa
934
            self.unload_napp(username, napp_name)
935
936 2
    def reload_napp_module(self, username, napp_name, napp_file):
937
        """Reload a NApp Module."""
938 2
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
939 2
        try:
940 2
            napp_module = import_module(mod_name)
941 2
        except ModuleNotFoundError:
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
942 2
            self.log.error("Module '%s' not found", mod_name)
943 2
            raise
944 2
        try:
945 2
            napp_module = reload_module(napp_module)
946 2
        except ImportError as err:
947 2
            self.log.error("Error reloading NApp '%s/%s': %s",
948
                           username, napp_name, err)
949 2
            raise
950
951 2
    def reload_napp(self, username, napp_name):
952
        """Reload a NApp."""
953 2
        self.unload_napp(username, napp_name)
954 2
        try:
955 2
            self.reload_napp_module(username, napp_name, 'settings')
956 2
            self.reload_napp_module(username, napp_name, 'main')
957 2
        except (ModuleNotFoundError, ImportError):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable ModuleNotFoundError does not seem to be defined.
Loading history...
958 2
            return 400
959 2
        self.log.info("NApp '%s/%s' successfully reloaded",
960
                      username, napp_name)
961 2
        self.load_napp(username, napp_name)
962 2
        return 200
963
964 2
    def rest_reload_napp(self, username, napp_name):
965
        """Request reload a NApp."""
966 2
        res = self.reload_napp(username, napp_name)
967 2
        return 'reloaded', res
968
969 2
    def rest_reload_all_napps(self):
970
        """Request reload all NApps."""
971 2
        for napp in self.napps:
972 2
            self.reload_napp(*napp)
973
        return 'reloaded', 200
974