Passed
Push — master ( 4d1ed9...27ce7e )
by Vinicius
14:54 queued 08:58
created

Controller.set_switch_options()   B

Complexity

Conditions 8

Size

Total Lines 30
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 8.0069

Importance

Changes 0
Metric Value
cc 8
eloc 22
nop 2
dl 0
loc 30
rs 7.3333
c 0
b 0
f 0
ccs 20
cts 21
cp 0.9524
crap 8.0069
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 2
import asyncio
17 2
import atexit
18 2
import json
19 2
import logging
20 2
import os
21 2
import re
22 2
import sys
23 2
import threading
24 2
from concurrent.futures import ThreadPoolExecutor
25 2
from importlib import import_module
26 2
from importlib import reload as reload_module
27 2
from importlib.util import module_from_spec, spec_from_file_location
28 2
from pathlib import Path
29 2
from socket import error as SocketError
30
31 2
from kytos.core.api_server import APIServer
32
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
33 2
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
34 2
from kytos.core.auth import Auth
35 2
from kytos.core.buffers import KytosBuffers
36 2
from kytos.core.config import KytosConfig
37 2
from kytos.core.connection import ConnectionState
38 2
from kytos.core.db import db_conn_wait
39 2
from kytos.core.events import KytosEvent
40 2
from kytos.core.exceptions import KytosDBInitException
41 2
from kytos.core.helpers import executor as executor_pool
42 2
from kytos.core.helpers import now
43 2
from kytos.core.interface import Interface
44 2
from kytos.core.logs import LogManager
45 2
from kytos.core.napps.base import NApp
46 2
from kytos.core.napps.manager import NAppsManager
47 2
from kytos.core.napps.napp_dir_listener import NAppDirListener
48 2
from kytos.core.switch import Switch
49
50 2
__all__ = ('Controller',)
51
52
53 2
def exc_handler(_, exc, __):
    """Log uncaught exceptions together with their traceback.

    Installed as ``sys.excepthook`` by :class:`Controller`, so it is called
    positionally with the usual ``(type, value, traceback)`` triple.

    Args:
        _ (type): exception type.
        exc (BaseException): exception instance.
        __ (traceback): traceback object of the exception (may be ``None``).
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # ``logging.exception`` takes the traceback from ``sys.exc_info()``, which
    # is only populated while an ``except`` block is running. This hook runs
    # outside of one, so the triple received as arguments is handed over
    # explicitly; otherwise the traceback would be missing from the log.
    logging.error('Uncaught Exception: %s', exc,
                  exc_info=(_ or type(exc), exc, __))
65
66
67 2
class Controller:
68
    """Main class of Kytos.
69
70
    The main responsibilities of this class are:
71
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
72
        - manage KytosNApps (install, load and unload);
73
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
74
        - manage which event should be sent to NApps methods;
75
        - manage the buffers handlers, considering one thread per handler.
