Passed
Push — master ( 59f51d...f8912e )
by Vinicius
06:12 queued 12s
created

Controller.pre_install_napps()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 3
dl 0
loc 13
rs 10
c 0
b 0
f 0
ccs 6
cts 6
cp 1
crap 2
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 2
import asyncio
17 2
import atexit
18 2
import json
19 2
import logging
20 2
import os
21 2
import re
22 2
import sys
23 2
import threading
24 2
from concurrent.futures import ThreadPoolExecutor
25 2
from importlib import import_module
26 2
from importlib import reload as reload_module
27 2
from importlib.util import module_from_spec, spec_from_file_location
28 2
from pathlib import Path
29 2
from socket import error as SocketError
30
31 2
from kytos.core.api_server import APIServer
32
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
33 2
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
34 2
from kytos.core.auth import Auth
35 2
from kytos.core.buffers import KytosBuffers
36 2
from kytos.core.config import KytosConfig
37 2
from kytos.core.connection import ConnectionState
38 2
from kytos.core.events import KytosEvent
39 2
from kytos.core.helpers import now
40 2
from kytos.core.interface import Interface
41 2
from kytos.core.logs import LogManager
42 2
from kytos.core.napps.base import NApp
43 2
from kytos.core.napps.manager import NAppsManager
44 2
from kytos.core.napps.napp_dir_listener import NAppDirListener
45 2
from kytos.core.switch import Switch
46
47 2
__all__ = ('Controller',)
48
49
50 2
def exc_handler(_, exc, __):
    """Log uncaught exceptions.

    The controller installs this function as ``sys.excepthook``.

    Args:
        _ (type): exception type; only forwarded to the log record.
        exc (BaseException): exception instance.
        __ (traceback): traceback; only forwarded to the log record.
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # logging.exception() takes its traceback from sys.exc_info(), which is
    # only populated inside an ``except`` block. Hand the exception triple
    # over explicitly so the traceback ends up in the log record.
    logging.error('Uncaught Exception: %s', exc, exc_info=(_, exc, __))
62
63
64 2
class Controller:
65
    """Main class of Kytos.
66
67
    The main responsibilities of this class are:
68
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
69
        - manage KytosNApps (install, load and unload);
70
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
71
        - manage which event should be sent to NApps methods;
72
        - manage the buffers handlers, considering one thread per handler.
73
    """
74
75
    # Created issue #568 for the disabled checks.
76
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
77 2
    def __init__(self, options=None, loop=None):
        """Build the controller and the service objects it owns.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from
                an instance of :class:`~kytos.core.config.KytosConfig`. When
                ``None``, the ``'daemon'`` entry of ``KytosConfig().options``
                is used.
            loop: asyncio event loop. When falsy, the loop returned by
                ``asyncio.get_event_loop()`` is used.
        """
        if options is None:
            options = KytosConfig().options['daemon']
        if not loop:
            loop = asyncio.get_event_loop()

        self._loop = loop
        self._pool = ThreadPoolExecutor(max_workers=1)
        #: list: asyncio tasks created by the controller.
        self._tasks = []

        #: dict: main threads of the controller (buffers and handler).
        self._threads = {}
        #: KytosBuffers: buffers used by the controller.
        self.buffers = KytosBuffers(loop=self._loop)
        #: dict: socket connections between the controller and the switches.
        #:
        #: Keys are ``(ip, port)`` tuples and values are Connection objects.
        self.connections = {}
        #: dict: mapping of events to event listeners.
        #:
        #: Each key is an event name (or a string holding a regex matched
        #: against event names); each value is the list of callables that
        #: receive the matching events.
        self.events_listeners = {
            'kytos/core.connection.new': [self.new_connection],
        }

        #: dict: currently loaded NApps.
        #:
        #: Keys identify the NApp, values are the NApp instances.
        self.napps = {}
        #: Object generated by ParseArgs on config.py file.
        self.options = options
        #: KytosServer: server listening for TCP connections (set on start).
        self.server = None
        #: dict: known switches, keyed by dpid, holding Switch objects.
        self.switches = {}
        self._switches_lock = threading.Lock()

        #: datetime.datetime: time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: logger used by Kytos (set by ``enable_logs``).
        self.log = None

        #: Observer that handles NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(__name__, self.options.listen,
                                    self.options.api_port,
                                    self.napps_manager, self.options.napps)

        self.auth = Auth(self)

        self._register_endpoints()
        # Put the parent of the napps directory on sys.path, so enabled
        # NApps can be imported as:
        #     from napps.<username>.<napp_name> import ...
        napps_parent = os.path.join(self.options.napps, os.pardir)
        sys.path.append(napps_parent)
        sys.excepthook = exc_handler
150
151 2
    def enable_logs(self):
        """Load the logging configuration and create the controller logger."""
        opts = self.options
        LogManager.load_config_file(opts.logging, opts.debug)
        LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)
156
157 2
    @staticmethod
158
    def loggers():
159
        """List all logging Loggers.
