Passed
Push — master ( f8912e...f21663 )
by Vinicius
06:13 queued 12s
created

kytos.core.controller.exc_handler()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.2963

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 3
dl 0
loc 12
rs 10
c 0
b 0
f 0
ccs 1
cts 3
cp 0.3333
crap 1.2963
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 2
import asyncio
17 2
import atexit
18 2
import json
19 2
import logging
20 2
import os
21 2
import re
22 2
import sys
23 2
import threading
24 2
from concurrent.futures import ThreadPoolExecutor
25 2
from importlib import import_module
26 2
from importlib import reload as reload_module
27 2
from importlib.util import module_from_spec, spec_from_file_location
28 2
from pathlib import Path
29 2
from socket import error as SocketError
30
31 2
from kytos.core.api_server import APIServer
32
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
33 2
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
34 2
from kytos.core.auth import Auth
35 2
from kytos.core.buffers import KytosBuffers
36 2
from kytos.core.config import KytosConfig
37 2
from kytos.core.connection import ConnectionState
38 2
from kytos.core.events import KytosEvent
39 2
from kytos.core.helpers import executor as executor_pool
40 2
from kytos.core.helpers import now
41 2
from kytos.core.interface import Interface
42 2
from kytos.core.logs import LogManager
43 2
from kytos.core.napps.base import NApp
44 2
from kytos.core.napps.manager import NAppsManager
45 2
from kytos.core.napps.napp_dir_listener import NAppDirListener
46 2
from kytos.core.switch import Switch
47
48 2
__all__ = ('Controller',)
49
50
51 2
def exc_handler(_, exc, __):
    """Log uncaught exceptions.

    The signature matches ``sys.excepthook(type, value, traceback)``.

    Args:
        _ (ignored): exception type
        exc: exception instance
        __ (ignored): traceback
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # This hook is not called from inside an ``except`` block, so
    # logging.exception() would find an empty sys.exc_info() and drop the
    # traceback. Hand the exception instance to logging explicitly instead.
    logging.error('Uncaught Exception: %s', exc, exc_info=exc)
63
64
65 2
class Controller:
66
    """Main class of Kytos.
67
68
    The main responsibilities of this class are:
69
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
70
        - manage KytosNApps (install, load and unload);
71
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
72
        - manage which event should be sent to NApps methods;
73
        - manage the buffers handlers, considering one thread per handler.
74
    """
75
76
    # Created issue #568 for the disabled checks.
77
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
78 2
    def __init__(self, options=None, loop=None):
        """Init method of Controller class takes the parameters below.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
                When omitted, the ``'daemon'`` options of a new
                :class:`~kytos.core.config.KytosConfig` are used.
            loop: asyncio event loop used by the controller. When omitted,
                ``asyncio.get_event_loop()`` is used.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self._loop = loop or asyncio.get_event_loop()
        # Single-worker executor; start_controller runs the API server on it.
        self._pool = ThreadPoolExecutor(max_workers=1)

        # asyncio tasks
        self._tasks = []

        #: dict: keep the main threads of the controller (buffers and handler)
        self._threads = {}
        #: KytosBuffers: KytosBuffer object with Controller buffers
        self.buffers = KytosBuffers(loop=self._loop)
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
        #:
        #: This dict stores all connections between the controller and the
        #: switches. The key for this dict is a tuple (ip, port). The content
        #: is a Connection
        self.connections = {}
        #: dict: mapping of events and event listeners.
        #:
        #: The key of the dict is a KytosEvent (or a string that represent a
        #: regex to match against KytosEvents) and the value is a list of
        #: methods that will receive the referenced event
        self.events_listeners = {'kytos/core.connection.new':
                                 [self.new_connection]}

        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
        #:
        #: The key is the napp name (string), while the value is the napp
        #: instance itself.
        self.napps = {}
        #: Object generated by ParseArgs on config.py file
        self.options = options
        #: KytosServer: Instance of KytosServer that will be listening to TCP
        #: connections.
        self.server = None
        #: dict: Current existing switches.
        #:
        #: The key is the switch dpid, while the value is a Switch object.
        self.switches = {}  # dpid: Switch()
        # Held by get_switch_or_create while it looks up or adds a switch.
        self._switches_lock = threading.Lock()

        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: Logger instance used by Kytos.
        self.log = None

        #: Observer that handle NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(__name__, self.options.listen,
                                    self.options.api_port,
                                    self.napps_manager, self.options.napps)

        self.auth = Auth(self)

        # Needs api_server and auth, so it must run after both are created.
        self._register_endpoints()
        #: Adding the napps 'enabled' directory into the PATH
        #: Now you can access the enabled napps with:
        #: from napps.<username>.<napp_name> import ?....
        sys.path.append(os.path.join(self.options.napps, os.pardir))
        # Route uncaught exceptions through the module-level exc_handler.
        sys.excepthook = exc_handler