76
    """
77
78
    # Created issue #568 for the disabled checks.
79
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
80 2
    def __init__(self, options=None, loop=None):
        """Build the controller state from the given options.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
                When ``None``, the ``'daemon'`` options of a freshly created
                :class:`~kytos.core.config.KytosConfig` are used.
            loop: event loop on which the controller tasks are created. When
                falsy, ``asyncio.get_event_loop()`` is used.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self._loop = loop if loop else asyncio.get_event_loop()
        self._pool = ThreadPoolExecutor(max_workers=1)

        #: list: asyncio tasks created by the controller.
        self._tasks = []

        #: dict: main threads of the controller (buffers and handler).
        self._threads = {}

        #: KytosBuffers: buffers owned by the controller.
        self.buffers = KytosBuffers(loop=self._loop)

        #: dict: socket connections between the controller and the switches.
        #:
        #: Each key is a ``(ip, port)`` tuple and each value is a Connection.
        self.connections = {}

        #: dict: mapping of events and event listeners.
        #:
        #: Each key is a KytosEvent name (or a string holding a regex to be
        #: matched against event names) and each value is the list of
        #: methods that will receive the matching events.
        self.events_listeners = {
            'kytos/core.connection.new': [self.new_connection],
        }

        #: dict: currently loaded apps - ``'napp_name'``: ``napp`` (instance).
        self.napps = {}

        #: Object generated by ParseArgs on config.py file.
        self.options = options

        #: KytosServer: server that will be listening to TCP connections.
        #: It is only created by :meth:`start_controller`.
        self.server = None

        #: dict: existing switches, ``dpid``: ``Switch``.
        self.switches = {}
        self._switches_lock = threading.Lock()

        #: datetime.datetime: time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: logger used by Kytos, set by :meth:`enable_logs`.
        self.log = None

        #: Observer that handles NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(
            __name__,
            self.options.listen,
            self.options.api_port,
            self.napps_manager,
            self.options.napps,
        )

        self.auth = Auth(self)

        self._register_endpoints()

        # Put the parent of the napps 'enabled' directory on the import path,
        # so enabled napps can be imported with:
        #   from napps.<username>.<napp_name> import ?....
        napps_parent = os.path.join(self.options.napps, os.pardir)
        sys.path.append(napps_parent)
        sys.excepthook = exc_handler
154
155 2
    def enable_logs(self):
        """Load the logging configuration and set the controller logger.

        The websocket log handler is attached to the API server's ``server``
        attribute and ``self.log`` becomes the logger named after this module.
        """
        opts = self.options
        LogManager.load_config_file(opts.logging, opts.debug)
        LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)
160
161 2
    @staticmethod
162
    def loggers():
163
        """List all logging Loggers.
164
165
        Return a list of Logger objects, with name and logging level.
166
        """
167 2
        return [logging.getLogger(name)
168
                for name in logging.root.manager.loggerDict
169
                if "kytos" in name]
170
171 2
    def toggle_debug(self, name=None):
172
        """Enable/disable logging debug messages to a given logger name.
173
174
        If the name parameter is not specified the debug will be
175
        enabled/disabled following the initial config file. It will decide
176
        to enable/disable using the 'kytos' name to find the current
177
        logger level.
178
        Obs: To disable the debug the logging will be set to NOTSET
179
180
        Args:
181
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
182
        """
183 2
        if name and name not in logging.root.manager.loggerDict:
184
            # A Logger name that is not declared in logging will raise an error
185
            # otherwise logging would create a new Logger.
186 2
            raise ValueError(f"Invalid logger name: {name}")
187
188 2
        if not name:
189
            # Logger name not specified.
190 2
            level = logging.getLogger('kytos').getEffectiveLevel()
191 2
            enable_debug = level != logging.DEBUG
192
193
            # Enable/disable default Loggers
194 2
            LogManager.load_config_file(self.options.logging, enable_debug)
195 2
            return
196
197
        # Get effective logger level for the name
198 2
        level = logging.getLogger(name).getEffectiveLevel()
199 2
        logger = logging.getLogger(name)
200
201 2
        if level == logging.DEBUG:
202
            # disable debug
203 2
            logger.setLevel(logging.NOTSET)
204
        else:
205
            # enable debug
206 2
            logger.setLevel(logging.DEBUG)
207
208 2
    def start(self, restart=False):
209
        """Create pidfile and call start_controller method."""
210 2
        self.enable_logs()
211 2
        if self.options.database:
212 2
            self.db_conn_or_core_shutdown()
213 2
        if not restart:
214 2
            self.create_pidfile()
215 2
        self.start_controller()
216
217 2
    def create_pidfile(self):
218
        """Create a pidfile."""
219 2
        pid = os.getpid()
220
221
        # Creates directory if it doesn't exist
222
        # System can erase /var/run's content
223 2
        pid_folder = Path(self.options.pidfile).parent
224 2
        self.log.info(pid_folder)
225
226
        # Pylint incorrectly infers Path objects
