Passed
Pull Request — master (#209)
by Vinicius
13:06 queued 09:04
created

Controller.reload_napp_module()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 13
nop 4
dl 0
loc 14
rs 9.75
c 0
b 0
f 0
ccs 12
cts 12
cp 1
crap 3
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29 1
from socket import error as SocketError
30
31 1
from kytos.core.api_server import APIServer
32 1
from kytos.core.apm import init_apm
33
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
34 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
35 1
from kytos.core.auth import Auth
36 1
from kytos.core.buffers import KytosBuffers
37 1
from kytos.core.config import KytosConfig
38 1
from kytos.core.connection import ConnectionState
39 1
from kytos.core.db import db_conn_wait
40 1
from kytos.core.dead_letter import DeadLetter
41 1
from kytos.core.events import KytosEvent
42 1
from kytos.core.exceptions import KytosAPMInitException, KytosDBInitException
43 1
from kytos.core.helpers import executor as executor_pool
44 1
from kytos.core.helpers import now
45 1
from kytos.core.interface import Interface
46 1
from kytos.core.logs import LogManager
47 1
from kytos.core.napps.base import NApp
48 1
from kytos.core.napps.manager import NAppsManager
49 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
50 1
from kytos.core.switch import Switch
51
52 1
__all__ = ('Controller',)
53
54
55 1
def exc_handler(_, exc, __):
    """Log uncaught exceptions.

    The signature matches the ``(type, value, traceback)`` triple that
    ``sys.excepthook`` receives. The record is sent through the root logger,
    which is configured (if it has no handler yet) to write to
    ``errlog.log``.

    Args:
        _ (ignored): exception type
        exc: exception instance
        __ (ignored): traceback
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # ``logging.exception`` takes the traceback from ``sys.exc_info()``,
    # which is only populated while an ``except`` block is running. This
    # function is not called from one, so hand the exception over explicitly
    # to keep its traceback in the log record.
    logging.error('Uncaught Exception: %s', exc, exc_info=exc)
67
68
69 1
class Controller:
70
    """Main class of Kytos.
71
72
    The main responsibilities of this class are:
73
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
74
        - manage KytosNApps (install, load and unload);
75
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
76
        - manage which event should be sent to NApps methods;
77
        - manage the buffers handlers, considering one thread per handler.
78
    """
79
80
    # Created issue #568 for the disabled checks.
81
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
82 1
    def __init__(self, options=None, loop=None):
        """Build the controller state and its core services.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
                When ``None``, the ``'daemon'`` options of a new
                ``KytosConfig`` are used.
            loop: event loop used by the controller; when falsy,
                ``asyncio.get_event_loop()`` is used instead.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        if loop:
            self._loop = loop
        else:
            self._loop = asyncio.get_event_loop()
        self._pool = ThreadPoolExecutor(max_workers=1)

        #: list: asyncio tasks created by the controller.
        self._tasks = []

        #: dict: main threads of the controller (buffers and handler).
        self._threads = {}
        #: KytosBuffers: buffers used by the controller.
        self.buffers = KytosBuffers(loop=self._loop)
        #: dict: socket connections labeled by ``(ip, port)``.
        #:
        #: Holds every connection between the controller and the switches;
        #: each value is a Connection.
        self.connections = {}
        #: dict: mapping of events and event listeners.
        #:
        #: Each key is an event name (or a string holding a regex matched
        #: against event names); each value is the list of callables that
        #: receive the matching events.
        self.events_listeners = {
            'kytos/core.connection.new': [self.new_connection],
        }

        #: dict: currently loaded apps - ``'napp_name'``: ``napp`` (instance).
        self.napps = {}
        #: Object generated by ParseArgs on config.py file
        self.options = options
        #: KytosServer: server listening to TCP connections; created by
        #: :meth:`start_controller`.
        self.server = None
        #: dict: existing switches, keyed by dpid.
        self.switches = {}
        self._switches_lock = threading.Lock()

        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: Logger instance used by Kytos; set by
        #: :meth:`enable_logs`.
        self.log = None

        #: Observer that handle NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(
            __name__,
            self.options.listen,
            self.options.api_port,
            self.napps_manager,
            self.options.napps,
        )

        self.auth = Auth(self)
        self.dead_letter = DeadLetter(self)

        self._register_endpoints()
        # Make the parent of the napps directory importable, so enabled
        # napps can be reached with:
        #   from napps.<username>.<napp_name> import ...
        napps_parent = os.path.join(self.options.napps, os.pardir)
        sys.path.append(napps_parent)
        sys.excepthook = exc_handler