160
161
        Return a list of Logger objects, with name and logging level.
162
        """
163 2
        return [logging.getLogger(name)
164
                for name in logging.root.manager.loggerDict
165
                if "kytos" in name]
166
167 2
    def toggle_debug(self, name=None):
168
        """Enable/disable logging debug messages to a given logger name.
169
170
        If the name parameter is not specified the debug will be
171
        enabled/disabled following the initial config file. It will decide
172
        to enable/disable using the 'kytos' name to find the current
173
        logger level.
174
        Obs: To disable the debug the logging will be set to NOTSET
175
176
        Args:
177
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
178
        """
179 2
        if name and name not in logging.root.manager.loggerDict:
180
            # A Logger name that is not declared in logging will raise an error
181
            # otherwise logging would create a new Logger.
182 2
            raise ValueError(f"Invalid logger name: {name}")
183
184 2
        if not name:
185
            # Logger name not specified.
186 2
            level = logging.getLogger('kytos').getEffectiveLevel()
187 2
            enable_debug = level != logging.DEBUG
188
189
            # Enable/disable default Loggers
190 2
            LogManager.load_config_file(self.options.logging, enable_debug)
191 2
            return
192
193
        # Get effective logger level for the name
194 2
        level = logging.getLogger(name).getEffectiveLevel()
195 2
        logger = logging.getLogger(name)
196
197 2
        if level == logging.DEBUG:
198
            # disable debug
199 2
            logger.setLevel(logging.NOTSET)
200
        else:
201
            # enable debug
202 2
            logger.setLevel(logging.DEBUG)
203
204 2
    def start(self, restart=False):
205
        """Create pidfile and call start_controller method."""
206 2
        self.enable_logs()
207 2
        if not restart:
208 2
            self.create_pidfile()
209 2
        self.start_controller()
210
211 2
    def create_pidfile(self):
212
        """Create a pidfile."""
213 2
        pid = os.getpid()
214
215
        # Creates directory if it doesn't exist
216
        # System can erase /var/run's content
217 2
        pid_folder = Path(self.options.pidfile).parent
218 2
        self.log.info(pid_folder)
219
220
        # Pylint incorrectly infers Path objects
221
        # https://github.com/PyCQA/pylint/issues/224
222
        # pylint: disable=no-member
223 2
        if not pid_folder.exists():
224
            pid_folder.mkdir()
225
            pid_folder.chmod(0o1777)
226
        # pylint: enable=no-member
227
228
        # Make sure the file is deleted when controller stops
229 2
        atexit.register(Path(self.options.pidfile).unlink)
230
231
        # Checks if a pidfile exists. Creates a new file.
232 2
        try:
233 2
            pidfile = open(self.options.pidfile, mode='x')
234 2
        except OSError:
235
            # This happens if there is a pidfile already.
236
            # We shall check if the process that created the pidfile is still
237
            # running.
238 2
            try:
239 2
                existing_file = open(self.options.pidfile, mode='r')
240 2
                old_pid = int(existing_file.read())
241 2
                os.kill(old_pid, 0)
242
                # If kill() doesn't return an error, there's a process running
243
                # with the same PID. We assume it is Kytos and quit.
244
                # Otherwise, overwrite the file and proceed.
245
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
246
                             "running. Aborting.")
247
                sys.exit(error_msg.format(self.options.pidfile))
248 2
            except OSError:
249 2
                try:
250 2
                    pidfile = open(self.options.pidfile, mode='w')
251
                except OSError as exception:
252
                    error_msg = "Failed to create pidfile {}: {}."
253
                    sys.exit(error_msg.format(self.options.pidfile, exception))
254
255
        # Identifies the process that created the pidfile.
256 2
        pidfile.write(str(pid))
257 2
        pidfile.close()
258
259 2
    def start_controller(self):
        """Start the controller.

        Creates the KytosServer (TCP server) and asks it to serve, schedules
        one asyncio task per buffer handler plus one task that runs the API
        server in the thread pool, then starts the NApp directory listener and
        loads the NApps.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(address, KytosServerProtocol, self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: stop the event loop.
            loop = asyncio.get_event_loop()
            alive = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(alive), alive)
            loop.stop()

        async def _run_api_server_thread(executor):
            # Run the blocking API server inside ``executor`` and wait for it.
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            loop = asyncio.get_event_loop()
            blocking_tasks = [
                loop.run_in_executor(executor, self.api_server.run),
            ]
            completed, pending = await asyncio.wait(blocking_tasks)
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        buffer_handlers = (
            self.raw_event_handler,
            self.msg_in_event_handler,
            self.msg_out_event_handler,
            self.app_event_handler,
        )
        for handler in buffer_handlers:
            self._tasks.append(self._loop.create_task(handler()))

        api_task = self._loop.create_task(_run_api_server_thread(self._pool))
        api_task.add_done_callback(_stop_loop)
        self._tasks.append(api_task)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()