227
        # https://github.com/PyCQA/pylint/issues/224
228
        # pylint: disable=no-member
229 2
        if not pid_folder.exists():
230
            pid_folder.mkdir()
231
            pid_folder.chmod(0o1777)
232
        # pylint: enable=no-member
233
234
        # Make sure the file is deleted when controller stops
235 2
        atexit.register(Path(self.options.pidfile).unlink)
236
237
        # Checks if a pidfile exists. Creates a new file.
238 2
        try:
239 2
            pidfile = open(self.options.pidfile, mode='x')
240 2
        except OSError:
241
            # This happens if there is a pidfile already.
242
            # We shall check if the process that created the pidfile is still
243
            # running.
244 2
            try:
245 2
                existing_file = open(self.options.pidfile, mode='r')
246 2
                old_pid = int(existing_file.read())
247 2
                os.kill(old_pid, 0)
248
                # If kill() doesn't return an error, there's a process running
249
                # with the same PID. We assume it is Kytos and quit.
250
                # Otherwise, overwrite the file and proceed.
251
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
252
                             "running. Aborting.")
253
                sys.exit(error_msg.format(self.options.pidfile))
254 2
            except OSError:
255 2
                try:
256 2
                    pidfile = open(self.options.pidfile, mode='w')
257
                except OSError as exception:
258
                    error_msg = "Failed to create pidfile {}: {}."
259
                    sys.exit(error_msg.format(self.options.pidfile, exception))
260
261
        # Identifies the process that created the pidfile.
262 2
        pidfile.write(str(pid))
263 2
        pidfile.close()
264
265 2
    def start_controller(self):
        """Start the controller services.

        Creates the KytosServer (TCP server) and asks it to serve, schedules
        one asyncio task per buffer handler plus one task that runs the API
        server in the thread pool, then starts the NApp directory listener
        and loads the NApps. ``started_at`` is set at the very end.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(address,
                                  KytosServerProtocol,
                                  self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: stop the event loop.
            loop = asyncio.get_event_loop()
            alive = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(alive), alive)
            loop.stop()

        async def _run_api_server_thread(executor):
            # Run the blocking API server in ``executor`` and wait for it.
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            loop = asyncio.get_event_loop()
            blocking_tasks = [
                loop.run_in_executor(executor, self.api_server.run)
            ]
            completed, pending = await asyncio.wait(blocking_tasks)
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        # One task per buffer handler, created in this order.
        buffer_handlers = (
            self.raw_event_handler,
            self.msg_in_event_handler,
            self.msg_out_event_handler,
            self.app_event_handler,
        )
        for buffer_handler in buffer_handlers:
            self._tasks.append(self._loop.create_task(buffer_handler()))

        api_task = self._loop.create_task(_run_api_server_thread(self._pool))
        api_task.add_done_callback(_stop_loop)
        self._tasks.append(api_task)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()
329
330 2
    def db_conn_or_core_shutdown(self):
        """Wait for the database connection or exit the process.

        Raises:
            SystemExit: if ``db_conn_wait`` raises ``KytosDBInitException``.
        """
        backend = self.options.database
        try:
            db_conn_wait(db_backend=backend)
        except KytosDBInitException as exc:
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
336
337 2
    def _register_endpoints(self):
338
        """Register all rest endpoint served by kytos.
339
340
        -   Register APIServer endpoints
341
        -   Register WebUI endpoints
342
        -   Register ``/api/kytos/core/config`` endpoint
343
        """
344 2
        self.api_server.start_api()
345
        # Register controller endpoints as /api/kytos/core/...
346 2
        self.api_server.register_core_endpoint('config/',
347
                                               self.configuration_endpoint)
348 2
        self.api_server.register_core_endpoint('metadata/',
349
                                               Controller.metadata_endpoint)
350 2
        self.api_server.register_core_endpoint(
351
            'reload/<username>/<napp_name>/',
352
            self.rest_reload_napp)
353 2
        self.api_server.register_core_endpoint('reload/all',
354
                                               self.rest_reload_all_napps)
355 2
        self.auth.register_core_auth_services()
356
357 2
    def register_rest_endpoint(self, url, function, methods):
358
        """Deprecate in favor of @rest decorator."""
359 2
        self.api_server.register_rest_endpoint(url, function, methods)
360
361 2
    @classmethod
362
    def metadata_endpoint(cls):
363
        """Return the Kytos metadata.