157
158 1
    def enable_logs(self):
        """Load the logging configuration and set ``self.log``.

        The config file and debug flag come from ``self.options``; the API
        server's ``server`` object is handed to
        ``LogManager.enable_websocket``.
        """
        opts = self.options
        LogManager.load_config_file(opts.logging, opts.debug)
        server = self.api_server.server
        LogManager.enable_websocket(server)
        self.log = logging.getLogger(__name__)
163
164 1
    @staticmethod
165 1
    def loggers():
166
        """List all logging Loggers.
167
168
        Return a list of Logger objects, with name and logging level.
169
        """
170 1
        return [logging.getLogger(name)
171
                for name in logging.root.manager.loggerDict
172
                if "kytos" in name]
173
174 1
    def toggle_debug(self, name=None):
175
        """Enable/disable logging debug messages to a given logger name.
176
177
        If the name parameter is not specified the debug will be
178
        enabled/disabled following the initial config file. It will decide
179
        to enable/disable using the 'kytos' name to find the current
180
        logger level.
181
        Obs: To disable the debug the logging will be set to NOTSET
182
183
        Args:
184
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
185
        """
186 1
        if name and name not in logging.root.manager.loggerDict:
187
            # A Logger name that is not declared in logging will raise an error
188
            # otherwise logging would create a new Logger.
189 1
            raise ValueError(f"Invalid logger name: {name}")
190
191 1
        if not name:
192
            # Logger name not specified.
193 1
            level = logging.getLogger('kytos').getEffectiveLevel()
194 1
            enable_debug = level != logging.DEBUG
195
196
            # Enable/disable default Loggers
197 1
            LogManager.load_config_file(self.options.logging, enable_debug)
198 1
            return
199
200
        # Get effective logger level for the name
201 1
        level = logging.getLogger(name).getEffectiveLevel()
202 1
        logger = logging.getLogger(name)
203
204 1
        if level == logging.DEBUG:
205
            # disable debug
206 1
            logger.setLevel(logging.NOTSET)
207
        else:
208
            # enable debug
209 1
            logger.setLevel(logging.DEBUG)
210
211 1
    def start(self, restart=False):
        """Enable logs, run the optional init steps and start the controller.

        The database and APM steps run only when the matching option is
        truthy. The pidfile is created unless this is a restart.

        Args:
            restart (bool): when True, the pidfile creation is skipped.
        """
        self.enable_logs()

        optional_steps = (
            ('database', self.db_conn_or_core_shutdown),
            ('apm', self.init_apm_or_core_shutdown),
        )
        for option_name, step in optional_steps:
            if getattr(self.options, option_name):
                step()

        if not restart:
            self.create_pidfile()
        self.start_controller()
221
222 1
    def create_pidfile(self):
223
        """Create a pidfile."""
224 1
        pid = os.getpid()
225
226
        # Creates directory if it doesn't exist
227
        # System can erase /var/run's content
228 1
        pid_folder = Path(self.options.pidfile).parent
229 1
        self.log.info(pid_folder)
230
231
        # Pylint incorrectly infers Path objects
232
        # https://github.com/PyCQA/pylint/issues/224
233
        # pylint: disable=no-member
234 1
        if not pid_folder.exists():
235
            pid_folder.mkdir()