152
153 2
    def enable_logs(self):
        """Load the logging configuration and create the controller logger."""
        opts = self.options
        LogManager.load_config_file(opts.logging, opts.debug)
        LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)
158
159 2
    @staticmethod
160
    def loggers():
161
        """List all logging Loggers.
162
163
        Return a list of Logger objects, with name and logging level.
164
        """
165 2
        return [logging.getLogger(name)
166
                for name in logging.root.manager.loggerDict
167
                if "kytos" in name]
168
169 2
    def toggle_debug(self, name=None):
170
        """Enable/disable logging debug messages to a given logger name.
171
172
        If the name parameter is not specified the debug will be
173
        enabled/disabled following the initial config file. It will decide
174
        to enable/disable using the 'kytos' name to find the current
175
        logger level.
176
        Obs: To disable the debug the logging will be set to NOTSET
177
178
        Args:
179
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
180
        """
181 2
        if name and name not in logging.root.manager.loggerDict:
182
            # A Logger name that is not declared in logging will raise an error
183
            # otherwise logging would create a new Logger.
184 2
            raise ValueError(f"Invalid logger name: {name}")
185
186 2
        if not name:
187
            # Logger name not specified.
188 2
            level = logging.getLogger('kytos').getEffectiveLevel()
189 2
            enable_debug = level != logging.DEBUG
190
191
            # Enable/disable default Loggers
192 2
            LogManager.load_config_file(self.options.logging, enable_debug)
193 2
            return
194
195
        # Get effective logger level for the name
196 2
        level = logging.getLogger(name).getEffectiveLevel()
197 2
        logger = logging.getLogger(name)
198
199 2
        if level == logging.DEBUG:
200
            # disable debug
201 2
            logger.setLevel(logging.NOTSET)
202
        else:
203
            # enable debug
204 2
            logger.setLevel(logging.DEBUG)
205
206 2
    def start(self, restart=False):
207
        """Create pidfile and call start_controller method."""
208 2
        self.enable_logs()
209 2
        if not restart:
210 2
            self.create_pidfile()
211 2
        self.start_controller()
212
213 2
    def create_pidfile(self):
214
        """Create a pidfile."""
215 2
        pid = os.getpid()
216
217
        # Creates directory if it doesn't exist
218
        # System can erase /var/run's content
219 2
        pid_folder = Path(self.options.pidfile).parent
220 2
        self.log.info(pid_folder)
221
222
        # Pylint incorrectly infers Path objects
223
        # https://github.com/PyCQA/pylint/issues/224
224
        # pylint: disable=no-member
225 2
        if not pid_folder.exists():
226
            pid_folder.mkdir()
227
            pid_folder.chmod(0o1777)
228
        # pylint: enable=no-member
229
230
        # Make sure the file is deleted when controller stops
231 2
        atexit.register(Path(self.options.pidfile).unlink)
232
233
        # Checks if a pidfile exists. Creates a new file.
234 2
        try:
235 2
            pidfile = open(self.options.pidfile, mode='x')
236 2
        except OSError:
237
            # This happens if there is a pidfile already.
238
            # We shall check if the process that created the pidfile is still
239
            # running.
240 2
            try:
241 2
                existing_file = open(self.options.pidfile, mode='r')
242 2
                old_pid = int(existing_file.read())
243 2
                os.kill(old_pid, 0)
244
                # If kill() doesn't return an error, there's a process running
245
                # with the same PID. We assume it is Kytos and quit.
246
                # Otherwise, overwrite the file and proceed.
247
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
248
                             "running. Aborting.")
249
                sys.exit(error_msg.format(self.options.pidfile))
250 2
            except OSError:
251 2
                try:
252 2
                    pidfile = open(self.options.pidfile, mode='w')
253
                except OSError as exception:
254
                    error_msg = "Failed to create pidfile {}: {}."
255
                    sys.exit(error_msg.format(self.options.pidfile, exception))