364
365
        Returns:
366
            string: Json with current kytos metadata.
367
368
        """
369 2
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
370 2
        meta_file = open(meta_path).read()
371 2
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
372 2
        return json.dumps(metadata)
373
374 2
    def configuration_endpoint(self):
375
        """Return the configuration options used by Kytos.
376
377
        Returns:
378
            string: Json with current configurations used by kytos.
379
380
        """
381 2
        return json.dumps(self.options.__dict__)
382
383 2
    def restart(self, graceful=True):
384
        """Restart Kytos SDN Controller.
385
386
        Args:
387
            graceful(bool): Represents the way that Kytos will restart.
388
        """
389 2
        if self.started_at is not None:
390 2
            self.stop(graceful)
391 2
            self.__init__(self.options)
392
393 2
        self.start(restart=True)
394
395 2
    def stop(self, graceful=True):
396
        """Shutdown all services used by kytos.
397
398
        This method should:
399
            - stop all Websockets
400
            - stop the API Server
401
            - stop the Controller
402
        """
403 2
        if self.started_at:
404 2
            self.stop_controller(graceful)
405
406 2
    def stop_controller(self, graceful=True):
        """Stop the controller.

        In order, this method:
            - sends the stop signal to the buffers;
            - stops the API server and the NApp directory listener;
            - shuts down the thread pools;
            - clears ``started_at`` and unloads the NApps;
            - replaces the buffers with new ones;
            - cancels the asyncio tasks;
            - shuts down the KytosServer.

        Args:
            graceful (bool): passed as ``wait`` to the thread pool shutdown.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.api_server.stop_api_server()
        self.napp_dir_listener.stop()

        self.log.info("Stopping threadpool: %s", self._pool)
        if executor_pool:
            self.log.info("Stopping threadpool: %s", executor_pool)

        alive = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(alive), alive)

        try:
            # ``cancel_futures`` only exists on Python >= 3.9; older versions
            # raise TypeError and take the fallback below.
            # pylint: disable=unexpected-keyword-arg
            self._pool.shutdown(wait=graceful, cancel_futures=True)
            if executor_pool:
                executor_pool.shutdown(wait=graceful, cancel_futures=True)
        except TypeError:
            self._pool.shutdown(wait=graceful)
            if executor_pool:
                executor_pool.shutdown(wait=graceful)

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # Cancel all async tasks (event handlers and servers)
        for pending_task in self._tasks:
            pending_task.cancel()

        # ASYNC TODO: close connections

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()
468
469 2
    def status(self):
470
        """Return status of Kytos Server.
471
472
        If the controller kytos is running this method will be returned
473
        "Running since 'Started_At'", otherwise "Stopped".
474
475
        Returns:
476
            string: String with kytos status.
477
478
        """
479 2
        if self.started_at:
480 2
            return "Running since %s" % self.started_at
481 2
        return "Stopped"
482
483 2
    def uptime(self):
484
        """Return the uptime of kytos server.
485
486
        This method should return:
487
            - 0 if Kytos Server is stopped.
488
            - (kytos.start_at - datetime.now) if Kytos Server is running.
489
490
        Returns:
491
           datetime.timedelta: The uptime interval.
492
493
        """
494 2
        return now() - self.started_at if self.started_at else 0
495
496 2
    def notify_listeners(self, event):
497
        """Send the event to the specified listeners.
498
499
        Loops over self.events_listeners matching (by regexp) the attribute
500
        name of the event with the keys of events_listeners. If a match occurs,
501
        then send the event to each registered listener.
502
503
        Args:
504
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
505
        """