236
            pid_folder.chmod(0o1777)
237
        # pylint: enable=no-member
238
239
        # Make sure the file is deleted when controller stops
240 1
        atexit.register(Path(self.options.pidfile).unlink)
241
242
        # Checks if a pidfile exists. Creates a new file.
243 1
        try:
244 1
            pidfile = open(self.options.pidfile, mode='x')
245 1
        except OSError:
246
            # This happens if there is a pidfile already.
247
            # We shall check if the process that created the pidfile is still
248
            # running.
249 1
            try:
250 1
                existing_file = open(self.options.pidfile, mode='r')
251 1
                old_pid = int(existing_file.read())
252 1
                os.kill(old_pid, 0)
253
                # If kill() doesn't return an error, there's a process running
254
                # with the same PID. We assume it is Kytos and quit.
255
                # Otherwise, overwrite the file and proceed.
256
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
257
                             "running. Aborting.")
258
                sys.exit(error_msg.format(self.options.pidfile))
259 1
            except OSError:
260 1
                try:
261 1
                    pidfile = open(self.options.pidfile, mode='w')
262
                except OSError as exception:
263
                    error_msg = "Failed to create pidfile {}: {}."
264
                    sys.exit(error_msg.format(self.options.pidfile, exception))
265
266
        # Identifies the process that created the pidfile.
267 1
        pidfile.write(str(pid))
268 1
        pidfile.close()
269
270 1
    def start_controller(self):
        """Start the controller.

        Creates the KytosServer (TCP server) and calls its
        ``serve_forever``, schedules one asyncio task per buffer handler
        plus one task that runs the API server in the thread pool, then
        starts the NApp directory listener and loads the NApps.
        ``started_at`` is set last.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        listen_address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(listen_address,
                                  KytosServerProtocol,
                                  self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: stop the event loop.
            loop = asyncio.get_event_loop()
            alive = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(alive), alive)
            loop.stop()

        async def _run_api_server_thread(executor):
            # Run the blocking API server in ``executor`` and wait for it.
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            loop = asyncio.get_event_loop()
            api_future = loop.run_in_executor(executor, self.api_server.run)
            completed, pending = await asyncio.wait([api_future])
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        handlers = (
            self.raw_event_handler,
            self.msg_in_event_handler,
            self.msg_out_event_handler,
            self.app_event_handler,
        )
        for handler in handlers:
            self._tasks.append(self._loop.create_task(handler()))

        api_task = self._loop.create_task(_run_api_server_thread(self._pool))
        api_task.add_done_callback(_stop_loop)
        self._tasks.append(api_task)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()
334
335 1
    def db_conn_or_core_shutdown(self):
        """Wait for the database connection, exiting the process on failure.

        Raises:
            SystemExit: if ``db_conn_wait`` raises ``KytosDBInitException``.
        """
        backend = self.options.database
        try:
            db_conn_wait(db_backend=backend)
        except KytosDBInitException as exc:
            reason = str(exc)
            sys.exit(f"Kytos couldn't start because of {reason}")
341
342 1
    def init_apm_or_core_shutdown(self, **kwargs):
343
        """Init APM instrumentation or core shutdown."""
344
        if not self.options.apm:
345
            return
346
        try:
347
            init_apm(self.options.apm, app=self.api_server.app,
348
                     **kwargs)
349
        except KytosAPMInitException as exc:
350
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
351
352 1
    def _register_endpoints(self):
        """Register all rest endpoint served by kytos.

        Starts the API server's own endpoints, then registers the core
        routes (``config/``, ``metadata/``, ``reload/<username>/<napp_name>/``
        and ``reload/all``), and finally lets the auth and dead-letter
        components register theirs.
        """
        self.api_server.start_api()

        # Core endpoints, registered in this order.
        core_routes = (
            ('config/', self.configuration_endpoint),
            ('metadata/', Controller.metadata_endpoint),
            ('reload/<username>/<napp_name>/', self.rest_reload_napp),
            ('reload/all', self.rest_reload_all_napps),
        )
        for route, view in core_routes:
            self.api_server.register_core_endpoint(route, view)

        self.auth.register_core_auth_services()
        self.dead_letter.register_endpoints()
372
373 1
    def register_rest_endpoint(self, url, function, methods):
374
        """Deprecate in favor of @rest decorator."""
375 1
        self.api_server.register_rest_endpoint(url, function, methods)
376
377 1
    @classmethod
378 1
    def metadata_endpoint(cls):
379
        """Return the Kytos metadata.