323
324 2
    def _register_endpoints(self):
325
        """Register all rest endpoint served by kytos.
326
327
        -   Register APIServer endpoints
328
        -   Register WebUI endpoints
329
        -   Register ``/api/kytos/core/config`` endpoint
330
        """
331 2
        self.api_server.start_api()
332
        # Register controller endpoints as /api/kytos/core/...
333 2
        self.api_server.register_core_endpoint('config/',
334
                                               self.configuration_endpoint)
335 2
        self.api_server.register_core_endpoint('metadata/',
336
                                               Controller.metadata_endpoint)
337 2
        self.api_server.register_core_endpoint(
338
            'reload/<username>/<napp_name>/',
339
            self.rest_reload_napp)
340 2
        self.api_server.register_core_endpoint('reload/all',
341
                                               self.rest_reload_all_napps)
342 2
        self.auth.register_core_auth_services()
343
344 2
    def register_rest_endpoint(self, url, function, methods):
345
        """Deprecate in favor of @rest decorator."""
346 2
        self.api_server.register_rest_endpoint(url, function, methods)
347
348 2
    @classmethod
349
    def metadata_endpoint(cls):
350
        """Return the Kytos metadata.
351
352
        Returns:
353
            string: Json with current kytos metadata.
354
355
        """
356 2
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
357 2
        meta_file = open(meta_path).read()
358 2
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
359 2
        return json.dumps(metadata)
360
361 2
    def configuration_endpoint(self):
362
        """Return the configuration options used by Kytos.
363
364
        Returns:
365
            string: Json with current configurations used by kytos.
366
367
        """
368 2
        return json.dumps(self.options.__dict__)
369
370 2
    def restart(self, graceful=True):
371
        """Restart Kytos SDN Controller.
372
373
        Args:
374
            graceful(bool): Represents the way that Kytos will restart.
375
        """
376 2
        if self.started_at is not None:
377 2
            self.stop(graceful)
378 2
            self.__init__(self.options)
379
380 2
        self.start(restart=True)
381
382 2
    def stop(self, graceful=True):
383
        """Shutdown all services used by kytos.
384
385
        This method should:
386
            - stop all Websockets
387
            - stop the API Server
388
            - stop the Controller
389
        """
390 2
        if self.started_at:
391 2
            self.stop_controller(graceful)
392
393 2
    def stop_controller(self, graceful=True):
        """Stop the controller.

        In order: sends the stop signal to the buffers, stops the API server
        and the NApp directory listener, shuts the thread pool down, unloads
        the NApps, replaces the buffers, cancels the asyncio tasks and shuts
        the TCP server down.

        Args:
            graceful (bool): passed as ``wait`` to the thread pool shutdown.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.api_server.stop_api_server()
        self.napp_dir_listener.stop()

        self.log.info("Stopping threadpool: %s", self._pool)

        alive = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(alive), alive)

        try:
            # Python >= 3.9
            # pylint: disable=unexpected-keyword-arg
            self._pool.shutdown(wait=graceful, cancel_futures=True)
            # pylint: enable=unexpected-keyword-arg
        except TypeError:
            # ``cancel_futures`` is not accepted here: retry without it.
            self._pool.shutdown(wait=graceful)

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # Cancel all async tasks (event handlers and servers)
        for pending_task in self._tasks:
            pending_task.cancel()

        # ASYNC TODO: close connections

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()
450
451 2
    def status(self):
452
        """Return status of Kytos Server.