506 2
        self.log.debug("looking for listeners for %s", event)
507 2
        for event_regex, listeners in dict(self.events_listeners).items():
508
            # self.log.debug("listeners found for %s: %r => %s", event,
509
            #                event_regex, [l.__qualname__ for l in listeners])
510
            # Do not match if the event has more characters
511
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
512 2
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
513 2
                event_regex += '$'
514 2
            if re.match(event_regex, event.name):
515
                # self.log.debug('Calling listeners for %s', event)
516 2
                for listener in listeners:
517 2
                    listener(event)
518
519 2
    async def raw_event_handler(self):
520
        """Handle raw events.
521
522
        Listen to the raw_buffer and send all its events to the
523
        corresponding listeners.
524
        """
525 2
        self.log.info("Raw Event Handler started")
526 2
        while True:
527 2
            event = await self.buffers.raw.aget()
528 2
            self.notify_listeners(event)
529 2
            self.log.debug("Raw Event handler called")
530
531 2
            if event.name == "kytos/core.shutdown":
532 2
                self.log.debug("Raw Event handler stopped")
533 2
                break
534
535 2
    async def msg_in_event_handler(self):
536
        """Handle msg_in events.
537
538
        Listen to the msg_in buffer and send all its events to the
539
        corresponding listeners.
540
        """
541 2
        self.log.info("Message In Event Handler started")
542 2
        while True:
543 2
            event = await self.buffers.msg_in.aget()
544 2
            self.notify_listeners(event)
545 2
            self.log.debug("Message In Event handler called")
546
547 2
            if event.name == "kytos/core.shutdown":
548 2
                self.log.debug("Message In Event handler stopped")
549 2
                break
550
551 2
    async def publish_connection_error(self, event):
552
        """Publish connection error event.
553
554
        Args:
555
            event (KytosEvent): Event that triggered for error propagation.
556
        """
557
        event.name = \
558
            f"kytos/core.{event.destination.protocol.name}.connection.error"
559
        error_msg = f"Connection state: {event.destination.state}"
560
        event.content["exception"] = error_msg
561
        await self.buffers.app.aput(event)
562
563 2
    async def msg_out_event_handler(self):
564
        """Handle msg_out events.
565
566
        Listen to the msg_out buffer and send all its events to the
567
        corresponding listeners.
568
        """
569 2
        self.log.info("Message Out Event Handler started")
570 2
        while True:
571 2
            triggered_event = await self.buffers.msg_out.aget()
572
573 2
            if triggered_event.name == "kytos/core.shutdown":
574 2
                self.log.debug("Message Out Event handler stopped")
575 2
                break
576
577 2
            message = triggered_event.content['message']
578 2
            destination = triggered_event.destination
579 2
            try:
580 2
                if (destination and
581
                        not destination.state == ConnectionState.FINISHED):
582 2
                    packet = message.pack()
583 2
                    destination.send(packet)
584 2
                    self.log.debug('Connection %s: OUT OFP, '
585
                                   'version: %s, type: %s, xid: %s - %s',
586
                                   destination.id,
587
                                   message.header.version,
588
                                   message.header.message_type,
589
                                   message.header.xid,
590
                                   packet.hex())
591 2
                    self.notify_listeners(triggered_event)
592 2
                    self.log.debug("Message Out Event handler called")
593 2
                    continue
594
595
            except (OSError, SocketError):
596
                pass
597
598
            await self.publish_connection_error(triggered_event)
599
            self.log.info("connection closed. Cannot send message")
600
601 2
    async def app_event_handler(self):
602
        """Handle app events.
603
604
        Listen to the app buffer and send all its events to the
605
        corresponding listeners.
606
        """
607 2
        self.log.info("App Event Handler started")
608 2
        while True:
609 2
            event = await self.buffers.app.aget()
610 2
            self.notify_listeners(event)