380
381
        Returns:
382
            string: Json with current kytos metadata.
383
384
        """
385 1
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
386 1
        meta_file = open(meta_path).read()
387 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
388 1
        return json.dumps(metadata)
389
390 1
    def configuration_endpoint(self):
391
        """Return the configuration options used by Kytos.
392
393
        Returns:
394
            string: Json with current configurations used by kytos.
395
396
        """
397 1
        return json.dumps(self.options.__dict__)
398
399 1
    def restart(self, graceful=True):
        """Restart Kytos SDN Controller.

        If the controller has a start time recorded, it is stopped and
        re-initialised with its current options before being started again.

        Args:
            graceful(bool): Represents the way that Kytos will restart.
        """
        was_started = self.started_at is not None
        if was_started:
            self.stop(graceful)
            self.__init__(self.options)

        self.start(restart=True)
410
411 1
    def stop(self, graceful=True):
412
        """Shutdown all services used by kytos.
413
414
        This method should:
415
            - stop all Websockets
416
            - stop the API Server
417
            - stop the Controller
418
        """
419 1
        if self.started_at:
420 1
            self.stop_controller(graceful)
421
422 1
    def stop_controller(self, graceful=True):
        """Stop the controller.

        In order: sends the stop signal to the buffers, stops the API server
        and the NApp directory listener, shuts the thread pools down, clears
        ``started_at``, unloads the NApps, replaces the buffers, cancels the
        asyncio tasks and finally shuts the TCP server down.

        Args:
            graceful(bool): passed as ``wait`` to the thread pools' shutdown.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.api_server.stop_api_server()
        self.napp_dir_listener.stop()

        self.log.info("Stopping threadpool: %s", self._pool)
        if executor_pool:
            self.log.info("Stopping threadpool: %s", executor_pool)

        alive = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(alive), alive)

        def _shutdown_pools(**extra):
            # Shut down the controller pool, then the shared helper pool.
            self._pool.shutdown(wait=graceful, **extra)
            if executor_pool:
                executor_pool.shutdown(wait=graceful, **extra)

        try:
            # ``cancel_futures`` exists on Python >= 3.9 only.
            _shutdown_pools(cancel_futures=True)
        except TypeError:
            _shutdown_pools()

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # Cancel all async tasks (event handlers and servers)
        for pending_task in self._tasks:
            pending_task.cancel()

        # ASYNC TODO: close connections

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()
484
485 1
    def status(self):
486
        """Return status of Kytos Server.
487
488
        If the controller kytos is running this method will be returned
489
        "Running since 'Started_At'", otherwise "Stopped".
490
491
        Returns:
492
            string: String with kytos status.
493
494
        """
495 1
        if self.started_at:
496 1
            return "Running since %s" % self.started_at
497 1
        return "Stopped"
498
499 1
    def uptime(self):
500
        """Return the uptime of kytos server.
501
502
        This method should return:
503
            - 0 if Kytos Server is stopped.
504
            - (kytos.start_at - datetime.now) if Kytos Server is running.
505
506
        Returns:
507
           datetime.timedelta: The uptime interval.
508
509
        """
510 1
        return now() - self.started_at if self.started_at else 0
511
512 1
    def notify_listeners(self, event):