453
454
        If the controller kytos is running this method will be returned
455
        "Running since 'Started_At'", otherwise "Stopped".
456
457
        Returns:
458
            string: String with kytos status.
459
460
        """
461 2
        if self.started_at:
462 2
            return "Running since %s" % self.started_at
463 2
        return "Stopped"
464
465 2
    def uptime(self):
466
        """Return the uptime of kytos server.
467
468
        This method should return:
469
            - 0 if Kytos Server is stopped.
470
            - (kytos.start_at - datetime.now) if Kytos Server is running.
471
472
        Returns:
473
           datetime.timedelta: The uptime interval.
474
475
        """
476 2
        return now() - self.started_at if self.started_at else 0
477
478 2
    def notify_listeners(self, event):
479
        """Send the event to the specified listeners.
480
481
        Loops over self.events_listeners matching (by regexp) the attribute
482
        name of the event with the keys of events_listeners. If a match occurs,
483
        then send the event to each registered listener.
484
485
        Args:
486
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
487
        """
488 2
        self.log.debug("looking for listeners for %s", event)
489 2
        for event_regex, listeners in dict(self.events_listeners).items():
490
            # self.log.debug("listeners found for %s: %r => %s", event,
491
            #                event_regex, [l.__qualname__ for l in listeners])
492
            # Do not match if the event has more characters
493
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
494 2
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
495 2
                event_regex += '$'
496 2
            if re.match(event_regex, event.name):
497
                # self.log.debug('Calling listeners for %s', event)
498 2
                for listener in listeners:
499 2
                    listener(event)
500
501 2
    async def raw_event_handler(self):
502
        """Handle raw events.
503
504
        Listen to the raw_buffer and send all its events to the
505
        corresponding listeners.
506
        """
507 2
        self.log.info("Raw Event Handler started")
508 2
        while True:
509 2
            event = await self.buffers.raw.aget()
510 2
            self.notify_listeners(event)
511 2
            self.log.debug("Raw Event handler called")
512
513 2
            if event.name == "kytos/core.shutdown":
514 2
                self.log.debug("Raw Event handler stopped")
515 2
                break
516
517 2
    async def msg_in_event_handler(self):
518
        """Handle msg_in events.
519
520
        Listen to the msg_in buffer and send all its events to the
521
        corresponding listeners.
522
        """
523 2
        self.log.info("Message In Event Handler started")
524 2
        while True:
525 2
            event = await self.buffers.msg_in.aget()
526 2
            self.notify_listeners(event)
527 2
            self.log.debug("Message In Event handler called")
528
529 2
            if event.name == "kytos/core.shutdown":
530 2
                self.log.debug("Message In Event handler stopped")
531 2
                break
532
533 2
    async def publish_connection_error(self, event):
534
        """Publish connection error event.
535
536
        Args:
537
            event (KytosEvent): Event that triggered for error propagation.
538
        """
539
        event.name = \
540
            f"kytos/core.{event.destination.protocol.name}.connection.error"
541
        error_msg = f"Connection state: {event.destination.state}"
542
        event.content["exception"] = error_msg
543
        await self.buffers.app.aput(event)
544
545 2
    async def msg_out_event_handler(self):
546
        """Handle msg_out events.
547
548
        Listen to the msg_out buffer and send all its events to the
549
        corresponding listeners.
550
        """
551 2
        self.log.info("Message Out Event Handler started")
552 2
        while True:
553 2
            triggered_event = await self.buffers.msg_out.aget()
554
555 2
            if triggered_event.name == "kytos/core.shutdown":
556 2
                self.log.debug("Message Out Event handler stopped")
557 2
                break
558
559 2
            message = triggered_event.content['message']
560 2
            destination = triggered_event.destination
561 2
            try:
562 2
                if (destination and
563
                        not destination.state == ConnectionState.FINISHED):
564 2
                    packet = message.pack()
565 2
                    destination.send(packet)
566 2
                    self.log.debug('Connection %s: OUT OFP, '
567
                                   'version: %s, type: %s, xid: %s - %s',
568
                                   destination.id,
569
                                   message.header.version,
570
                                   message.header.message_type,
571
                                   message.header.xid,
572
                                   packet.hex())
573 2
                    self.notify_listeners(triggered_event)
574 2
                    self.log.debug("Message Out Event handler called")
575 2
                    continue
576
577
            except (OSError, SocketError):
578
                pass
579
580
            await self.publish_connection_error(triggered_event)
581
            self.log.info("connection closed. Cannot send message")
582
583 2
    async def app_event_handler(self):
584
        """Handle app events.