611 2
            self.log.debug("App Event handler called")
612
613 2
            if event.name == "kytos/core.shutdown":
614 2
                self.log.debug("App Event handler stopped")
615 2
                break
616
617 2
    def get_interface_by_id(self, interface_id):
618
        """Find a Interface  with interface_id.
619
620
        Args:
621
            interface_id(str): Interface Identifier.
622
623
        Returns:
624
            Interface: Instance of Interface with the id given.
625
626
        """
627 2
        if interface_id is None:
628 2
            return None
629
630 2
        switch_id = ":".join(interface_id.split(":")[:-1])
631 2
        interface_number = int(interface_id.split(":")[-1])
632
633 2
        switch = self.switches.get(switch_id)
634
635 2
        if not switch:
636 2
            return None
637
638 2
        return switch.interfaces.get(interface_number, None)
639
640 2
    def get_switch_by_dpid(self, dpid):
641
        """Return a specific switch by dpid.
642
643
        Args:
644
            dpid (|DPID|): dpid object used to identify a switch.
645
646
        Returns:
647
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
648
649
        """
650 2
        return self.switches.get(dpid)
651
652 2
    def get_switch_or_create(self, dpid, connection=None):
        """Return the switch with the given dpid, creating it if necessary.

        The whole operation runs under the switches lock. A
        ``kytos/core.switch.new`` or ``kytos/core.switch.reconnected`` event
        carrying the switch is put into the app buffer.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. When given, it is stored by the
                controller and becomes the switch connection; a different
                previous connection of the switch is removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.
        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            is_new = switch is None
            if is_new:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)

            event_name = 'kytos/core.switch.' + (
                'new' if is_new else 'reconnected')

            self.set_switch_options(dpid=dpid)
            event = KytosEvent(name=event_name, content={'switch': switch})

            if connection:
                previous = switch.connection
                switch.update_connection(connection)

                if previous is not connection:
                    self.remove_connection(previous)

            self.buffers.app.put(event)

            return switch
692
693 2
    def set_switch_options(self, dpid):
694
        """Update the switch settings based on kytos.conf options.
695
696
        Args:
697
            dpid (str): dpid used to identify a switch.
698
699
        """
700 2
        switch = self.switches.get(dpid)
701 2
        if not switch:
702
            return
703
704 2
        vlan_pool = {}
705 2
        vlan_pool = self.options.vlan_pool
706 2
        if not vlan_pool:
707 2
            return
708
709 2
        if vlan_pool.get(dpid):
710 2
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
711 2
            for intf_num, port_list in vlan_pool[dpid].items():
712 2
                if not switch.interfaces.get((intf_num)):
713 2
                    vlan_ids = set()
714 2
                    for vlan_range in port_list:
715 2
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
716 2
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
717 2
                            vlan_ids.add(vlan_id)
718 2
                    intf_num = int(intf_num)
719 2
                    intf = Interface(name=intf_num, port_number=intf_num,
720
                                     switch=switch)
721 2
                    intf.set_available_tags(vlan_ids)
722 2
                    switch.update_interface(intf)
723
724 2
    def create_or_update_connection(self, connection):
725
        """Update a connection.
726
727
        Args:
728
            connection (:class:`~kytos.core.connection.Connection`):
729
                Instance of connection that will be updated.
730
        """
731 2
        self.connections[connection.id] = connection
732
733 2
    def get_connection_by_id(self, conn_id):
734
        """Return a existent connection by id.
735
736
        Args:
737
            id (int): id from a connection.
738
739
        Returns:
740
            :class:`~kytos.core.connection.Connection`: Instance of connection
741
                or None Type.
742
743
        """
744 2
        return self.connections.get(conn_id)
745
746 2
    def remove_connection(self, connection):
747
        """Close a existent connection and remove it.
748
749
        Args:
750
            connection (:class:`~kytos.core.connection.Connection`):
751
                Instance of connection that will be removed.