513
        """Send the event to the specified listeners.
514
515
        Loops over self.events_listeners matching (by regexp) the attribute
516
        name of the event with the keys of events_listeners. If a match occurs,
517
        then send the event to each registered listener.
518
519
        Args:
520
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
521
        """
522 1
        self.log.debug("looking for listeners for %s", event)
523 1
        for event_regex, listeners in dict(self.events_listeners).items():
524
            # self.log.debug("listeners found for %s: %r => %s", event,
525
            #                event_regex, [l.__qualname__ for l in listeners])
526
            # Do not match if the event has more characters
527
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
528 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
529 1
                event_regex += '$'
530 1
            if re.match(event_regex, event.name):
531
                # self.log.debug('Calling listeners for %s', event)
532 1
                for listener in listeners:
533 1
                    listener(event)
534
535 1
    async def raw_event_handler(self):
536
        """Handle raw events.
537
538
        Listen to the raw_buffer and send all its events to the
539
        corresponding listeners.
540
        """
541 1
        self.log.info("Raw Event Handler started")
542
        while True:
543 1
            event = await self.buffers.raw.aget()
544 1
            self.notify_listeners(event)
545 1
            self.log.debug("Raw Event handler called")
546
547 1
            if event.name == "kytos/core.shutdown":
548 1
                self.log.debug("Raw Event handler stopped")
549 1
                break
550
551 1
    async def msg_in_event_handler(self):
552
        """Handle msg_in events.
553
554
        Listen to the msg_in buffer and send all its events to the
555
        corresponding listeners.
556
        """
557 1
        self.log.info("Message In Event Handler started")
558
        while True:
559 1
            event = await self.buffers.msg_in.aget()
560 1
            self.notify_listeners(event)
561 1
            self.log.debug("Message In Event handler called")
562
563 1
            if event.name == "kytos/core.shutdown":
564 1
                self.log.debug("Message In Event handler stopped")
565 1
                break
566
567 1
    async def publish_connection_error(self, event):
568
        """Publish connection error event.
569
570
        Args:
571
            event (KytosEvent): Event that triggered for error propagation.
572
        """
573
        event.name = \
574
            f"kytos/core.{event.destination.protocol.name}.connection.error"
575
        error_msg = f"Connection state: {event.destination.state}"
576
        event.content["exception"] = error_msg
577
        await self.buffers.app.aput(event)
578
579 1
    async def msg_out_event_handler(self):
580
        """Handle msg_out events.
581
582
        Listen to the msg_out buffer and send all its events to the
583
        corresponding listeners.
584
        """
585 1
        self.log.info("Message Out Event Handler started")
586
        while True:
587 1
            triggered_event = await self.buffers.msg_out.aget()
588
589 1
            if triggered_event.name == "kytos/core.shutdown":
590 1
                self.log.debug("Message Out Event handler stopped")
591 1
                break
592
593 1
            message = triggered_event.content['message']
594 1
            destination = triggered_event.destination
595 1
            try:
596 1
                if (destination and
597
                        not destination.state == ConnectionState.FINISHED):
598 1
                    packet = message.pack()
599 1
                    destination.send(packet)
600 1
                    self.log.debug('Connection %s: OUT OFP, '
601
                                   'version: %s, type: %s, xid: %s - %s',
602
                                   destination.id,
603
                                   message.header.version,
604
                                   message.header.message_type,
605
                                   message.header.xid,
606
                                   packet.hex())
607 1
                    self.notify_listeners(triggered_event)
608 1
                    self.log.debug("Message Out Event handler called")
609 1
                    continue
610
611
            except (OSError, SocketError):
612
                pass
613
614
            await self.publish_connection_error(triggered_event)
615
            self.log.info("connection closed. Cannot send message")
616
617 1
    async def app_event_handler(self):
618
        """Handle app events.
619
620
        Listen to the app buffer and send all its events to the
621
        corresponding listeners.
622
        """
623 1
        self.log.info("App Event Handler started")
624
        while True:
625 1
            event = await self.buffers.app.aget()
626 1
            self.notify_listeners(event)
627 1
            self.log.debug("App Event handler called")
628
629 1
            if event.name == "kytos/core.shutdown":
630 1
                self.log.debug("App Event handler stopped")
631 1
                break
632
633 1
    def get_interface_by_id(self, interface_id):
634
        """Find a Interface  with interface_id.
635
636
        Args:
637
            interface_id(str): Interface Identifier.
638
639
        Returns:
640
            Interface: Instance of Interface with the id given.
641
642
        """
643 1
        if interface_id is None:
644 1
            return None
645
646 1
        switch_id = ":".join(interface_id.split(":")[:-1])
647 1
        interface_number = int(interface_id.split(":")[-1])
648
649 1
        switch = self.switches.get(switch_id)
650
651 1
        if not switch:
652 1
            return None
653
654 1
        return switch.interfaces.get(interface_number, None)
655
656 1
    def get_switch_by_dpid(self, dpid):
657
        """Return a specific switch by dpid.
658
659
        Args:
660
            dpid (|DPID|): dpid object used to identify a switch.
661
662
        Returns:
663
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
664
665
        """
666 1
        return self.switches.get(dpid)
667
668 1
    def get_switch_or_create(self, dpid, connection=None):
        """Return switch or create it if necessary.

        Runs under the switches lock. A ``kytos/core.switch.new`` or
        ``kytos/core.switch.reconnected`` event carrying the switch is put
        on the app buffer before returning.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. If given, it is stored and set
                on the switch; a different previous connection of the
                switch is removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.

        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                suffix = 'new'
            else:
                suffix = 'reconnected'

            self.set_switch_options(dpid=dpid)
            event = KytosEvent(name='kytos/core.switch.' + suffix,
                               content={'switch': switch})

            if connection:
                previous = switch.connection
                switch.update_connection(connection)

                if previous is not connection:
                    self.remove_connection(previous)

            self.buffers.app.put(event)

            return switch
708
709 1
    def set_switch_options(self, dpid):
710
        """Update the switch settings based on kytos.conf options.
711
712
        Args:
713
            dpid (str): dpid used to identify a switch.
714
715
        """
716 1
        switch = self.switches.get(dpid)
717 1
        if not switch:
718
            return
719
720 1
        vlan_pool = {}
721 1
        vlan_pool = self.options.vlan_pool
722 1
        if not vlan_pool:
723 1
            return
724
725 1
        if vlan_pool.get(dpid):
726 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
727 1
            for intf_num, port_list in vlan_pool[dpid].items():
728 1
                if not switch.interfaces.get((intf_num)):
729 1
                    vlan_ids = set()
730 1
                    for vlan_range in port_list:
731 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
732 1
                        for vlan_id in range(vlan_begin, vlan_end):
733 1
                            vlan_ids.add(vlan_id)
734 1
                    intf_num = int(intf_num)
735 1
                    intf = Interface(name=intf_num, port_number=intf_num,
736
                                     switch=switch)
737 1
                    intf.set_available_tags(vlan_ids)
738 1
                    switch.update_interface(intf)
739
740 1
    def create_or_update_connection(self, connection):
741
        """Update a connection.
742
743
        Args:
744
            connection (:class:`~kytos.core.connection.Connection`):
745
                Instance of connection that will be updated.
746
        """
747 1
        self.connections[connection.id] = connection
748
749 1
    def get_connection_by_id(self, conn_id):
750
        """Return a existent connection by id.
751
752
        Args:
753
            id (int): id from a connection.
754
755
        Returns:
756
            :class:`~kytos.core.connection.Connection`: Instance of connection
757
                or None Type.
758
759
        """
760 1
        return self.connections.get(conn_id)
761
762 1
    def remove_connection(self, connection):
763
        """Close a existent connection and remove it.
764
765
        Args:
766
            connection (:class:`~kytos.core.connection.Connection`):