585
586
        Listen to the app buffer and send all its events to the
587
        corresponding listeners.
588
        """
589 2
        self.log.info("App Event Handler started")
590 2
        while True:
591 2
            event = await self.buffers.app.aget()
592 2
            self.notify_listeners(event)
593 2
            self.log.debug("App Event handler called")
594
595 2
            if event.name == "kytos/core.shutdown":
596 2
                self.log.debug("App Event handler stopped")
597 2
                break
598
599 2
    def get_interface_by_id(self, interface_id):
600
        """Find a Interface  with interface_id.
601
602
        Args:
603
            interface_id(str): Interface Identifier.
604
605
        Returns:
606
            Interface: Instance of Interface with the id given.
607
608
        """
609 2
        if interface_id is None:
610 2
            return None
611
612 2
        switch_id = ":".join(interface_id.split(":")[:-1])
613 2
        interface_number = int(interface_id.split(":")[-1])
614
615 2
        switch = self.switches.get(switch_id)
616
617 2
        if not switch:
618 2
            return None
619
620 2
        return switch.interfaces.get(interface_number, None)
621
622 2
    def get_switch_by_dpid(self, dpid):
623
        """Return a specific switch by dpid.
624
625
        Args:
626
            dpid (|DPID|): dpid object used to identify a switch.
627
628
        Returns:
629
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
630
631
        """
632 2
        return self.switches.get(dpid)
633
634 2
    def get_switch_or_create(self, dpid, connection=None):
        """Return switch or create it if necessary.

        Runs under the switches lock. A ``kytos/core.switch.new`` or
        ``kytos/core.switch.reconnected`` event is put in the app buffer,
        depending on whether the switch had to be created.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. When given, it is stored and set
                on the switch; a different previous connection of the switch
                is removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.

        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                suffix = 'new'
            else:
                suffix = 'reconnected'

            self.set_switch_options(dpid=dpid)
            event = KytosEvent(name='kytos/core.switch.' + suffix,
                               content={'switch': switch})

            if connection:
                previous = switch.connection
                switch.update_connection(connection)
                if previous is not connection:
                    self.remove_connection(previous)

            self.buffers.app.put(event)
            return switch
674
675 2
    def set_switch_options(self, dpid):
676
        """Update the switch settings based on kytos.conf options.
677
678
        Args:
679
            dpid (str): dpid used to identify a switch.
680
681
        """
682 2
        switch = self.switches.get(dpid)
683 2
        if not switch:
684
            return
685
686 2
        vlan_pool = {}
687 2
        vlan_pool = self.options.vlan_pool
688 2
        if not vlan_pool:
689 2
            return
690
691 2
        if vlan_pool.get(dpid):
692 2
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
693 2
            for intf_num, port_list in vlan_pool[dpid].items():
694 2
                if not switch.interfaces.get((intf_num)):
695 2
                    vlan_ids = set()
696 2
                    for vlan_range in port_list:
697 2
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
698 2
                        for vlan_id in range(vlan_begin, vlan_end):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
699 2
                            vlan_ids.add(vlan_id)
700 2
                    intf_num = int(intf_num)
701 2
                    intf = Interface(name=intf_num, port_number=intf_num,
702
                                     switch=switch)
703 2
                    intf.set_available_tags(vlan_ids)
704 2
                    switch.update_interface(intf)
705
706 2
    def create_or_update_connection(self, connection):
707
        """Update a connection.
708
709
        Args:
710
            connection (:class:`~kytos.core.connection.Connection`):
711
                Instance of connection that will be updated.
712
        """
713 2
        self.connections[connection.id] = connection
714
715 2
    def get_connection_by_id(self, conn_id):
716
        """Return a existent connection by id.
717
718
        Args:
719
            id (int): id from a connection.
720
721
        Returns:
722
            :class:`~kytos.core.connection.Connection`: Instance of connection
723
                or None Type.
724
725
        """
726 2
        return self.connections.get(conn_id)
727
728 2
    def remove_connection(self, connection):
729
        """Close a existent connection and remove it.
730
731
        Args:
732
            connection (:class:`~kytos.core.connection.Connection`):
733
                Instance of connection that will be removed.
734
        """