752
        """
753 2
        if connection is None:
754 2
            return False
755
756 2
        try:
757 2
            connection.close()
758 2
            del self.connections[connection.id]
759 2
        except KeyError:
760 2
            return False
761 2
        return True
762
763 2
    def remove_switch(self, switch):
764
        """Remove an existent switch.
765
766
        Args:
767
            switch (:class:`~kytos.core.switch.Switch`):
768
                Instance of switch that will be removed.
769
        """
770 2
        try:
771 2
            del self.switches[switch.dpid]
772 2
        except KeyError:
773 2
            return False
774 2
        return True
775
776 2
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method will read new connection event and store the connection
        (socket) into the connections attribute on the controller.

        It also clears any previously stored connection with the same id,
        since the event announces a new connection on the same ip:port.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info. ``event.source`` is the new connection.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() calls ``close()`` on its argument, so it must
        # receive the stored connection object, not the connection id.
        stale_connection = self.get_connection_by_id(connection.id)
        # Never close the connection that is being registered right now.
        if stale_connection and stale_connection is not connection:
            self.remove_connection(stale_connection)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
801
802 2
    def add_new_switch(self, switch):
        """Register a switch in ``self.switches`` under its ``dpid``.

        An existing entry with the same ``dpid`` is overwritten.

        Args:
            switch (Switch): The switch object to store.
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
809
810 2
    def _import_napp(self, username, napp_name):
        """Import the ``main.py`` of a NApp and return the module object.

        The file is looked up at ``<options.napps>/<username>/<napp_name>/
        main.py`` and the module is registered in ``sys.modules`` as
        ``napps.<username>.<napp_name>.main`` before its code is executed.

        Raises:
            FileNotFoundError: if NApp's main.py is not found.
            ModuleNotFoundError: if any NApp requirement is not installed.

        """
        module_name = '.'.join(('napps', username, napp_name, 'main'))
        main_file = os.path.join(
            self.options.napps, username, napp_name, 'main.py'
        )
        spec = spec_from_file_location(module_name, main_file)
        module = module_from_spec(spec)
        # Register before executing so the module is importable by name
        # while its own top-level code runs.
        sys.modules[spec.name] = module
        spec.loader.exec_module(module)
        return module
826
827 2
    def load_napp(self, username, napp_name):
        """Import, instantiate and start a single NApp.

        Nothing is loaded (a message is logged and the method returns) when
        the NApp is already loaded, when its module cannot be imported, or
        when creating its ``Main`` object raises.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            napp_module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return
        except FileNotFoundError as err:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                err.filename)
            return

        try:
            napp = napp_module.Main(controller=self)
        except:  # noqa pylint: disable=bare-except
            self.log.critical("NApp initialization failed: %s/%s",
                              username, napp_name, exc_info=True)
            return

        # The NApp is recorded as loaded before it is started.
        self.napps[napp_key] = napp

        napp.start()
        self.api_server.authenticate_endpoints(napp)
        self.api_server.register_napp_endpoints(napp)

        # Append the NApp's listeners to the controller-wide registry.
        # pylint: disable=protected-access
        for event_name, handlers in napp._listeners.items():
            self.events_listeners.setdefault(event_name, []).extend(handlers)
        # pylint: enable=protected-access
867
868 2
    def pre_install_napps(self, napps, enable=True):
        """Install the NApps from ``napps`` that are not installed yet.

        The installed NApps reported by the NApps manager are compared, by
        their ``str()`` form, against the requested names; only the missing
        ones are installed.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): Forwarded as ``enable`` to the manager's
                ``install`` call for every NApp installed here.
        """
        already_installed = [
            str(napp) for napp in self.napps_manager.get_installed_napps()
        ]
        missing = [name for name in napps if name not in already_installed]
        for name in missing:
            self.napps_manager.install(name, enable=enable)
881
882 2
    def load_napps(self):
        """Load every NApp the NApps manager reports as enabled.

        A ``FileNotFoundError`` raised while loading one NApp is logged and
        does not prevent the remaining NApps from being loaded.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for enabled in enabled_napps:
            try:
                self.log.info("Loading NApp %s", enabled.id)
                self.load_napp(enabled.username, enabled.name)
            except FileNotFoundError as missing:
                self.log.error("Could not load NApp %s: %s",
                               enabled.id, missing)