767
                Instance of connection that will be removed.
768
        """
769 1
        if connection is None:
770 1
            return False
771
772 1
        try:
773 1
            connection.close()
774 1
            del self.connections[connection.id]
775 1
        except KeyError:
776 1
            return False
777 1
        return True
778
779 1
    def remove_switch(self, switch):
        """Drop a switch from the stored switches.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Switch to remove; it is looked up by its ``dpid``.

        Returns:
            bool: ``True`` when the switch was removed, ``False`` when no
                switch with that ``dpid`` was stored.
        """
        try:
            self.switches.pop(switch.dpid)
        except KeyError:
            return False
        else:
            return True
791
792 1
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method reads the new connection event and stores the connection
        (``event.source``) into the connections attribute on the controller.

        If another connection is already stored under the same id, that
        stale connection is closed and removed first, since the event
        announces a new connection on the same id.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) whose
                ``source`` is the new connection.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() works on the connection object itself (it calls
        # ``close()`` and reads ``id``), so the stored object is passed, not
        # the id. The object being registered is never closed here, even if
        # it is already stored.
        stale = self.get_connection_by_id(connection.id)
        if stale is not None and stale is not connection:
            self.remove_connection(stale)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
817
818 1
    def add_new_switch(self, switch):
        """Store a switch on the controller, indexed by its ``dpid``.

        An entry already stored under the same ``dpid`` is replaced.

        Args:
            switch (Switch): Switch object to store.
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
825
826 1
    def _import_napp(self, username, napp_name):
        """Import the ``main.py`` module of a NApp from the NApps directory.

        The module is loaded from
        ``<options.napps>/<username>/<napp_name>/main.py`` under the name
        ``napps.<username>.<napp_name>.main``.

        Returns:
            module: The executed NApp ``main`` module.

        Raises:
            FileNotFoundError: if NApp's main.py is not found.
            ModuleNotFoundError: if any NApp requirement is not installed.

        """
        qualified_name = '.'.join(('napps', username, napp_name, 'main'))
        main_file = os.path.join(self.options.napps, username, napp_name,
                                 'main.py')
        spec = spec_from_file_location(qualified_name, main_file)
        module = module_from_spec(spec)
        # The module is registered in sys.modules before its body is executed.
        sys.modules[spec.name] = module
        spec.loader.exec_module(module)
        return module