735 2
        if connection is None:
736 2
            return False
737
738 2
        try:
739 2
            connection.close()
740 2
            del self.connections[connection.id]
741 2
        except KeyError:
742 2
            return False
743 2
        return True
744
745 2
    def remove_switch(self, switch):
746
        """Remove an existent switch.
747
748
        Args:
749
            switch (:class:`~kytos.core.switch.Switch`):
750
                Instance of switch that will be removed.
751
        """
752 2
        try:
753 2
            del self.switches[switch.dpid]
754 2
        except KeyError:
755 2
            return False
756 2
        return True
757
758 2
    def new_connection(self, event):
759
        """Handle a kytos/core.connection.new event.
760
761
        This method will read new connection event and store the connection
762
        (socket) into the connections attribute on the controller.
763
764
        It also clear all references to the connection since it is a new
765
        connection on the same ip:port.
766
767
        Args:
768
            event (~kytos.core.KytosEvent):
769
                The received event (``kytos/core.connection.new``) with the
770
                needed info.
771
        """
772 2
        self.log.info("Handling %s...", event)
773
774 2
        connection = event.source
775 2
        self.log.debug("Event source: %s", event.source)
776
777
        # Remove old connection (aka cleanup) if it exists
778 2
        if self.get_connection_by_id(connection.id):
779
            self.remove_connection(connection.id)
780
781
        # Update connections with the new connection
782 2
        self.create_or_update_connection(connection)
783
784 2
    def add_new_switch(self, switch):
        """Register a switch on the controller, keyed by its dpid.

        Any entry already stored under the same dpid is replaced.

        Args:
            switch (Switch): Switch object to register.
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
791
792 2
    def _import_napp(self, username, napp_name):
        """Import a NApp module.

        The NApp's ``main.py`` is loaded from the configured NApps directory
        and registered in ``sys.modules`` as
        ``napps.<username>.<napp_name>.main``.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp whose main module is imported.

        Returns:
            module: The executed NApp main module.

        Raises:
            FileNotFoundError: if NApp's main.py is not found.
            ModuleNotFoundError: if any NApp requirement is not installed.

        """
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
        path = os.path.join(self.options.napps, username, napp_name,
                            'main.py')
        napp_spec = spec_from_file_location(mod_name, path)
        napp_module = module_from_spec(napp_spec)
        # The module is registered before its body is executed (the pattern
        # used by the importlib "import a source file" recipe).
        sys.modules[napp_spec.name] = napp_module
        try:
            napp_spec.loader.exec_module(napp_module)
        except BaseException:
            # Never leave a half-initialised module behind: a later import of
            # the same name would silently get the broken object.
            if sys.modules.get(napp_spec.name) is napp_module:
                del sys.modules[napp_spec.name]
            raise
        return napp_module
808
809 2
    def load_napp(self, username, napp_name):
        """Import, instantiate, start and register one NApp.

        Nothing is loaded (only a log record is emitted) when the NApp is
        already present in ``self.napps``, when its module cannot be
        imported, or when building its ``Main`` object fails.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            main_module = self._import_napp(username, napp_name)
        except FileNotFoundError as error:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                error.filename)
            return
        except ModuleNotFoundError as error:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, error)
            return

        try:
            instance = main_module.Main(controller=self)
        except:  # noqa pylint: disable=bare-except
            self.log.critical("NApp initialization failed: %s/%s",
                              username, napp_name, exc_info=True)
            return

        # The NApp is recorded as loaded before it is started.
        self.napps[napp_key] = instance
        instance.start()

        rest_api = self.api_server
        rest_api.authenticate_endpoints(instance)
        rest_api.register_napp_endpoints(instance)

        # Append the NApp's listeners to the controller-wide registry.
        # pylint: disable=protected-access
        for event_name, handlers in instance._listeners.items():
            self.events_listeners.setdefault(event_name, []).extend(handlers)
        # pylint: enable=protected-access
848
        # pylint: enable=protected-access
849
850 2
    def pre_install_napps(self, napps, enable=True):
        """Install the requested NApps that are not installed yet.

        Each requested name is compared against the string form of every
        NApp the NApps manager reports as installed; only the missing ones
        are installed.

        Args:
            napps ([str]): List of NApps to be pre-installed.
            enable (bool): Forwarded as the ``enable`` argument of every
                install call made on the NApps manager.
        """
        manager = self.napps_manager
        installed_ids = list(map(str, manager.get_installed_napps()))
        missing = [name for name in napps if name not in installed_ids]
        for name in missing:
            manager.install(name, enable=enable)