256
257
        # Identifies the process that created the pidfile.
258 2
        pidfile.write(str(pid))
259 2
        pidfile.close()
260
261 2
    def start_controller(self):
        """Start the controller.

        Creates the KytosServer (TCP server) and calls its ``serve_forever``,
        schedules one asyncio task per buffer handler plus one task that runs
        the API server in the thread pool, then loads the NApps.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(address, KytosServerProtocol, self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: stop the event loop.
            loop = asyncio.get_event_loop()
            alive = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(alive), alive)
            loop.stop()

        async def _run_api_server_thread(executor):
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            loop = asyncio.get_event_loop()
            api_future = loop.run_in_executor(executor, self.api_server.run)
            completed, pending = await asyncio.wait([api_future])
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        buffer_handlers = (
            self.raw_event_handler,
            self.msg_in_event_handler,
            self.msg_out_event_handler,
            self.app_event_handler,
        )
        for handler in buffer_handlers:
            self._tasks.append(self._loop.create_task(handler()))

        api_task = self._loop.create_task(_run_api_server_thread(self._pool))
        api_task.add_done_callback(_stop_loop)
        self._tasks.append(api_task)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.
        # sys.exit(error_msg.format(thread, exception))

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()
325
326 2
    def _register_endpoints(self):
327
        """Register all rest endpoint served by kytos.
328
329
        -   Register APIServer endpoints
330
        -   Register WebUI endpoints
331
        -   Register ``/api/kytos/core/config`` endpoint
332
        """
333 2
        self.api_server.start_api()
334
        # Register controller endpoints as /api/kytos/core/...
335 2
        self.api_server.register_core_endpoint('config/',
336
                                               self.configuration_endpoint)
337 2
        self.api_server.register_core_endpoint('metadata/',
338
                                               Controller.metadata_endpoint)
339 2
        self.api_server.register_core_endpoint(
340
            'reload/<username>/<napp_name>/',
341
            self.rest_reload_napp)
342 2
        self.api_server.register_core_endpoint('reload/all',
343
                                               self.rest_reload_all_napps)
344 2
        self.auth.register_core_auth_services()
345
346 2
    def register_rest_endpoint(self, url, function, methods):
347
        """Deprecate in favor of @rest decorator."""
348 2
        self.api_server.register_rest_endpoint(url, function, methods)
349
350 2
    @classmethod
351
    def metadata_endpoint(cls):
352
        """Return the Kytos metadata.
353
354
        Returns:
355
            string: Json with current kytos metadata.
356
357
        """
358 2
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
359 2
        meta_file = open(meta_path).read()
360 2
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
361 2
        return json.dumps(metadata)
362
363 2
    def configuration_endpoint(self):
364
        """Return the configuration options used by Kytos.
365
366
        Returns:
367
            string: Json with current configurations used by kytos.
368
369
        """
370 2
        return json.dumps(self.options.__dict__)
371
372 2
    def restart(self, graceful=True):
373
        """Restart Kytos SDN Controller.
374
375
        Args:
376
            graceful(bool): Represents the way that Kytos will restart.
377
        """
378 2
        if self.started_at is not None:
379 2
            self.stop(graceful)
380 2
            self.__init__(self.options)
381
382 2
        self.start(restart=True)
383
384 2
    def stop(self, graceful=True):
385
        """Shutdown all services used by kytos.
386
387
        This method should:
388
            - stop all Websockets
389
            - stop the API Server
390
            - stop the Controller
391
        """
392 2
        if self.started_at:
393 2
            self.stop_controller(graceful)
394
395 2
    def stop_controller(self, graceful=True):
        """Stop the controller.

        In order, this method:
            - sends the stop signal to the buffers;
            - stops the API server and the NApp directory listener;
            - shuts down the thread pools;
            - unloads the NApps and replaces the buffers;
            - cancels the asyncio tasks;
            - shuts down the KytosServer.

        Args:
            graceful(bool): passed as ``wait`` to the thread pool shutdown.
        """
        log = self.log
        log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.api_server.stop_api_server()
        self.napp_dir_listener.stop()

        log.info("Stopping threadpool: %s", self._pool)
        pools = [self._pool]
        if executor_pool:
            log.info("Stopping threadpool: %s", executor_pool)
            pools.append(executor_pool)

        alive = threading.enumerate()
        log.debug("%s threads before threadpool shutdown: %s",
                  len(alive), alive)

        try:
            # Python >= 3.9
            # pylint: disable=unexpected-keyword-arg
            for pool in pools:
                pool.shutdown(wait=graceful, cancel_futures=True)
        except TypeError:
            # Older Python: shutdown() has no cancel_futures argument.
            for pool in pools:
                pool.shutdown(wait=graceful)

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # Cancel all async tasks (event handlers and servers)
        for pending_task in self._tasks:
            pending_task.cancel()

        # ASYNC TODO: close connections
        # self.server.server_close()

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()
457
458 2
    def status(self):
459
        """Return status of Kytos Server.
