Passed
Pull Request — master (#235)
by Vinicius
05:20
created

kytos.core.controller.Controller.event_handler()   A

Complexity

Conditions 3

Size

Total Lines 10
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 2
dl 0
loc 10
rs 10
c 0
b 0
f 0
ccs 7
cts 7
cp 1
crap 3
1
"""Kytos SDN Platform main class.
2
3
This module contains the main class of Kytos, which is
4
:class:`~.core.Controller`.
5
6
Basic usage:
7
8
.. code-block:: python3
9
10
    from kytos.config import KytosConfig
11
    from kytos.core import Controller
12
    config = KytosConfig()
13
    controller = Controller(config.options)
14
    controller.start()
15
"""
16 1
import asyncio
17 1
import atexit
18 1
import json
19 1
import logging
20 1
import os
21 1
import re
22 1
import sys
23 1
import threading
24 1
from concurrent.futures import ThreadPoolExecutor
25 1
from importlib import import_module
26 1
from importlib import reload as reload_module
27 1
from importlib.util import module_from_spec, spec_from_file_location
28 1
from pathlib import Path
29 1
from socket import error as SocketError
30
31 1
from kytos.core.api_server import APIServer
32 1
from kytos.core.apm import init_apm
33
# from kytos.core.tcp_server import KytosRequestHandler, KytosServer
34 1
from kytos.core.atcp_server import KytosServer, KytosServerProtocol
35 1
from kytos.core.auth import Auth
36 1
from kytos.core.buffers import KytosBuffers
37 1
from kytos.core.config import KytosConfig
38 1
from kytos.core.connection import ConnectionState
39 1
from kytos.core.db import db_conn_wait
40 1
from kytos.core.dead_letter import DeadLetter
41 1
from kytos.core.events import KytosEvent
42 1
from kytos.core.exceptions import KytosAPMInitException, KytosDBInitException
43 1
from kytos.core.helpers import executors, now
44 1
from kytos.core.interface import Interface
45 1
from kytos.core.logs import LogManager
46 1
from kytos.core.napps.base import NApp
47 1
from kytos.core.napps.manager import NAppsManager
48 1
from kytos.core.napps.napp_dir_listener import NAppDirListener
49 1
from kytos.core.switch import Switch
50
51 1
__all__ = ('Controller',)
52
53
54 1
def exc_handler(_, exc, __):
    """Log uncaught exceptions.

    The signature matches ``sys.excepthook`` (type, value, traceback).

    Args:
        _ (type): exception type.
        exc (BaseException): exception instance.
        __ (traceback): traceback of the exception.
    """
    logging.basicConfig(filename='errlog.log',
                        format='%(asctime)s:%(pathname)'
                        's:%(levelname)s:%(message)s')
    # logging.exception() takes the traceback from sys.exc_info(), which is
    # empty outside an ``except`` block; hand over the hook arguments
    # explicitly so the traceback is actually written to the log.
    logging.error('Uncaught Exception: %s', exc, exc_info=(_, exc, __))
66
67
68 1
class Controller:
69
    """Main class of Kytos.
70
71
    The main responsibilities of this class are:
72
        - start a thread with :class:`~.core.tcp_server.KytosServer`;
73
        - manage KytosNApps (install, load and unload);
74
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
75
        - manage which event should be sent to NApps methods;
76
        - manage the buffers handlers, considering one thread per handler.
77
    """
78
79
    # Created issue #568 for the disabled checks.
80
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
81 1
    def __init__(self, options=None, loop=None):
        """Set up the controller state and its helper services.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
                When ``None``, the ``'daemon'`` options of a new
                ``KytosConfig`` are used.
            loop: event loop used by the controller. When falsy,
                ``asyncio.get_event_loop()`` is used.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self._loop = loop if loop else asyncio.get_event_loop()
        self._pool = ThreadPoolExecutor(max_workers=1)

        # asyncio tasks
        self._tasks = []

        #: dict: keep the main threads of the controller (buffers and handler)
        self._threads = {}

        #: KytosBuffers: KytosBuffer object with Controller buffers
        self.buffers = KytosBuffers(loop=self._loop)

        #: dict: socket connections between the controller and the switches.
        #:
        #: The key is a tuple ``(ip, port)`` and the value is a Connection.
        self.connections = {}

        #: dict: mapping of events and event listeners.
        #:
        #: The key is a KytosEvent (or a string holding a regex matched
        #: against KytosEvents); the value is the list of methods that
        #: receive the matching events.
        self.events_listeners = {
            'kytos/core.connection.new': [self.new_connection],
        }

        #: dict: Current loaded apps - ``'napp_name'``: ``napp`` (instance)
        self.napps = {}

        #: Object generated by ParseArgs on config.py file
        self.options = options

        #: KytosServer: Instance of KytosServer that will be listening to TCP
        #: connections.
        self.server = None

        #: dict: Current existing switches, keyed by dpid.
        self.switches = {}  # dpid: Switch()
        self._switches_lock = threading.Lock()

        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: Logger instance used by Kytos.
        self.log = None

        #: Observer that handle NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(
            __name__,
            self.options.listen,
            self.options.api_port,
            self.napps_manager,
            self.options.napps,
        )

        self.auth = Auth(self)
        self.dead_letter = DeadLetter(self)
        self._alisten_tasks = set()

        self._register_endpoints()

        #: Adding the napps 'enabled' directory into the PATH
        #: Now you can access the enabled napps with:
        #: from napps.<username>.<napp_name> import ?....
        napps_parent = os.path.join(self.options.napps, os.pardir)
        sys.path.append(napps_parent)
        sys.excepthook = exc_handler
157
158 1
    def enable_logs(self):
        """Load the logging configuration and set the controller logger."""
        log_config = self.options.logging
        debug = self.options.debug
        LogManager.load_config_file(log_config, debug)
        LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)
163
164 1
    @staticmethod
165 1
    def loggers():
166
        """List all logging Loggers.
167
168
        Return a list of Logger objects, with name and logging level.
169
        """
170 1
        return [logging.getLogger(name)
171
                for name in logging.root.manager.loggerDict
172
                if "kytos" in name]
173
174 1
    def toggle_debug(self, name=None):
175
        """Enable/disable logging debug messages to a given logger name.
176
177
        If the name parameter is not specified the debug will be
178
        enabled/disabled following the initial config file. It will decide
179
        to enable/disable using the 'kytos' name to find the current
180
        logger level.
181
        Obs: To disable the debug the logging will be set to NOTSET
182
183
        Args:
184
            name(text): Full hierarchy Logger name. Ex: "kytos.core.controller"
185
        """
186 1
        if name and name not in logging.root.manager.loggerDict:
187
            # A Logger name that is not declared in logging will raise an error
188
            # otherwise logging would create a new Logger.
189 1
            raise ValueError(f"Invalid logger name: {name}")
190
191 1
        if not name:
192
            # Logger name not specified.
193 1
            level = logging.getLogger('kytos').getEffectiveLevel()
194 1
            enable_debug = level != logging.DEBUG
195
196
            # Enable/disable default Loggers
197 1
            LogManager.load_config_file(self.options.logging, enable_debug)
198 1
            return
199
200
        # Get effective logger level for the name
201 1
        level = logging.getLogger(name).getEffectiveLevel()
202 1
        logger = logging.getLogger(name)
203
204 1
        if level == logging.DEBUG:
205
            # disable debug
206 1
            logger.setLevel(logging.NOTSET)
207
        else:
208
            # enable debug
209 1
            logger.setLevel(logging.DEBUG)
210
211 1
    def start(self, restart=False):
        """Enable logs, run the optional init steps and start the controller.

        The database and APM steps run only when the matching option is
        set. The pidfile is created unless ``restart`` is truthy.
        """
        self.enable_logs()

        if self.options.database:
            self.db_conn_or_core_shutdown()

        if self.options.apm:
            self.init_apm_or_core_shutdown()

        if not restart:
            self.create_pidfile()

        self.start_controller()
221
222 1
    def create_pidfile(self):
223
        """Create a pidfile."""
224 1
        pid = os.getpid()
225
226
        # Creates directory if it doesn't exist
227
        # System can erase /var/run's content
228 1
        pid_folder = Path(self.options.pidfile).parent
229 1
        self.log.info(pid_folder)
230
231
        # Pylint incorrectly infers Path objects
232
        # https://github.com/PyCQA/pylint/issues/224
233
        # pylint: disable=no-member
234 1
        if not pid_folder.exists():
235
            pid_folder.mkdir()
236
            pid_folder.chmod(0o1777)
237
        # pylint: enable=no-member
238
239
        # Make sure the file is deleted when controller stops
240 1
        atexit.register(Path(self.options.pidfile).unlink)
241
242
        # Checks if a pidfile exists. Creates a new file.
243 1
        try:
244 1
            pidfile = open(self.options.pidfile, mode='x')
245 1
        except OSError:
246
            # This happens if there is a pidfile already.
247
            # We shall check if the process that created the pidfile is still
248
            # running.
249 1
            try:
250 1
                existing_file = open(self.options.pidfile, mode='r')
251 1
                old_pid = int(existing_file.read())
252 1
                os.kill(old_pid, 0)
253
                # If kill() doesn't return an error, there's a process running
254
                # with the same PID. We assume it is Kytos and quit.
255
                # Otherwise, overwrite the file and proceed.
256
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
257
                             "running. Aborting.")
258
                sys.exit(error_msg.format(self.options.pidfile))
259 1
            except OSError:
260 1
                try:
261 1
                    pidfile = open(self.options.pidfile, mode='w')
262
                except OSError as exception:
263
                    error_msg = "Failed to create pidfile {}: {}."
264
                    sys.exit(error_msg.format(self.options.pidfile, exception))
265
266
        # Identifies the process that created the pidfile.
267 1
        pidfile.write(str(pid))
268 1
        pidfile.close()
269
270 1
    def start_controller(self):
        """Start the controller.

        Creates the KytosServer (TCP server), schedules one asyncio task per
        buffer handler plus one running the API server in the thread pool,
        then starts the NApp directory listener and loads the NApps.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        address = (self.options.listen, int(self.options.port))
        self.server = KytosServer(address, KytosServerProtocol, self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: stop the event loop.
            loop = asyncio.get_event_loop()
            alive = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(alive), alive)
            loop.stop()

        async def _run_api_server_thread(executor):
            # Run the blocking API server inside ``executor`` and wait for
            # it to return.
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            loop = asyncio.get_event_loop()
            blocking_tasks = [
                loop.run_in_executor(executor, self.api_server.run)
            ]
            completed, pending = await asyncio.wait(blocking_tasks)
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        schedule = self._loop.create_task
        for buffer_name in ("conn", "raw", "msg_in"):
            self._tasks.append(schedule(self.event_handler(buffer_name)))
        self._tasks.append(schedule(self.msg_out_event_handler()))
        self._tasks.append(schedule(self.event_handler("app")))

        api_task = schedule(_run_api_server_thread(self._pool))
        api_task.add_done_callback(_stop_loop)
        self._tasks.append(api_task)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()
336
337 1
    def db_conn_or_core_shutdown(self):
        """Wait for the db connection; exit the process if it fails."""
        backend = self.options.database
        try:
            db_conn_wait(db_backend=backend)
        except KytosDBInitException as db_error:
            sys.exit(f"Kytos couldn't start because of {str(db_error)}")
343
344 1
    def init_apm_or_core_shutdown(self, **kwargs):
345
        """Init APM instrumentation or core shutdown."""
346
        if not self.options.apm:
347
            return
348
        try:
349
            init_apm(self.options.apm, app=self.api_server.app,
350
                     **kwargs)
351
        except KytosAPMInitException as exc:
352
            sys.exit(f"Kytos couldn't start because of {str(exc)}")
353
354 1
    def _register_endpoints(self):
355
        """Register all rest endpoint served by kytos.
356
357
        -   Register APIServer endpoints
358
        -   Register WebUI endpoints
359
        -   Register ``/api/kytos/core/config`` endpoint
360
        """
361 1
        self.api_server.start_api()
362
        # Register controller endpoints as /api/kytos/core/...
363 1
        self.api_server.register_core_endpoint('config/',
364
                                               self.configuration_endpoint)
365 1
        self.api_server.register_core_endpoint('metadata/',
366
                                               Controller.metadata_endpoint)
367 1
        self.api_server.register_core_endpoint(
368
            'reload/<username>/<napp_name>/',
369
            self.rest_reload_napp)
370 1
        self.api_server.register_core_endpoint('reload/all',
371
                                               self.rest_reload_all_napps)
372 1
        self.auth.register_core_auth_services()
373 1
        self.dead_letter.register_endpoints()
374
375 1
    def register_rest_endpoint(self, url, function, methods):
376
        """Deprecate in favor of @rest decorator."""
377 1
        self.api_server.register_rest_endpoint(url, function, methods)
378
379 1
    @classmethod
380 1
    def metadata_endpoint(cls):
381
        """Return the Kytos metadata.
382
383
        Returns:
384
            string: Json with current kytos metadata.
385
386
        """
387 1
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
388 1
        meta_file = open(meta_path).read()
389 1
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'", meta_file))
390 1
        return json.dumps(metadata)
391
392 1
    def configuration_endpoint(self):
393
        """Return the configuration options used by Kytos.
394
395
        Returns:
396
            string: Json with current configurations used by kytos.
397
398
        """
399 1
        return json.dumps(self.options.__dict__)
400
401 1
    def restart(self, graceful=True):
        """Restart Kytos SDN Controller.

        A controller that was started is stopped and re-initialised with
        its current options before being started again.

        Args:
            graceful(bool): Represents the way that Kytos will restart.
        """
        was_started = self.started_at is not None
        if was_started:
            self.stop(graceful)
            self.__init__(self.options)

        self.start(restart=True)
412
413 1
    def stop(self, graceful=True):
414
        """Shutdown all services used by kytos.
415
416
        This method should:
417
            - stop all Websockets
418
            - stop the API Server
419
            - stop the Controller
420
        """
421 1
        if self.started_at:
422 1
            self.stop_controller(graceful)
423
424 1
    def stop_controller(self, graceful=True):
        """Stop the controller.

        In order: sends the stop signal through the buffers, stops the API
        server and the NApp directory listener, shuts down the thread
        pools, unloads the NApps, replaces the buffers, cancels the asyncio
        tasks and shuts the TCP server down.

        Args:
            graceful(bool): passed as ``wait`` to the pools' ``shutdown``.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.api_server.stop_api_server()
        self.napp_dir_listener.stop()

        self.log.info("Stopping threadpool: %s", self._pool)
        for pool_name in executors:
            self.log.info("Stopping threadpool: %s", pool_name)

        alive_threads = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(alive_threads), alive_threads)

        def _shutdown_pools(**extra):
            # Controller pool first, then every shared executor.
            self._pool.shutdown(wait=graceful, **extra)
            for executor_pool in executors.values():
                executor_pool.shutdown(wait=graceful, **extra)

        try:
            # Python >= 3.9
            _shutdown_pools(cancel_futures=True)
        except TypeError:
            # ``cancel_futures`` is not accepted: retry without it.
            _shutdown_pools()

        self.started_at = None
        self.unload_napps()
        self.buffers = KytosBuffers()

        # Cancel all async tasks (event handlers and servers)
        for pending_task in self._tasks:
            pending_task.cancel()

        # ASYNC TODO: close connections

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()
486
487 1
    def status(self):
488
        """Return status of Kytos Server.
489
490
        If the controller kytos is running this method will be returned
491
        "Running since 'Started_At'", otherwise "Stopped".
492
493
        Returns:
494
            string: String with kytos status.
495
496
        """
497 1
        if self.started_at:
498 1
            return "Running since %s" % self.started_at
499 1
        return "Stopped"
500
501 1
    def uptime(self):
502
        """Return the uptime of kytos server.
503
504
        This method should return:
505
            - 0 if Kytos Server is stopped.
506
            - (kytos.start_at - datetime.now) if Kytos Server is running.
507
508
        Returns:
509
           datetime.timedelta: The uptime interval.
510
511
        """
512 1
        return now() - self.started_at if self.started_at else 0
513
514 1
    def notify_listeners(self, event):
515
        """Send the event to the specified listeners.
516
517
        Loops over self.events_listeners matching (by regexp) the attribute
518
        name of the event with the keys of events_listeners. If a match occurs,
519
        then send the event to each registered listener.
520
521
        Args:
522
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
523
        """
524
525 1
        if logging.DEBUG >= self.log.getEffectiveLevel():
526
            self.log.debug("looking for listeners for %s", event)
527 1
        for event_regex, listeners in dict(self.events_listeners).items():
528
            # self.log.debug("listeners found for %s: %r => %s", event,
529
            #                event_regex, [l.__qualname__ for l in listeners])
530
            # Do not match if the event has more characters
531
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
532 1
            if event_regex[-1] != '$' or event_regex[-2] == '\\':
533 1
                event_regex += '$'
534 1
            if re.match(event_regex, event.name):
535 1
                if logging.DEBUG >= self.log.getEffectiveLevel():
536
                    self.log.debug('Calling listeners for %s', event)
537 1
                for listener in listeners:
538 1
                    if asyncio.iscoroutinefunction(listener):
539
                        task = asyncio.create_task(listener(event))
540
                        self._alisten_tasks.add(task)
541
                        task.add_done_callback(self._alisten_tasks.discard)
542
                    else:
543 1
                        listener(event)
544
545 1
    async def event_handler(self, buffer_name: str):
546
        """Default event handler that gets from an event buffer."""
547 1
        event_buffer = getattr(self.buffers, buffer_name)
548
        while True:
549 1
            event = await event_buffer.aget()
550 1
            self.notify_listeners(event)
551
552 1
            if event.name == "kytos/core.shutdown":
553 1
                self.log.debug("Raw Event handler stopped")
554 1
                break
555
556 1
    async def publish_connection_error(self, event):
557
        """Publish connection error event.
558
559
        Args:
560
            event (KytosEvent): Event that triggered for error propagation.
561
        """
562
        event.name = \
563
            f"kytos/core.{event.destination.protocol.name}.connection.error"
564
        error_msg = f"Connection state: {event.destination.state}"
565
        event.content["exception"] = error_msg
566
        await self.buffers.app.aput(event)
567
568 1
    async def msg_out_event_handler(self):
569
        """Handle msg_out events.
570
571
        Listen to the msg_out buffer and send all its events to the
572
        corresponding listeners.
573
        """
574 1
        self.log.info("Message Out Event Handler started")
575
        while True:
576 1
            triggered_event = await self.buffers.msg_out.aget()
577
578 1
            if triggered_event.name == "kytos/core.shutdown":
579 1
                self.log.debug("Message Out Event handler stopped")
580 1
                break
581
582 1
            message = triggered_event.content['message']
583 1
            destination = triggered_event.destination
584 1
            try:
585 1
                if (destination and
586
                        not destination.state == ConnectionState.FINISHED):
587 1
                    packet = message.pack()
588 1
                    destination.send(packet)
589 1
                    if logging.DEBUG >= self.log.getEffectiveLevel():
590
                        self.log.debug('Connection %s: OUT OFP, '
591
                                       'version: %s, type: %s, xid: %s - %s',
592
                                       destination.id,
593
                                       message.header.version,
594
                                       message.header.message_type,
595
                                       message.header.xid,
596
                                       packet.hex())
597 1
                    self.notify_listeners(triggered_event)
598 1
                    continue
599
600
            except (OSError, SocketError):
601
                pass
602
603
            await self.publish_connection_error(triggered_event)
604
            self.log.info("connection closed. Cannot send message")
605
606 1
    def get_interface_by_id(self, interface_id):
607
        """Find a Interface  with interface_id.
608
609
        Args:
610
            interface_id(str): Interface Identifier.
611
612
        Returns:
613
            Interface: Instance of Interface with the id given.
614
615
        """
616 1
        if interface_id is None:
617 1
            return None
618
619 1
        switch_id = ":".join(interface_id.split(":")[:-1])
620 1
        interface_number = int(interface_id.split(":")[-1])
621
622 1
        switch = self.switches.get(switch_id)
623
624 1
        if not switch:
625 1
            return None
626
627 1
        return switch.interfaces.get(interface_number, None)
628
629 1
    def get_switch_by_dpid(self, dpid):
630
        """Return a specific switch by dpid.
631
632
        Args:
633
            dpid (|DPID|): dpid object used to identify a switch.
634
635
        Returns:
636
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified.
637
638
        """
639 1
        return self.switches.get(dpid)
640
641 1
    def get_switch_or_create(self, dpid, connection=None):
        """Return switch or create it if necessary.

        Runs under the switches lock. A ``kytos/core.switch.new`` or
        ``kytos/core.switch.reconnected`` event is put in the app buffer.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. When given, it replaces the
                switch's connection and a different previous one is
                removed.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.

        """
        with self._switches_lock:
            if connection:
                self.create_or_update_connection(connection)

            switch = self.get_switch_by_dpid(dpid)
            if switch is None:
                switch = Switch(dpid=dpid)
                self.add_new_switch(switch)
                suffix = 'new'
            else:
                suffix = 'reconnected'

            self.set_switch_options(dpid=dpid)
            event = KytosEvent(name='kytos/core.switch.' + suffix,
                               content={'switch': switch})

            if connection:
                previous = switch.connection
                switch.update_connection(connection)
                if previous is not connection:
                    self.remove_connection(previous)

            self.buffers.app.put(event)
            return switch
681
682 1
    def set_switch_options(self, dpid):
683
        """Update the switch settings based on kytos.conf options.
684
685
        Args:
686
            dpid (str): dpid used to identify a switch.
687
688
        """
689 1
        switch = self.switches.get(dpid)
690 1
        if not switch:
691
            return
692
693 1
        vlan_pool = {}
694 1
        vlan_pool = self.options.vlan_pool
695 1
        if not vlan_pool:
696 1
            return
697
698 1
        if vlan_pool.get(dpid):
699 1
            self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
700 1
            for intf_num, port_list in vlan_pool[dpid].items():
701 1
                if not switch.interfaces.get((intf_num)):
702 1
                    vlan_ids = set()
703 1
                    for vlan_range in port_list:
704 1
                        (vlan_begin, vlan_end) = (vlan_range[0:2])
705 1
                        for vlan_id in range(vlan_begin, vlan_end):
706 1
                            vlan_ids.add(vlan_id)
707 1
                    intf_num = int(intf_num)
708 1
                    intf = Interface(name=intf_num, port_number=intf_num,
709
                                     switch=switch)
710 1
                    intf.set_available_tags(vlan_ids)
711 1
                    switch.update_interface(intf)
712
713 1
    def create_or_update_connection(self, connection):
714
        """Update a connection.
715
716
        Args:
717
            connection (:class:`~kytos.core.connection.Connection`):
718
                Instance of connection that will be updated.
719
        """
720 1
        self.connections[connection.id] = connection
721
722 1
    def get_connection_by_id(self, conn_id):
723
        """Return a existent connection by id.
724
725
        Args:
726
            id (int): id from a connection.
727
728
        Returns:
729
            :class:`~kytos.core.connection.Connection`: Instance of connection
730
                or None Type.
731
732
        """
733 1
        return self.connections.get(conn_id)
734
735 1
    def remove_connection(self, connection):
736
        """Close a existent connection and remove it.
737
738
        Args:
739
            connection (:class:`~kytos.core.connection.Connection`):
740
                Instance of connection that will be removed.
741
        """
742 1
        if connection is None:
743 1
            return False
744
745 1
        try:
746 1
            connection.close()
747 1
            del self.connections[connection.id]
748 1
        except KeyError:
749 1
            return False
750 1
        return True
751
752 1
    def remove_switch(self, switch):
753
        """Remove an existent switch.
754
755
        Args:
756
            switch (:class:`~kytos.core.switch.Switch`):
757
                Instance of switch that will be removed.
758
        """
759 1
        try:
760 1
            del self.switches[switch.dpid]
761 1
        except KeyError:
762 1
            return False
763 1
        return True
764
765 1
    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method will read the new connection event and store the
        connection (socket) into the connections attribute on the controller.

        If a different connection object is already stored under the same id
        (a new connection on the same ip:port), the old one is closed and
        removed before the new one is stored.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info. ``event.source`` is the new connection.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists.
        # remove_connection() calls close() on its argument and reads its
        # ``id``, so it must receive the stored connection object, not the id.
        # The identity check avoids closing the very connection that is about
        # to be stored again.
        stale = self.get_connection_by_id(connection.id)
        if stale and stale is not connection:
            self.remove_connection(stale)

        # Update connections with the new connection
        self.create_or_update_connection(connection)
790
791 1
    def add_new_switch(self, switch):
        """Register a switch on the controller, keyed by its dpid.

        An entry already stored under the same dpid is replaced.

        Args:
            switch (Switch): A Switch object
        """
        dpid = switch.dpid
        self.switches[dpid] = switch
798
799 1
    def _import_napp(self, username, napp_name):
800
        """Import a NApp module.
801
802
        Raises:
803
            FileNotFoundError: if NApp's main.py is not found.
804
            ModuleNotFoundError: if any NApp requirement is not installed.
805
806
        """
807 1
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
808 1
        path = os.path.join(self.options.napps, username, napp_name,
809
                            'main.py')
810 1
        napp_spec = spec_from_file_location(mod_name, path)
811 1
        napp_module = module_from_spec(napp_spec)
812 1
        sys.modules[napp_spec.name] = napp_module
813 1
        napp_spec.loader.exec_module(napp_module)
814 1
        return napp_module
815
816 1
    def load_napp(self, username, napp_name):
        """Load a single NApp.

        Imports the NApp module, instantiates its ``Main`` class, starts it,
        registers its REST endpoints and adds its listeners to
        ``self.events_listeners``. Nothing is loaded (and only a log entry is
        emitted) when the NApp is already loaded, when importing it raises
        ``ModuleNotFoundError``/``FileNotFoundError``, or when ``Main``
        raises during instantiation.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        napp_key = (username, napp_name)
        if napp_key in self.napps:
            self.log.warning('NApp %s/%s was already loaded',
                             username, napp_name)
            return

        try:
            module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as exc:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, exc)
            return
        except FileNotFoundError as exc:
            self.log.warning(
                "NApp module not found, assuming it's a meta-napp: %s",
                exc.filename)
            return

        try:
            instance = module.Main(controller=self)
        except:  # noqa pylint: disable=bare-except
            self.log.critical("NApp initialization failed: %s/%s",
                              username, napp_name, exc_info=True)
            return

        # The NApp is recorded as loaded before it is started.
        self.napps[napp_key] = instance

        instance.start()
        self.api_server.authenticate_endpoints(instance)
        self.api_server.register_napp_endpoints(instance)

        # pylint: disable=protected-access
        for event_name, handlers in instance._listeners.items():
            registered = self.events_listeners.setdefault(event_name, [])
            registered.extend(handlers)
        # pylint: enable=protected-access
