"""Kytos SDN Platform main class.

This module contains the main class of Kytos, which is
:class:`~.core.Controller`.

Basic usage:

.. code-block:: python3

    from kytos.core.config import KytosConfig
    from kytos.core import Controller
    config = KytosConfig()
    controller = Controller(config.options['daemon'])
    controller.start()
"""
16 | 1 | import asyncio |
|
17 | 1 | import atexit |
|
18 | 1 | import json |
|
19 | 1 | import logging |
|
20 | 1 | import os |
|
21 | 1 | import re |
|
22 | 1 | import sys |
|
23 | 1 | import threading |
|
24 | 1 | from concurrent.futures import ThreadPoolExecutor |
|
25 | 1 | from importlib import import_module |
|
26 | 1 | from importlib import reload as reload_module |
|
27 | 1 | from importlib.util import module_from_spec, spec_from_file_location |
|
28 | 1 | from pathlib import Path |
|
29 | |||
30 | 1 | from kytos.core.api_server import APIServer |
|
31 | # from kytos.core.tcp_server import KytosRequestHandler, KytosServer |
||
32 | 1 | from kytos.core.atcp_server import KytosServer, KytosServerProtocol |
|
33 | 1 | from kytos.core.auth import Auth |
|
34 | 1 | from kytos.core.buffers import KytosBuffers |
|
35 | 1 | from kytos.core.config import KytosConfig |
|
36 | 1 | from kytos.core.connection import ConnectionState |
|
37 | 1 | from kytos.core.events import KytosEvent |
|
38 | 1 | from kytos.core.helpers import now |
|
39 | 1 | from kytos.core.interface import Interface |
|
40 | 1 | from kytos.core.logs import LogManager |
|
41 | 1 | from kytos.core.napps.base import NApp |
|
42 | 1 | from kytos.core.napps.manager import NAppsManager |
|
43 | 1 | from kytos.core.napps.napp_dir_listener import NAppDirListener |
|
44 | 1 | from kytos.core.switch import Switch |
|
45 | |||
46 | 1 | __all__ = ('Controller',) |
|
47 | |||
48 | |||
class Controller:
    """Main class of Kytos.

    The main responsibilities of this class are:
        - start the TCP server (:class:`~.core.atcp_server.KytosServer`);
        - manage KytosNApps (install, load and unload);
        - keep the buffers (instance of :class:`~.core.buffers.KytosBuffers`);
        - manage which event should be sent to NApps methods;
        - manage the buffers handlers, one asyncio task per handler.
    """

    # Created issue #568 for the disabled checks.
    # pylint: disable=too-many-instance-attributes,too-many-public-methods
    def __init__(self, options=None, loop=None):
        """Init method of Controller class takes the parameters below.

        Args:
            options (:attr:`ParseArgs.args`): :attr:`options` attribute from an
                instance of :class:`~kytos.core.config.KytosConfig` class.
                When omitted, ``KytosConfig().options['daemon']`` is used.
            loop: asyncio event loop used to schedule the handlers. When
                omitted, ``asyncio.get_event_loop()`` is used.
        """
        if options is None:
            options = KytosConfig().options['daemon']

        self._loop = loop or asyncio.get_event_loop()
        #: Single-worker pool in which the API server is run.
        self._pool = ThreadPoolExecutor(max_workers=1)

        #: dict: keep the main threads of the controller (buffers and handler)
        self._threads = {}
        #: KytosBuffers: KytosBuffer object with Controller buffers
        self.buffers = KytosBuffers(loop=self._loop)
        #: dict: keep track of the socket connections labeled by ``(ip, port)``
        #:
        #: This dict stores all connections between the controller and the
        #: switches. The key for this dict is a tuple (ip, port). The content
        #: is another dict with the connection information.
        self.connections = {}
        #: dict: mapping of events and event listeners.
        #:
        #: The key of the dict is a KytosEvent (or a string that represent a
        #: regex to match against KytosEvents) and the value is a list of
        #: methods that will receive the referenced event
        self.events_listeners = {'kytos/core.connection.new':
                                 [self.new_connection]}

        #: dict: Current loaded apps - ``(username, napp_name)``: ``napp``
        #:
        #: The key is the ``(username, napp_name)`` tuple, while the value is
        #: the napp instance itself.
        self.napps = {}
        #: Object generated by ParseArgs on config.py file
        self.options = options
        #: KytosServer: Instance of KytosServer that will be listening to TCP
        #: connections.
        self.server = None
        #: dict: Current existing switches.
        #:
        #: The key is the switch dpid, while the value is a Switch object.
        self.switches = {}  # dpid: Switch()

        #: datetime.datetime: Time when the controller finished starting.
        self.started_at = None

        #: logging.Logger: Logger instance used by Kytos.
        self.log = None

        #: Observer that handle NApps when they are enabled or disabled.
        self.napp_dir_listener = NAppDirListener(self)

        self.napps_manager = NAppsManager(self)

        #: API Server used to expose rest endpoints.
        self.api_server = APIServer(__name__, self.options.listen,
                                    self.options.api_port,
                                    self.napps_manager, self.options.napps)

        self.auth = Auth(self)

        self._register_endpoints()
        #: Adding the napps 'enabled' directory into the PATH
        #: Now you can access the enabled napps with:
        #: from napps.<username>.<napp_name> import ?....
        sys.path.append(os.path.join(self.options.napps, os.pardir))

    def enable_logs(self):
        """Register kytos log and enable the logs."""
        LogManager.load_config_file(self.options.logging, self.options.debug)
        LogManager.enable_websocket(self.api_server.server)
        self.log = logging.getLogger(__name__)

    def start(self, restart=False):
        """Enable logs, create the pidfile and call start_controller.

        Args:
            restart (bool): When True the pidfile is not created again.
        """
        self.enable_logs()
        if not restart:
            self.create_pidfile()
        self.start_controller()

    def _pidfile_owner_is_alive(self):
        """Tell whether the PID stored in the existing pidfile is running.

        Returns:
            bool: True if a process with the recorded PID exists, False if
            the pidfile is unreadable, malformed or points to a dead process.

        """
        try:
            with open(self.options.pidfile, mode='r') as existing_file:
                old_pid = int(existing_file.read())
        except (OSError, ValueError):
            # Unreadable, empty or corrupted pidfile: treat it as stale.
            return False

        if old_pid <= 0:
            # kill() gives 0/negative pids a process-group meaning; such a
            # value can not identify the process that wrote the pidfile.
            return False

        try:
            # Signal 0 performs only the existence/permission check.
            os.kill(old_pid, 0)
        except PermissionError:
            # The process exists but belongs to another user.
            return True
        except OSError:
            return False
        return True

    def _remove_pidfile(self):
        """Delete the pidfile, ignoring the case where it is already gone."""
        try:
            Path(self.options.pidfile).unlink()
        except FileNotFoundError:
            pass

    def create_pidfile(self):
        """Create a pidfile holding the PID of the current process.

        If a pidfile already exists and the process that wrote it is still
        running, the program exits with an error message. A stale pidfile
        is overwritten. The pidfile removal is scheduled for interpreter
        exit only after this process has actually written it, so an aborted
        start never deletes the pidfile of a running instance.
        """
        pid = os.getpid()

        # Creates directory if it doesn't exist
        # System can erase /var/run's content
        pid_folder = Path(self.options.pidfile).parent
        self.log.info(pid_folder)

        # Pylint incorrectly infers Path objects
        # https://github.com/PyCQA/pylint/issues/224
        # pylint: disable=no-member
        if not pid_folder.exists():
            pid_folder.mkdir()
            pid_folder.chmod(0o1777)
        # pylint: enable=no-member

        # Checks if a pidfile exists. Creates a new file.
        try:
            pidfile = open(self.options.pidfile, mode='x')
        except OSError:
            # This happens if there is a pidfile already.
            # We shall check if the process that created the pidfile is still
            # running. If it is, we assume it is Kytos and quit.
            # Otherwise, overwrite the file and proceed.
            if self._pidfile_owner_is_alive():
                error_msg = ("PID file {} exists. Delete it if Kytos is not "
                             "running. Aborting.")
                sys.exit(error_msg.format(self.options.pidfile))
            try:
                pidfile = open(self.options.pidfile, mode='w')
            except OSError as exception:
                error_msg = "Failed to create pidfile {}: {}."
                sys.exit(error_msg.format(self.options.pidfile, exception))

        # Identifies the process that created the pidfile.
        with pidfile:
            pidfile.write(str(pid))

        # Make sure the file is deleted when controller stops
        atexit.register(self._remove_pidfile)

    def start_controller(self):
        """Start the controller.

        Starts the KytosServer (TCP Server).
        Schedules one asyncio task for each buffer handler.
        Runs the API server in the controller thread pool.
        Load the installed apps.
        """
        self.log.info("Starting Kytos - Kytos Controller")
        self.server = KytosServer((self.options.listen,
                                   int(self.options.port)),
                                  KytosServerProtocol,
                                  self,
                                  self.options.protocol_name)

        self.log.info("Starting TCP server: %s", self.server)
        self.server.serve_forever()

        def _stop_loop(_):
            # Done-callback of the API server task: once the API server
            # returns, the event loop is stopped.
            loop = asyncio.get_event_loop()
            threads = threading.enumerate()
            self.log.debug("%s threads before loop.stop: %s",
                           len(threads), threads)
            loop.stop()

        async def _run_api_server_thread(executor):
            # Runs the blocking ``api_server.run`` in the executor and waits
            # for it to finish.
            log = logging.getLogger('kytos.core.controller.api_server_thread')
            log.debug('starting')
            loop = asyncio.get_event_loop()
            blocking_tasks = [
                loop.run_in_executor(executor, self.api_server.run)
            ]
            completed, pending = await asyncio.wait(blocking_tasks)
            log.debug('completed: %d, pending: %d',
                      len(completed), len(pending))

        handlers = (self.raw_event_handler, self.msg_in_event_handler,
                    self.msg_out_event_handler, self.app_event_handler)
        for handler in handlers:
            self._loop.create_task(handler())
        task = self._loop.create_task(_run_api_server_thread(self._pool))
        task.add_done_callback(_stop_loop)

        self.log.info("ThreadPool started: %s", self._pool)

        # ASYNC TODO: ensure all threads started correctly
        # This is critical, if any of them failed starting we should exit.

        self.log.info("Loading Kytos NApps...")
        self.napp_dir_listener.start()
        self.pre_install_napps(self.options.napps_pre_installed)
        self.load_napps()

        self.started_at = now()

    def _register_endpoints(self):
        """Register all rest endpoint served by kytos.

        -   Register APIServer endpoints
        -   Register ``/api/kytos/core/config`` endpoint
        -   Register ``/api/kytos/core/metadata`` endpoint
        -   Register the NApp reload endpoints
        -   Register the core auth services
        """
        self.api_server.start_api()
        # Register controller endpoints as /api/kytos/core/...
        self.api_server.register_core_endpoint('config/',
                                               self.configuration_endpoint)
        self.api_server.register_core_endpoint('metadata/',
                                               Controller.metadata_endpoint)
        self.api_server.register_core_endpoint(
            'reload/<username>/<napp_name>/',
            self.rest_reload_napp)
        self.api_server.register_core_endpoint('reload/all',
                                               self.rest_reload_all_napps)
        self.auth.register_core_auth_services()

    def register_rest_endpoint(self, url, function, methods):
        """Deprecate in favor of @rest decorator."""
        self.api_server.register_rest_endpoint(url, function, methods)

    @classmethod
    def metadata_endpoint(cls):
        """Return the Kytos metadata.

        The metadata is read from the ``metadata.py`` file located in the
        same directory as this module: every ``__name__ = 'value'``
        assignment found there becomes one entry.

        Returns:
            string: Json with current kytos metadata.

        """
        meta_path = "%s/metadata.py" % os.path.dirname(__file__)
        with open(meta_path) as meta_file:
            meta_content = meta_file.read()
        metadata = dict(re.findall(r"(__[a-z]+__)\s*=\s*'([^']+)'",
                                   meta_content))
        return json.dumps(metadata)

    def configuration_endpoint(self):
        """Return the configuration options used by Kytos.

        Returns:
            string: Json with current configurations used by kytos.

        """
        return json.dumps(self.options.__dict__)

    def restart(self, graceful=True):
        """Restart Kytos SDN Controller.

        Args:
            graceful(bool): Represents the way that Kytos will restart.
        """
        if self.started_at is not None:
            self.stop(graceful)
            # NOTE(review): the controller is re-initialised without the
            # ``loop`` argument, so a custom loop given at construction is
            # replaced by ``asyncio.get_event_loop()`` - confirm intended.
            self.__init__(self.options)

        self.start(restart=True)

    def stop(self, graceful=True):
        """Shutdown all services used by kytos.

        Nothing is done when the controller is not running.

        This method should:
            - stop all Websockets
            - stop the API Server
            - stop the Controller
        """
        if self.started_at:
            self.stop_controller(graceful)

    def stop_controller(self, graceful=True):
        """Stop the controller.

        This method should:
            - announce on the network that the controller will shutdown;
            - stop receiving incoming packages;
            - call the 'shutdown' method of each KytosNApp that is running;
            - finish reading the events on all buffers;
            - stop each running handler;
            - stop all running threads;
            - stop the KytosServer;

        Args:
            graceful (bool): Whether to wait for the thread pool to finish
                its pending work before continuing.
        """
        self.log.info("Stopping Kytos")

        self.buffers.send_stop_signal()
        self.api_server.stop_api_server()
        self.napp_dir_listener.stop()

        self.log.info("Stopping threadpool: %s", self._pool)

        threads = threading.enumerate()
        self.log.debug("%s threads before threadpool shutdown: %s",
                       len(threads), threads)

        self._pool.shutdown(wait=graceful)

        self.started_at = None
        self.unload_napps()
        # Fresh buffers bound to the same loop the controller was built with.
        self.buffers = KytosBuffers(loop=self._loop)

        # ASYNC TODO: close connections

        # Shutdown the TCP server and the main asyncio loop
        self.server.shutdown()

    def status(self):
        """Return status of Kytos Server.

        If the controller kytos is running this method will be returned
        "Running since 'Started_At'", otherwise "Stopped".

        Returns:
            string: String with kytos status.

        """
        if self.started_at:
            return "Running since %s" % self.started_at
        return "Stopped"

    def uptime(self):
        """Return the uptime of kytos server.

        This method should return:
            - 0 if Kytos Server is stopped.
            - (now - kytos.started_at) if Kytos Server is running.

        Returns:
           datetime.timedelta: The uptime interval (or the integer ``0``
           when the controller is stopped).

        """
        return now() - self.started_at if self.started_at else 0

    def notify_listeners(self, event):
        """Send the event to the specified listeners.

        Loops over self.events_listeners matching (by regexp) the attribute
        name of the event with the keys of events_listeners. If a match occurs,
        then send the event to each registered listener.

        Args:
            event (~kytos.core.KytosEvent): An instance of a KytosEvent.
        """
        self.log.debug("looking for listeners for %s", event)
        # Iterate over a copy: listeners may (un)register other listeners.
        for event_regex, listeners in dict(self.events_listeners).items():
            # Do not match if the event has more characters
            # e.g. "shutdown" won't match "shutdown.kytos/of_core"
            # A trailing "\$" is an escaped literal dollar sign, not an
            # anchor, so the anchor still has to be appended in that case.
            if not event_regex.endswith('$') or event_regex.endswith('\\$'):
                event_regex += '$'
            if re.match(event_regex, event.name):
                for listener in listeners:
                    listener(event)

    async def raw_event_handler(self):
        """Handle raw events.

        Listen to the raw_buffer and send all its events to the
        corresponding listeners. Stops after a ``kytos/core.shutdown``
        event is dispatched.
        """
        self.log.info("Raw Event Handler started")
        while True:
            event = await self.buffers.raw.aget()
            self.notify_listeners(event)
            self.log.debug("Raw Event handler called")

            if event.name == "kytos/core.shutdown":
                self.log.debug("Raw Event handler stopped")
                break

    async def msg_in_event_handler(self):
        """Handle msg_in events.

        Listen to the msg_in buffer and send all its events to the
        corresponding listeners. Stops after a ``kytos/core.shutdown``
        event is dispatched.
        """
        self.log.info("Message In Event Handler started")
        while True:
            event = await self.buffers.msg_in.aget()
            self.notify_listeners(event)
            self.log.debug("Message In Event handler called")

            if event.name == "kytos/core.shutdown":
                self.log.debug("Message In Event handler stopped")
                break

    async def msg_out_event_handler(self):
        """Handle msg_out events.

        Listen to the msg_out buffer, send the packed message of each event
        to its destination connection and then notify the corresponding
        listeners. Unlike the other handlers, the ``kytos/core.shutdown``
        event stops this handler without being dispatched to listeners.
        """
        self.log.info("Message Out Event Handler started")
        while True:
            triggered_event = await self.buffers.msg_out.aget()

            if triggered_event.name == "kytos/core.shutdown":
                self.log.debug("Message Out Event handler stopped")
                break

            message = triggered_event.content['message']
            destination = triggered_event.destination
            if (destination and
                    not destination.state == ConnectionState.FINISHED):
                packet = message.pack()
                destination.send(packet)
                self.log.debug('Connection %s: OUT OFP, '
                               'version: %s, type: %s, xid: %s - %s',
                               destination.id,
                               message.header.version,
                               message.header.message_type,
                               message.header.xid,
                               packet.hex())
                self.notify_listeners(triggered_event)
                self.log.debug("Message Out Event handler called")
            else:
                self.log.info("connection closed. Cannot send message")

    async def app_event_handler(self):
        """Handle app events.

        Listen to the app buffer and send all its events to the
        corresponding listeners. Stops after a ``kytos/core.shutdown``
        event is dispatched.
        """
        self.log.info("App Event Handler started")
        while True:
            event = await self.buffers.app.aget()
            self.notify_listeners(event)
            self.log.debug("App Event handler called")

            if event.name == "kytos/core.shutdown":
                self.log.debug("App Event handler stopped")
                break

    def get_interface_by_id(self, interface_id):
        """Find a Interface with interface_id.

        The identifier is the switch id followed by ``:`` and the interface
        number, e.g. ``<switch_id>:<number>``.

        Args:
            interface_id(str): Interface Identifier.

        Returns:
            Interface: Instance of Interface with the id given, or None when
            the id is None or the switch/interface is unknown.

        Raises:
            ValueError: if the part after the last ``:`` is not an integer.

        """
        if interface_id is None:
            return None

        # Split on the last colon only: the switch id itself contains colons.
        switch_id, _, number = interface_id.rpartition(":")
        interface_number = int(number)

        switch = self.switches.get(switch_id)
        if not switch:
            return None

        return switch.interfaces.get(interface_number, None)

    def get_switch_by_dpid(self, dpid):
        """Return a specific switch by dpid.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.

        Returns:
            :class:`~kytos.core.switch.Switch`: Switch with dpid specified,
            or None if there is no such switch.

        """
        return self.switches.get(dpid)

    def get_switch_or_create(self, dpid, connection):
        """Return switch or create it if necessary.

        A ``kytos/core.switch.new`` or ``kytos/core.switch.reconnected``
        event is put in the app buffer.

        Args:
            dpid (|DPID|): dpid object used to identify a switch.
            connection (:class:`~kytos.core.connection.Connection`):
                connection used by switch. If a switch has a connection that
                will be updated.

        Returns:
            :class:`~kytos.core.switch.Switch`: new or existent switch.

        """
        self.create_or_update_connection(connection)
        switch = self.get_switch_by_dpid(dpid)
        event_name = 'kytos/core.switch.'

        if switch is None:
            switch = Switch(dpid=dpid)
            self.add_new_switch(switch)
            event_name += 'new'
        else:
            event_name += 'reconnected'

        self.set_switch_options(dpid=dpid)
        event = KytosEvent(name=event_name, content={'switch': switch})

        old_connection = switch.connection
        switch.update_connection(connection)

        if old_connection is not connection:
            self.remove_connection(old_connection)

        self.buffers.app.put(event)

        return switch

    def set_switch_options(self, dpid):
        """Update the switch settings based on kytos.conf options.

        Reads the ``vlan_pool`` option (a JSON object mapping a dpid to
        ``{interface_number: [[vlan_begin, vlan_end], ...]}``) and, for
        every configured interface the switch does not have yet, creates
        the interface with the VLAN ids in ``range(vlan_begin, vlan_end)``
        as available tags. Interfaces already present are left untouched.

        Args:
            dpid (str): dpid used to identify a switch.

        """
        switch = self.switches.get(dpid)
        if not switch:
            return

        try:
            vlan_pool = json.loads(self.options.vlan_pool)
        except (TypeError, json.JSONDecodeError) as err:
            self.log.error("Invalid vlan_pool settings: %s", err)
            return

        if not vlan_pool:
            return

        switch_pool = vlan_pool.get(dpid)
        if not switch_pool:
            return

        self.log.info("Loading vlan_pool configuration for dpid %s", dpid)
        for intf_key, vlan_ranges in switch_pool.items():
            # JSON object keys are always strings; interfaces are looked up
            # by integer port number, so convert before checking.
            intf_num = int(intf_key)
            if switch.interfaces.get(intf_num):
                continue

            vlan_ids = set()
            for vlan_range in vlan_ranges:
                (vlan_begin, vlan_end) = (vlan_range[0:2])
                vlan_ids.update(range(vlan_begin, vlan_end))

            intf = Interface(name=intf_num, port_number=intf_num,
                             switch=switch)
            intf.set_available_tags(vlan_ids)
            switch.update_interface(intf)

    def create_or_update_connection(self, connection):
        """Store a connection, replacing any previous one with the same id.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be updated.
        """
        self.connections[connection.id] = connection

    def get_connection_by_id(self, conn_id):
        """Return a existent connection by id.

        Args:
            conn_id (int): id from a connection.

        Returns:
            :class:`~kytos.core.connection.Connection`: Instance of connection
                or None Type.

        """
        return self.connections.get(conn_id)

    def remove_connection(self, connection):
        """Close a existent connection and remove it.

        The connection is always closed. It is only removed from the
        registry when the registry entry for its id is this very object, so
        a newer connection that reuses the same id is never dropped.

        Args:
            connection (:class:`~kytos.core.connection.Connection`):
                Instance of connection that will be removed.

        Returns:
            bool: True if the connection was registered and got removed,
            False otherwise (including when ``connection`` is None).
        """
        if connection is None:
            return False

        connection.close()
        if self.connections.get(connection.id) is not connection:
            return False
        del self.connections[connection.id]
        return True

    def remove_switch(self, switch):
        """Remove an existent switch.

        Args:
            switch (:class:`~kytos.core.switch.Switch`):
                Instance of switch that will be removed.

        Returns:
            bool: True if the switch was known and removed, False otherwise.
        """
        try:
            del self.switches[switch.dpid]
        except KeyError:
            return False
        return True

    def new_connection(self, event):
        """Handle a kytos/core.connection.new event.

        This method will read new connection event and store the connection
        (socket) into the connections attribute on the controller.

        It also clear all references to the connection since it is a new
        connection on the same ip:port.

        Args:
            event (~kytos.core.KytosEvent):
                The received event (``kytos/core.connection.new``) with the
                needed info.
        """
        self.log.info("Handling %s...", event)

        connection = event.source
        self.log.debug("Event source: %s", event.source)

        # Remove old connection (aka cleanup) if it exists
        old_connection = self.get_connection_by_id(connection.id)
        if old_connection is not None and old_connection is not connection:
            self.remove_connection(old_connection)

        # Update connections with the new connection
        self.create_or_update_connection(connection)

    def add_new_switch(self, switch):
        """Add a new switch on the controller.

        Args:
            switch (Switch): A Switch object
        """
        self.switches[switch.dpid] = switch

    def _import_napp(self, username, napp_name):
        """Import a NApp module.

        The module is loaded from ``<napps dir>/<username>/<napp_name>/
        main.py`` and registered as ``napps.<username>.<napp_name>.main``.
        If executing the module fails, it is removed from ``sys.modules``
        again so no half-initialised module is left behind.

        Raises:
            FileNotFoundError: if NApp's main.py is not found.
            ModuleNotFoundError: if any NApp requirement is not installed.

        """
        mod_name = '.'.join(['napps', username, napp_name, 'main'])
        path = os.path.join(self.options.napps, username, napp_name,
                            'main.py')
        napp_spec = spec_from_file_location(mod_name, path)
        napp_module = module_from_spec(napp_spec)
        sys.modules[napp_spec.name] = napp_module
        try:
            napp_spec.loader.exec_module(napp_module)
        except BaseException:
            sys.modules.pop(napp_spec.name, None)
            raise
        return napp_module

    def load_napp(self, username, napp_name):
        """Load a single NApp.

        Imports the NApp, instantiates its ``Main`` class, starts it and
        registers its REST endpoints and event listeners. Import problems
        are logged and the NApp is skipped.

        Args:
            username (str): NApp username (makes up NApp's path).
            napp_name (str): Name of the NApp to be loaded.
        """
        if (username, napp_name) in self.napps:
            message = 'NApp %s/%s was already loaded'
            self.log.warning(message, username, napp_name)
            return

        try:
            napp_module = self._import_napp(username, napp_name)
        except ModuleNotFoundError as err:
            self.log.error("Error loading NApp '%s/%s': %s",
                           username, napp_name, err)
            return
        except FileNotFoundError as err:
            msg = "NApp module not found, assuming it's a meta napp: %s"
            self.log.warning(msg, err.filename)
            return

        napp = napp_module.Main(controller=self)

        self.napps[(username, napp_name)] = napp

        napp.start()
        self.api_server.register_napp_endpoints(napp)

        # pylint: disable=protected-access
        for event, listeners in napp._listeners.items():
            self.events_listeners.setdefault(event, []).extend(listeners)
        # pylint: enable=protected-access

    def pre_install_napps(self, napps, enable=True):
        """Pre install and enable NApps.

        Before installing, it'll check if it's installed yet.

        Args:
            napps ([str]): List of NApps to be pre-installed and enabled.
            enable (bool): Passed on to the NApps manager ``install`` call.
        """
        all_napps = self.napps_manager.get_installed_napps()
        installed = {str(napp) for napp in all_napps}
        for napp in napps:
            if napp not in installed:
                self.napps_manager.install(napp, enable=enable)

    def load_napps(self):
        """Load all NApps enabled on the NApps dir."""
        for napp in self.napps_manager.get_enabled_napps():
            try:
                self.log.info("Loading NApp %s", napp.id)
                self.load_napp(napp.username, napp.name)
            except FileNotFoundError as exception:
                self.log.error("Could not load NApp %s: %s",
                               napp.id, exception)

    def unload_napp(self, username, napp_name):
        """Unload a specific NApp.

        Calls the NApp shutdown listener (when one is registered), removes
        its REST endpoints and unregisters its event listeners.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp to be unloaded.
        """
        napp = self.napps.pop((username, napp_name), None)

        if napp is None:
            self.log.warning('NApp %s/%s was not loaded', username, napp_name)
            return

        self.log.info("Shutting down NApp %s/%s...", username, napp_name)
        napp_id = NApp(username, napp_name).id
        event = KytosEvent(name='kytos/core.shutdown.' + napp_id)
        # Call listener before removing it from events_listeners
        shutdown_listeners = self.events_listeners.get(event.name)
        if shutdown_listeners:
            shutdown_listeners[0](event)

        # Remove rest endpoints from that napp
        self.api_server.remove_napp_endpoints(napp)

        # Removing listeners from that napp
        # pylint: disable=protected-access
        for event_type, napp_listeners in napp._listeners.items():
            registered = self.events_listeners.get(event_type)
            if registered is None:
                continue
            for listener in napp_listeners:
                if listener in registered:
                    registered.remove(listener)
            if not registered:
                del self.events_listeners[event_type]
        # pylint: enable=protected-access

    def unload_napps(self):
        """Unload all loaded NApps that are not core NApps."""
        # list() is used here to avoid the error:
        # 'RuntimeError: dictionary changed size during iteration'
        # This is caused by looping over an dictionary while removing
        # items from it.
        for (username, napp_name) in list(self.napps.keys()):  # noqa
            self.unload_napp(username, napp_name)

    def reload_napp_module(self, username, napp_name, napp_file):
        """Reload a NApp Module.

        Args:
            username (str): NApp username.
            napp_name (str): Name of the NApp.
            napp_file (str): Module inside the NApp package to reload.

        Raises:
            ModuleNotFoundError: if the module can not be imported.
            ImportError: if reloading the module fails.
        """
        mod_name = '.'.join(['napps', username, napp_name, napp_file])
        try:
            napp_module = import_module(mod_name)
        except ModuleNotFoundError:
            self.log.error("Module '%s' not found", mod_name)
            raise
        try:
            reload_module(napp_module)
        except ImportError as err:
            self.log.error("Error reloading NApp '%s/%s': %s",
                           username, napp_name, err)
            raise

    def reload_napp(self, username, napp_name):
        """Reload a NApp.

        The NApp is unloaded, its ``settings`` and ``main`` modules are
        reloaded and then it is loaded again.

        Returns:
            int: 200 on success, 400 if a module could not be reloaded (in
            which case the NApp stays unloaded).
        """
        self.unload_napp(username, napp_name)
        try:
            self.reload_napp_module(username, napp_name, 'settings')
            self.reload_napp_module(username, napp_name, 'main')
        except (ModuleNotFoundError, ImportError):
            return 400
        self.log.info("NApp '%s/%s' successfully reloaded",
                      username, napp_name)
        self.load_napp(username, napp_name)
        return 200

    def rest_reload_napp(self, username, napp_name):
        """Request reload a NApp."""
        res = self.reload_napp(username, napp_name)
        return 'reloaded', res

    def rest_reload_all_napps(self):
        """Request reload all NApps."""
        # Iterate over a snapshot: reloading removes and re-adds entries of
        # self.napps, which must not happen while iterating the dict itself.
        for napp in list(self.napps):
            self.reload_napp(*napp)
        return 'reloaded', 200
847 |