842
843 1
    def load_napp(self, username, napp_name):
        """Load a single NApp.

        Imports the NApp module, instantiates its ``Main`` class with this
        controller, starts it, registers its REST endpoints and appends its
        listeners to the controller's event listeners. Failures are logged
        and the method returns without raising in the cases handled below.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return
        except FileNotFoundError as err:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                err.filename)
            return

        try:
            napp = module.Main(controller=self)
        except:  # noqa pylint: disable=bare-except
            # Any failure while building the NApp is logged with traceback
            # and the NApp is left unloaded.
            self.log.critical("NApp initialization failed: %s/%s",
                              username, napp_name, exc_info=True)
            return

        # The NApp is recorded as loaded before it is started.
        self.napps[napp_key] = napp
        napp.start()

        api_server = self.api_server
        api_server.authenticate_endpoints(napp)
        api_server.register_napp_endpoints(napp)

        # pylint: disable=protected-access
        for event_name, handlers in napp._listeners.items():
            self.events_listeners.setdefault(event_name, []).extend(handlers)
        # pylint: enable=protected-access
882
        # pylint: enable=protected-access
883
884 1
    def pre_install_napps(self, napps, enable=True):
        """Install the requested NApps that are not installed yet.

        Each requested NApp is compared against the string form of the NApps
        reported as installed by the NApps manager; only the missing ones
        are installed.

        Args:
            napps ([str]): List of NApps to be pre-installed.
            enable (bool): Forwarded to the NApps manager ``install`` call.
        """
        known_ids = list(map(str, self.napps_manager.get_installed_napps()))
        pending = [wanted for wanted in napps if wanted not in known_ids]
        for wanted in pending:
            self.napps_manager.install(wanted, enable=enable)
897
898 1
    def load_napps(self):
        """Load every NApp the NApps manager reports as enabled.

        A ``FileNotFoundError`` raised while loading one NApp is logged and
        does not prevent the remaining NApps from being loaded.
        """
        for enabled in self.napps_manager.get_enabled_napps():
            try:
                self.log.info("Loading NApp %s", enabled.id)
                self.load_napp(enabled.username, enabled.name)
            except FileNotFoundError as err:
                self.log.error("Could not load NApp %s: %s", enabled.id, err)
907
908 1
    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        The NApp is removed from the loaded NApps, its shutdown listener is
        called, its REST endpoints are removed and its listeners are taken
        out of the controller's event listeners. When the NApp was not
        loaded, only a warning is logged.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)
        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        shutdown_event = KytosEvent(
            name='kytos/core.shutdown.' + NApp(username, napp_name).id)
        # The first listener registered for the shutdown event is called
        # while it is still present in events_listeners.
        shutdown_handler = self.events_listeners[shutdown_event.name][0]
        shutdown_handler(shutdown_event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Removing listeners from that napp
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            registered = self.events_listeners[event_type]
            for listener in napp_listeners:
                registered.remove(listener)
            # Drop the event entry once no listener is left for it.
            if not registered:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access
938
            # pylint: enable=protected-access
939
940 1
    def unload_napps(self):
        """Unload every NApp that is currently loaded."""
        # The keys are copied up front: unloading removes entries from
        # self.napps, and looping over the dictionary itself while it
        # shrinks fails with
        # 'RuntimeError: dictionary changed size during iteration'.
        loaded_keys = tuple(self.napps)
        for username, napp_name in loaded_keys:
            self.unload_napp(username, napp_name)
948
949 1
    def reload_napp_module(self, username, napp_name, napp_file):
        """Reload one module of a NApp.

        The module ``napps.<username>.<napp_name>.<napp_file>`` is imported
        and then reloaded.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp.
            napp_file (str): Module name inside the NApp package.

        Raises:
            ModuleNotFoundError: if the module cannot be imported.
            ImportError: if reloading the module fails.
        """
        mod_name = '.'.join(('napps', username, napp_name, napp_file))

        try:
            module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise

        try:
            reload_module(module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise
963
964 1
    def reload_napp(self, username, napp_name):
        """Reload a NApp.

        The NApp is unloaded, its ``settings`` and ``main`` modules are
        reloaded in that order, and the NApp is loaded again.

        Returns:
            int: ``200`` after the NApp was reloaded and loaded again,
                ``400`` when reloading one of its modules raised an
                ``ImportError`` (the NApp is not loaded again in that case).
        """
        self.unload_napp(username, napp_name)
        try:
            for napp_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, napp_file)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError, so it is
            # handled here as well.
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
976
977 1
    def rest_reload_napp(self, username, napp_name):
        """Reload a NApp on request.

        Returns:
            tuple: ``'reloaded'`` paired with the status code returned by
                :meth:`reload_napp`.
        """
        status = self.reload_napp(username, napp_name)
        return 'reloaded', status
981
982 1
    def rest_reload_all_napps(self):
        """Reload all loaded NApps on request.

        Returns:
            tuple: Always ``('reloaded', 200)``; the status codes returned
                by the individual :meth:`reload_napp` calls are not
                reflected in the result.
        """
        # Iterate over a snapshot of the keys: reloading a NApp removes its
        # entry from self.napps and may add it back, and looping over the
        # dictionary itself while that happens raises
        # 'RuntimeError: dictionary changed size during iteration'
        # (or '... keys changed during iteration').
        for username, napp_name in list(self.napps):
            self.reload_napp(username, napp_name)
        return 'reloaded', 200
987