856
857 1
    def pre_install_napps(self, napps, enable=True):
        """Pre install and enable NApps.

        Only the NApps whose name is not among the string forms of the
        already installed NApps are installed.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): Passed through to ``napps_manager.install`` as its
                ``enable`` argument.
        """
        manager = self.napps_manager
        already_installed = list(map(str, manager.get_installed_napps()))
        missing = [name for name in napps if name not in already_installed]
        for name in missing:
            manager.install(name, enable=enable)
870
871 1
    def load_napps(self):
        """Load every NApp reported as enabled by the NApps manager.

        A ``FileNotFoundError`` raised while loading one NApp is logged and
        the remaining NApps are still processed.
        """
        for enabled in self.napps_manager.get_enabled_napps():
            try:
                self.log.info("Loading NApp %s", enabled.id)
                self.load_napp(enabled.username, enabled.name)
            except FileNotFoundError as err:
                self.log.error("Could not load NApp %s: %s", enabled.id, err)
880
881 1
    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        The NApp is removed from ``self.napps``; the first listener registered
        for its ``kytos/core.shutdown.<napp id>`` event is called, its REST
        endpoints are removed and its listeners are taken out of
        ``self.events_listeners``. Only a warning is logged when the NApp was
        not loaded.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)

        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        shutdown_event = KytosEvent(
            name='kytos/core.shutdown.' + NApp(username, napp_name).id)
        # Call listener before removing it from events_listeners
        shutdown_handler = self.events_listeners[shutdown_event.name][0]
        shutdown_handler(shutdown_event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Removing listeners from that napp
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            registered = self.events_listeners[event_type]
            for listener in napp_listeners:
                registered.remove(listener)
            # Drop the event entry once nobody listens to it anymore.
            if not registered:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access
912
913 1
    def unload_napps(self):
        """Unload every NApp currently stored in ``self.napps``."""
        # Work on a snapshot of the keys: unload_napp() removes entries from
        # self.napps, and looping over the dictionary itself would raise
        # 'RuntimeError: dictionary changed size during iteration'.
        for username, napp_name in tuple(self.napps):
            self.unload_napp(username, napp_name)
921
922 1
    def reload_napp_module(self, username, napp_name, napp_file):
        """Reload one module of a NApp.

        The module ``napps.<username>.<napp_name>.<napp_file>`` is imported
        and then reloaded.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp.
            napp_file (str): Module name inside the NApp package.

        Raises:
            ModuleNotFoundError: if the module cannot be imported (logged and
                re-raised).
            ImportError: if reloading the module fails (logged and re-raised).
        """
        dotted_name = '.'.join(('napps', username, napp_name, napp_file))

        try:
            module = import_module(dotted_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", dotted_name)
            raise

        try:
            reload_module(module)
        except ImportError as exc:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, exc)
            raise
936
937 1
    def reload_napp(self, username, napp_name):
        """Reload a NApp.

        The NApp is unloaded, its ``settings`` and ``main`` modules are
        reloaded (in that order) and the NApp is loaded again.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be reloaded.

        Returns:
            int: ``200`` on success; ``400`` when reloading a module raised
                ``ImportError`` (which includes ``ModuleNotFoundError``), in
                which case the NApp is left unloaded.
        """
        self.unload_napp(username, napp_name)
        try:
            for napp_file in ('settings', 'main'):
                self.reload_napp_module(username, napp_name, napp_file)
        except ImportError:
            # ModuleNotFoundError is a subclass of ImportError.
            return 400

        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200
949
950 1
    def rest_reload_napp(self, username, napp_name):
        """Request reload a NApp.

        Returns:
            tuple: ``('reloaded', status)`` where ``status`` is the value
                returned by :meth:`reload_napp`.
        """
        return 'reloaded', self.reload_napp(username, napp_name)
954
955 1
    def rest_reload_all_napps(self):
        """Request reload all NApps.

        Every NApp that is loaded when the request starts is reloaded exactly
        once.

        Returns:
            tuple: Always ``('reloaded', 200)``; the status of the individual
                reloads is not reported.
        """
        # Iterate over a snapshot of the keys: reload_napp() unloads the NApp
        # (removing it from self.napps) and loads it again (re-inserting it),
        # so looping over the dictionary itself would either raise
        # 'RuntimeError: dictionary changed size during iteration' or visit
        # re-inserted NApps more than once.
        for napp in list(self.napps):
            self.reload_napp(*napp)
        return 'reloaded', 200
960