460
461
        If the controller kytos is running this method will be returned
462
        "Running since 'Started_At'", otherwise "Stopped".
463
464
        Returns:
465
            string: String with kytos status.
466
467
        """
468 2
        if self.started_at:
469 2
            return "Running since %s" % self.started_at
470 2
        return "Stopped"
471
472 2
    def uptime(self):
473
        """Return the uptime of kytos server.
474
475
        This method should return:
476
            - 0 if Kytos Server is stopped.
477
            - (kytos.start_at - datetime.now) if Kytos Server is running.
478
479
        Returns:
480
           datetime.timedelta: The uptime interval.
481
482
        """
483 2
        return now() - self.started_at if self.started_at else 0
484
485 2
    def notify_listeners(self, event):
486
        """Send the event to the specified listeners.
487
488
        Loops over self.events_listeners matching (by regexp) the attribute
489
        name of the event with the keys of events_listeners. If a match occurs,
490
        then send the event to each registered listener.
491
492
        Args:
493
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
494
        """
495 2
        self.log.debug("looking for listeners for %s", event)
496 2
        for event_regex, listeners in dict(self.events_listeners).items():
497
            # self.log.debug("listeners found for %s: %r => %s", event,
498
            #                event_regex, [l.__qualname__ for l in listeners])
499
            # Do not match if the event has more characters
500
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
501 2
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
502 2
                event_regex += '$'
503 2
            if re.match(event_regex, event.name):
504
                # self.log.debug('Calling listeners for %s', event)
505 2
                for listener in listeners:
506 2
                    listener(event)
507
508 2
    async def raw_event_handler(self):
509
        """Handle raw events.
510
511
        Listen to the raw_buffer and send all its events to the
512
        corresponding listeners.
513
        """
514 2
        self.log.info("Raw Event Handler started")
515 2
        while True:
516 2
            event = await self.buffers.raw.aget()
517 2
            self.notify_listeners(event)
518 2
            self.log.debug("Raw Event handler called")
519
520 2
            if event.name == "kytos/core.shutdown":
521 2
                self.log.debug("Raw Event handler stopped")
522 2
                break
523
524 2
    async def msg_in_event_handler(self):
525
        """Handle msg_in events.
526
527
        Listen to the msg_in buffer and send all its events to the
528
        corresponding listeners.
529
        """
530 2
        self.log.info("Message In Event Handler started")
531 2
        while True:
532 2
            event = await self.buffers.msg_in.aget()
533 2
            self.notify_listeners(event)
534 2
            self.log.debug("Message In Event handler called")
535
536 2
            if event.name == "kytos/core.shutdown":
537 2
                self.log.debug("Message In Event handler stopped")
538 2
                break
539
540 2
    async def publish_connection_error(self, event):
541
        """Publish connection error event.
542
543
        Args:
544
            event (KytosEvent): Event that triggered for error propagation.
545
        """
546
        event.name = \
547
            f"kytos/core.{event.destination.protocol.name}.connection.error"
548
        error_msg = f"Connection state: {event.destination.state}"
549
        event.content["exception"] = error_msg
550
        await self.buffers.app.aput(event)
551
552 2
    async def msg_out_event_handler(self):
553
        """Handle msg_out events.
554
555
        Listen to the msg_out buffer and send all its events to the
556
        corresponding listeners.