891
892 2
    def unload_napp(self, username, napp_name):
        """Shut down a loaded NApp and detach it from the controller.

        The NApp is removed from ``self.napps``, the first listener
        registered for its ``kytos/core.shutdown.<napp id>`` event is called,
        its REST endpoints are removed and its listeners are dropped from
        ``self.events_listeners``. If the NApp is not loaded, only a warning
        is logged.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)
        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        shutdown_name = 'kytos/core.shutdown.' + NApp(username, napp_name).id
        shutdown_event = KytosEvent(name=shutdown_name)
        # Call listener before removing it from events_listeners
        shutdown_handler = self.events_listeners[shutdown_event.name][0]
        shutdown_handler(shutdown_event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Removing listeners from that napp
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            registered = self.events_listeners[event_type]
            for listener in napp_listeners:
                registered.remove(listener)
            # Drop the event entry once nobody listens to it any more.
            if not registered:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access
923
924 2
    def unload_napps(self):
        """Unload every NApp currently present in ``self.napps``."""
        # Iterate over a snapshot of the keys: unloading removes entries from
        # self.napps, and looping over the dictionary itself while it shrinks
        # raises 'RuntimeError: dictionary changed size during iteration'.
        loaded_keys = list(self.napps.keys())
        for username, napp_name in loaded_keys:
            self.unload_napp(username, napp_name)
932
933 2
    def reload_napp_module(self, username, napp_name, napp_file):
        """Import and then reload one module of a NApp.

        The module name is ``napps.<username>.<napp_name>.<napp_file>``.

        Args:
            username (str): NApp username.
            napp_name (str): NApp name.
            napp_file (str): Module name inside the NApp package.

        Raises:
            ModuleNotFoundError: if the module cannot be imported; the error
                is logged and re-raised.
            ImportError: if reloading fails; the error is logged and
                re-raised.
        """
        module_name = '.'.join(('napps', username, napp_name, napp_file))
        try:
            module = import_module(module_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", module_name)
            raise

        try:
            reload_module(module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise
947
948 2
    def reload_napp(self, username, napp_name):
        """Unload a NApp, reload its modules and load it again.

        The ``settings`` module is reloaded before ``main``. When reloading
        either module fails the NApp is left unloaded.

        Args:
            username (str): NApp username.
            napp_name (str): NApp name.

        Returns:
            int: ``200`` when the NApp was reloaded and loaded again, ``400``
                when reloading one of its modules raised ``ImportError``
                (which includes ``ModuleNotFoundError``).
        """
        self.unload_napp(username, napp_name)
        try:
            for napp_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, napp_file)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
960
961 2
    def rest_reload_napp(self, username, napp_name):
        """Reload one NApp and report the outcome.

        Returns:
            tuple: ``('reloaded', status)`` where ``status`` is the value
                returned by :meth:`reload_napp`.
        """
        status = self.reload_napp(username, napp_name)
        response = ('reloaded', status)
        return response
965
966 2
    def rest_reload_all_napps(self):
        """Reload every NApp currently present in ``self.napps``.

        Returns:
            tuple: Always ``('reloaded', 200)``; the status returned by the
                individual :meth:`reload_napp` calls is not reported.
        """
        # Iterate over a snapshot of the keys: reload_napp() unloads and
        # loads NApps again, which removes and re-adds entries of self.napps.
        # Looping over the dictionary itself while that happens either
        # revisits re-added NApps or raises
        # 'RuntimeError: dictionary changed size during iteration'.
        for username, napp_name in list(self.napps):
            self.reload_napp(username, napp_name)
        return 'reloaded', 200
971