863
864 2
    def load_napps(self):
        """Load every NApp the NApps manager reports as enabled.

        A ``FileNotFoundError`` raised while handling one NApp is logged as
        an error and the loop carries on with the next NApp.
        """
        enabled_napps = self.napps_manager.get_enabled_napps()
        for enabled in enabled_napps:
            try:
                self.log.info("Loading NApp %s", enabled.id)
                self.load_napp(enabled.username, enabled.name)
            except FileNotFoundError as error:
                self.log.error("Could not load NApp %s: %s",
                               enabled.id, error)
873
874 2
    def unload_napp(self, username, napp_name):
        """Shut down one loaded NApp and unregister what it contributed.

        The NApp is taken out of ``self.napps``; when it was not there, only
        a warning is logged. Otherwise its shutdown listener is called, its
        REST endpoints are removed and its listeners are dropped from
        ``self.events_listeners``.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)
        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        shutdown_name = 'kytos/core.shutdown.' + NApp(username, napp_name).id
        shutdown_event = KytosEvent(name=shutdown_name)
        # The first listener registered for the shutdown event is the one
        # invoked, and it has to run while it is still registered.
        shutdown_listener = self.events_listeners[shutdown_event.name][0]
        shutdown_listener(shutdown_event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Drop each listener of the NApp; an event type left without any
        # listener is removed from the registry altogether.
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            registered = self.events_listeners[event_type]
            for listener in napp_listeners:
                registered.remove(listener)
            if not registered:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access
904
            # pylint: enable=protected-access
905
906 2
    def unload_napps(self):
        """Unload every NApp currently stored in ``self.napps``."""
        # Work on a snapshot of the keys: unload_napp() pops entries from
        # self.napps, and removing items from a dictionary while looping
        # over it fails with
        # 'RuntimeError: dictionary changed size during iteration'.
        loaded_keys = tuple(self.napps.keys())
        for username, napp_name in loaded_keys:
            self.unload_napp(username, napp_name)
914
915 2
    def reload_napp_module(self, username, napp_name, napp_file):
        """Import and then reload one module of a NApp.

        The module name is ``napps.<username>.<napp_name>.<napp_file>``.

        Args:
            username (str): NApp username.
            napp_name (str): NApp name.
            napp_file (str): Module inside the NApp package, without the
                ``.py`` suffix.

        Raises:
            ModuleNotFoundError: if the module cannot be found on import;
                logged and re-raised.
            ImportError: if reloading the module fails; logged and
                re-raised.
        """
        dotted_name = '.'.join(('napps', username, napp_name, napp_file))

        try:
            module = import_module(dotted_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", dotted_name)
            raise

        try:
            reload_module(module)
        except ImportError as error:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, error)
            raise
929
930 2
    def reload_napp(self, username, napp_name):
        """Unload a NApp, reload its modules and load it again.

        The NApp is unloaded first; then its ``settings`` and ``main``
        modules are reloaded, in that order.

        Args:
            username (str): NApp username.
            napp_name (str): NApp name.

        Returns:
            int: 200 after the NApp was loaded again, 400 when reloading one
            of its modules raised an import error (the NApp is then left
            unloaded).
        """
        self.unload_napp(username, napp_name)
        try:
            for module_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, module_file)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError, so both
            # failure kinds end up here.
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
942
943 2
    def rest_reload_napp(self, username, napp_name):
        """Reload one NApp on request.

        Returns:
            tuple: ``'reloaded'`` paired with the status code returned by
            ``reload_napp``.
        """
        return 'reloaded', self.reload_napp(username, napp_name)
947
948 2
    def rest_reload_all_napps(self):
        """Request reload all NApps.

        Every NApp present in ``self.napps`` when the request starts is
        reloaded exactly once.

        Returns:
            tuple: Always ``('reloaded', 200)``; the status code of each
            individual ``reload_napp`` call is not reflected in the result.
        """
        # Iterate over a snapshot of the keys: reload_napp() unloads the NApp
        # (popping its key from self.napps) and loads it again (re-inserting
        # it), and mutating a dict while looping over it either raises
        # 'RuntimeError: dictionary changed size during iteration' or visits
        # keys more than once.
        for napp in list(self.napps):
            self.reload_napp(*napp)
        return 'reloaded', 200
953