557
        """
558 2
        self.log.info("Message Out Event Handler started")
559 2
        while True:
560 2
            triggered_event = await self.buffers.msg_out.aget()
561
562 2
            if triggered_event.name == "kytos/core.shutdown":
563 2
                self.log.debug("Message Out Event handler stopped")
564 2
                break
565
566 2
            message = triggered_event.content['message']
567 2
            destination = triggered_event.destination
568 2
            try:
569 2
                if (destination and
570
                        not destination.state == ConnectionState.FINISHED):
571 2
                    packet = message.pack()
572 2
                    destination.send(packet)
573 2
                    self.log.debug('Connection %s: OUT OFP, '
574
                                   'version: %s, type: %s, xid: %s - %s',
575
                                   destination.id,
576
                                   message.header.version,
577
                                   message.header.message_type,
578
                                   message.header.xid,
579
                                   packet.hex())
580 2
                    self.notify_listeners(triggered_event)
581 2
                    self.log.debug("Message Out Event handler called")
582 2
                    continue
583
584
            except (OSError, SocketError):
585
                pass
586
587
            await self.publish_connection_error(triggered_event)
588
            self.log.info("connection closed. Cannot send message")
589
590 2
    async def app_event_handler(self):
591
        """Handle app events.
592
593
        Listen to the app buffer and send all its events to the
594
        corresponding listeners.
595
        """
596 2
        self.log.info("App Event Handler started")
597 2
        while True:
598 2
            event = await self.buffers.app.aget()
599 2
            self.notify_listeners(event)
600 2
            self.log.debug("App Event handler called")
601
602 2
            if event.name == "kytos/core.shutdown":
603 2
                self.log.debug("App Event handler stopped")
604 2
                break
605
606 2
    def get_interface_by_id(self, interface_id):
607
        """Find a Interface  with interface_id.
608
609
        Args:
610
            interface_id(str): Interface Identifier.
611
612
        Returns:
613
            Interface: Instance of Interface with the id given.
614
615
        """
616 2
        if interface_id is None:
617 2
            return None
618
619 2
        switch_id = ":".join(interface_id.split(":")[:-1])
620 2
        interface_number = int(interface_id.split(":")[-1])
621
622 2
        switch = self.switches.get(switch_id)
623
624 2
        if not switch:
625 2
            return None
626
627 2
        return switch.interfaces.get(interface_number, None)
628
629 2
    def get_switch_by_dpid(self, dpid):
630
        """Return a specific switch by dpid.
631
632
        Args:
633
            dpid (|DPID|): dpid object used to identify a switch.
634
635
        Returns:
636
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
637
638
        """
639 2
        return self.switches.get(dpid)
640
641 2
    def get_switch_or_create(self, dpid, connection=None):
        """Return switch or create it if necessary.

        Runs under the switches lock. A ``kytos/core.switch.new`` or
        ``kytos/core.switch.reconnected`` event is put on the app buffer.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. If given, the switch connection
                is updated and a different previous connection is removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.

        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                suffix = 'new'
            else:
                suffix = 'reconnected'

            self.set_switch_options(dpid=dpid)
            event = KytosEvent(name='kytos/core.switch.' + suffix,
                               content={'switch': switch})

            if connection:
                previous = switch.connection
                switch.update_connection(connection)

                if previous is not connection:
                    self.remove_connection(previous)

            self.buffers.app.put(event)

            return switch
681
682 2
    def set_switch_options(self, dpid):
683
        """Update the switch settings based on kytos.conf options.
684
685
        Args:
686
            dpid (str): dpid used to identify a switch.
687
688
        """
689 2
        switch = self.switches.get(dpid)
690 2
        if not switch:
691
            return
692
693 2
        vlan_pool = {}
694 2
        vlan_pool = self.options.vlan_pool
695 2
        if not vlan_pool:
696 2
            return
697
698 2
        if vlan_pool.get(dpid):
699 2
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
700 2
            for intf_num, port_list in vlan_pool[dpid].items():
701 2
                if not switch.interfaces.get((intf_num)):
702 2
                    vlan_ids = set()
703 2
                    for vlan_range in port_list:
704 2
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
705 2
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
706 2
                            vlan_ids.add(vlan_id)
707 2
                    intf_num = int(intf_num)
708 2
                    intf = Interface(name=intf_num, port_number=intf_num,
709
                                     switch=switch)
710 2
                    intf.set_available_tags(vlan_ids)
711 2
                    switch.update_interface(intf)
712
713 2
    def create_or_update_connection(self, connection):
714
        """Update a connection.
715
716
        Args:
717
            connection (:class:`~kytos.core.connection.Connection`):
718
                Instance of connection that will be updated.
719
        """
720 2
        self.connections[connection.id] = connection
721
722 2
    def get_connection_by_id(self, conn_id):
723
        """Return a existent connection by id.
724
725
        Args:
726
            id (int): id from a connection.
727
728
        Returns:
729
            :class:`~kytos.core.connection.Connection`: Instance of connection
730
                or None Type.
731
732
        """
733 2
        return self.connections.get(conn_id)
734
735 2
    def remove_connection(self, connection):
736
        """Close a existent connection and remove it.
737
738
        Args:
739
            connection (:class:`~kytos.core.connection.Connection`):
740
                Instance of connection that will be removed.
741
        """
742 2
        if connection is None:
743 2
            return False
744
745 2
        try:
746 2
            connection.close()
747 2
            del self.connections[connection.id]
748 2
        except KeyError:
749 2
            return False
750 2
        return True
751
752 2
    def remove_switch(self, switch):
753
        """Remove an existent switch.
754
755
        Args:
756
            switch (:class:`~kytos.core.switch.Switch`):
757
                Instance of switch that will be removed.
758
        """
759 2
        try:
760 2
            del self.switches[switch.dpid]
761 2
        except KeyError:
762 2
            return False
763 2
        return True
764
765 2
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method will read new connection event and store the connection
        (socket) into the connections attribute on the controller.

        It also clears all references to a previous connection stored under
        the same id, since this is a new connection on the same ip:port.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info. Its ``source`` is the new connection.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", connection)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() closes the object it receives and reads its
        # ``id``, so it must be given the stored connection, not the id.
        stale = self.get_connection_by_id(connection.id)
        # Skip the cleanup when the stored object is the very connection
        # being registered: closing it would tear down the new connection.
        if stale and stale is not connection:
            self.remove_connection(stale)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
790
791 2
    def add_new_switch(self, switch):
        """Register a switch on the controller.

        The switch is stored in ``self.switches`` under its ``dpid``.

        Args:
            switch (Switch): A Switch object
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
798
799 2
    def _import_napp(self, username, napp_name):
        """Import a NApp module.

        The module is loaded from
        ``<options.napps>/<username>/<napp_name>/main.py`` and registered in
        ``sys.modules`` as ``napps.<username>.<napp_name>.main``.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp whose ``main.py`` is imported.

        Returns:
            module: The executed NApp ``main`` module.

        Raises:
            FileNotFoundError: if NApp's main.py is not found.
            ModuleNotFoundError: if any NApp requirement is not installed.

        """
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
        path = os.path.join(self.options.napps, username, napp_name,
                            'main.py')
        napp_spec = spec_from_file_location(mod_name, path)
        napp_module = module_from_spec(napp_spec)
        # Registered before execution so the module can be found by name
        # while its own code is running.
        sys.modules[napp_spec.name] = napp_module
        try:
            napp_spec.loader.exec_module(napp_module)
        except BaseException:
            # Do not leave a half-initialised module behind: a later import
            # of the same name would silently get the broken object.
            sys.modules.pop(napp_spec.name, None)
            raise
        return napp_module
815
816 2
    def load_napp(self, username, napp_name):
        """Import, instantiate and start a single NApp.

        Nothing is loaded, and the reason is only logged, when the NApp is
        already in ``self.napps``, when its module cannot be imported, or
        when instantiating its ``Main`` class fails. Otherwise the NApp is
        stored in ``self.napps``, started, handed to the API server and its
        listeners are appended to ``self.events_listeners``.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            module = self._import_napp(username, napp_name)
        except FileNotFoundError as err:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                err.filename)
            return
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return

        # Any failure while building the NApp is logged with its traceback
        # and the NApp is left out instead of propagating the error.
        try:
            instance = module.Main(controller=self)
        except:  # noqa pylint: disable=bare-except
            self.log.critical("NApp initialization failed: %s/%s",
                              username, napp_name, exc_info=True)
            return

        self.napps[napp_key] = instance

        instance.start()
        self.api_server.authenticate_endpoints(instance)
        self.api_server.register_napp_endpoints(instance)

        # pylint: disable=protected-access
        for event_name, handlers in instance._listeners.items():
            registered = self.events_listeners.setdefault(event_name, [])
            registered.extend(handlers)
        # pylint: enable=protected-access
856
857 2
    def pre_install_napps(self, napps, enable=True):
        """Install the NApps from ``napps`` that are not installed yet.

        Each requested NApp is compared against the string form of the
        NApps reported by ``napps_manager.get_installed_napps()``; only the
        ones not found there are installed.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): Forwarded to ``napps_manager.install`` for every
                NApp that gets installed. Defaults to True.
        """
        installed = list(map(str, self.napps_manager.get_installed_napps()))
        missing = [name for name in napps if name not in installed]
        for name in missing:
            self.napps_manager.install(name, enable=enable)
870
871 2
    def load_napps(self):
        """Load every NApp the NApps manager reports as enabled.

        A ``FileNotFoundError`` raised while loading one NApp is logged and
        does not prevent the remaining NApps from being loaded.
        """
        enabled = self.napps_manager.get_enabled_napps()
        for enabled_napp in enabled:
            try:
                self.log.info("Loading NApp %s", enabled_napp.id)
                self.load_napp(enabled_napp.username, enabled_napp.name)
            except FileNotFoundError as err:
                self.log.error("Could not load NApp %s: %s",
                               enabled_napp.id, err)
880
881 2
    def unload_napp(self, username, napp_name):
        """Shut down a loaded NApp and detach it from the controller.

        The NApp is removed from ``self.napps``. If it was not there, only
        a warning is logged. Otherwise the first listener registered for
        ``kytos/core.shutdown.<napp id>`` is called, the NApp's REST
        endpoints are removed and its listeners are taken out of
        ``self.events_listeners``.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)
        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        shutdown_event = KytosEvent(
            name='kytos/core.shutdown.' + NApp(username, napp_name).id)
        # The shutdown listener has to run before the listeners are removed
        # from events_listeners below.
        self.events_listeners[shutdown_event.name][0](shutdown_event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Removing listeners from that napp
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            remaining = self.events_listeners[event_type]
            for listener in napp_listeners:
                remaining.remove(listener)
            # Drop event types that no longer have any listener.
            if not remaining:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access
912
913 2
    def unload_napps(self):
        """Unload every NApp currently registered in ``self.napps``."""
        # Loop over a snapshot of the keys: unload_napp() removes entries
        # from self.napps, and changing a dictionary while iterating over
        # it raises
        # 'RuntimeError: dictionary changed size during iteration'.
        loaded = tuple(self.napps.keys())
        for username, napp_name in loaded:
            self.unload_napp(username, napp_name)
921
922 2
    def reload_napp_module(self, username, napp_name, napp_file):
        """Import and then reload one module of a NApp.

        The module name is ``napps.<username>.<napp_name>.<napp_file>``.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp.
            napp_file (str): Module inside the NApp package, without the
                ``.py`` extension (e.g. ``'settings'`` or ``'main'``).

        Raises:
            ModuleNotFoundError: if the module cannot be found on import.
            ImportError: if reloading the module fails.
        """
        mod_name = '.'.join(('napps', username, napp_name, napp_file))

        try:
            module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise

        try:
            reload_module(module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise
936
937 2
    def reload_napp(self, username, napp_name):
        """Unload a NApp, reload its modules and load it again.

        The NApp is unloaded first. If reloading its ``settings`` or
        ``main`` module fails, it is not loaded again.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be reloaded.

        Returns:
            int: 200 when the modules were reloaded and the NApp was handed
            to ``load_napp``; 400 when reloading a module raised
            ``ImportError`` (which includes ``ModuleNotFoundError``).
        """
        self.unload_napp(username, napp_name)
        try:
            for napp_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, napp_file)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
949
950 2
    def rest_reload_napp(self, username, napp_name):
        """Reload a NApp on request and report the outcome.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be reloaded.

        Returns:
            tuple: ``('reloaded', status)`` where ``status`` is the value
            returned by ``reload_napp``.
        """
        return 'reloaded', self.reload_napp(username, napp_name)
954
955 2
    def rest_reload_all_napps(self):
        """Request reload all NApps.

        Every NApp that is in ``self.napps`` when the request starts is
        reloaded once, in order.

        Returns:
            tuple: ``('reloaded', 200)``, whatever status each individual
            ``reload_napp`` call returned.
        """
        # reload_napp() unloads and loads each NApp again, which removes and
        # re-inserts its key in self.napps. Looping over the live dictionary
        # while that happens either raises
        # 'RuntimeError: dictionary changed size during iteration' or visits
        # re-inserted keys again, so iterate over a snapshot instead.
        for napp in list(self.napps):
            self.reload_napp(*napp)
        return 'reloaded', 200
960