Passed
Pull Request — master (#84)
by Vinicius
03:26
created

build.main   F

Complexity

Total Complexity 175

Size/Duplication

Total Lines 1002
Duplicated Lines 5.39 %

Test Coverage

Coverage 92.01%

Importance

Changes 0
Metric Value
eloc 724
dl 54
loc 1002
ccs 576
cts 626
cp 0.9201
rs 1.876
c 0
b 0
f 0
wmc 175
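
Two of the headline figures can be re-derived from the raw values in the table above. The sketch below is only a sanity check. It assumes that `dl` is the duplicated-line count, `ccs` the covered statements and `cts` the total coverable statements; the report itself does not define these abbreviations.

```python
# Values copied from the metric table in this report.
metrics = {"loc": 1002, "dl": 54, "ccs": 576, "cts": 626}


def ratio(part: int, whole: int) -> float:
    """Return part / whole as a float."""
    return part / whole


# Assumed meaning: duplicated lines as a percentage of all lines.
duplication_pct = round(100 * ratio(metrics["dl"], metrics["loc"]), 2)
# Assumed meaning: covered statements over total coverable statements.
coverage = round(ratio(metrics["ccs"], metrics["cts"]), 4)

print(duplication_pct)  # 5.39, matching "Duplicated Lines 5.39 %"
print(coverage)         # 0.9201, matching "cp 0.9201"
```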

72 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.handle_interface_link_up() 0 3 1
A Main.on_interface_link_up() 0 8 1
A Main.on_interface_deleted() 0 4 1
A Main.handle_switch_maintenance_end() 0 9 3
A Main.on_switch_maintenance_end() 0 4 1
A Main.handle_interface_deleted() 0 3 1
A Main.on_notify_port_created() 0 4 1
A Main.notify_port_created() 0 5 1
A Main.disable_switch() 0 13 2
A Main.on_connection_lost() 0 8 1
A Main.on_interface_link_down() 0 8 1
A Main.disable_link() 0 16 3
A Main.enable_switch() 0 13 2
A Main.on_link_available_tags() 0 5 2
B Main.enable_interface() 27 27 6
B Main.load_topology() 0 35 5
A Main.add_switch_metadata() 0 14 2
A Main._get_switches_dict() 0 12 3
A Main.get_interfaces() 0 10 3
A Main.delete_interface_metadata() 0 25 4
A Main.add_interface_metadata() 0 20 3
A Main.on_interfaces_created() 0 4 1
A Main.get_switches() 0 4 1
A Main.handle_link_maintenance_end() 0 18 4
A Main.notify_link_status_change() 0 14 3
A Main.handle_interface_down() 0 9 1
A Main.on_interface_created() 0 8 1
C Main.handle_link_up() 0 58 8
A Main.notify_switch_enabled() 0 5 1
A Main.handle_interface_created() 0 8 1
A Main.on_link_maintenance_end() 0 5 2
A Main.get_interface_metadata() 0 16 3
A Main.handle_lldp_status_updated() 0 9 4
A Main.delete_link_metadata() 0 17 3
A Main.on_lldp_status_updated() 0 4 1
A Main.shutdown() 0 3 1
A Main.notify_topology_update() 0 6 1
A Main.enable_link() 0 16 3
B Main.handle_link_down() 0 42 7
A Main.setup() 0 14 2
A Main.on_new_switch() 0 8 1
A Main.handle_new_switch() 0 9 2
A Main.execute() 0 3 1
B Main._load_switch() 0 43 5
A Main.get_switch_metadata() 0 8 2
A Main._get_metadata() 0 20 5
A Main.handle_connection_lost() 0 8 2
A Main.handle_on_link_available_tags() 0 15 2
B Main.disable_interface() 27 27 6
A Main.get_links() 0 7 1
A Main.on_link_maintenance_start() 0 5 2
A Main.handle_switch_maintenance_start() 0 10 4
A Main.get_topology() 0 7 1
A Main.handle_interface_link_down() 0 3 1
A Main._get_links_dict() 0 4 1
A Main.handle_interfaces_created() 0 11 3
A Main.handle_link_maintenance_start() 0 18 4
A Main.notify_switch_disabled() 0 5 1
A Main.add_links() 0 27 4
A Main.on_switch_maintenance_start() 0 4 1
A Main._get_link_or_create() 0 14 3
A Main._get_link_from_interface() 0 6 3
A Main.add_link_metadata() 0 13 2
A Main.notify_metadata_changes() 0 21 4
B Main._load_link() 0 27 5
A Main.delete_switch_metadata() 0 17 3
A Main._get_topology_dict() 0 4 1
A Main.ensure_db_or_core_shutdown() 0 12 2
A Main.load_interfaces_available_tags() 0 16 4
A Main.on_add_links() 0 4 1
A Main.get_link_metadata() 0 7 2
A Main._get_topology() 0 3 1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
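
One case flagged in this report: Main.enable_interface() and Main.disable_interface() each carry 27 duplicated lines and differ only in the method they call on the interface. A minimal sketch of folding both into one helper follows. `FakeInterface`, `FakeSwitch` and `set_interfaces_state` are hypothetical names used for illustration, and the database calls made by the real handlers are left out.

```python
from dataclasses import dataclass, field


@dataclass
class FakeInterface:
    """Hypothetical stand-in for a Kytos interface object."""
    enabled: bool = False

    def enable(self):
        self.enabled = True

    def disable(self):
        self.enabled = False


@dataclass
class FakeSwitch:
    """Hypothetical stand-in for a Kytos switch object."""
    interfaces: dict = field(default_factory=dict)


def set_interfaces_state(switches, enable, interface_id=None, dpid=None):
    """Enable or disable one interface, or every interface of a switch.

    Returns a (message, status_code) pair, mirroring the REST handlers.
    """
    action = "enable" if enable else "disable"
    if dpid is None:
        # An interface id is "<dpid>:<port number>".
        dpid = ":".join(interface_id.split(":")[:-1])
    switch = switches.get(dpid)
    if switch is None:
        return f"Switch not found: {dpid!r}", 404

    if interface_id:
        number = int(interface_id.split(":")[-1])
        if number not in switch.interfaces:
            return f"Switch {dpid} interface {number} not found", 404
        targets = [switch.interfaces[number]]
    else:
        targets = list(switch.interfaces.values())

    # The only difference between the two handlers is this call.
    for interface in targets:
        getattr(interface, action)()
    return "Operation successful", 200


switches = {"00:01": FakeSwitch({1: FakeInterface(), 2: FakeInterface()})}
print(set_interfaces_state(switches, True, interface_id="00:01:1"))
print(set_interfaces_state(switches, False, dpid="00:01"))
```

With a helper like this, each REST handler would shrink to a single call that passes `enable=True` or `enable=False`.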

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
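
The prefix/suffix heuristic can be tried mechanically. The sketch below groups a subset of the method names from the table in this report by their first and last word. `group_by_affix` and the `min_size` threshold are assumptions made for illustration, not part of any tool.

```python
from collections import defaultdict

# A subset of the Main methods listed in this report.
method_names = [
    "get_switch_metadata", "add_switch_metadata", "delete_switch_metadata",
    "get_interface_metadata", "add_interface_metadata",
    "delete_interface_metadata",
    "get_link_metadata", "add_link_metadata", "delete_link_metadata",
    "notify_port_created", "notify_switch_enabled", "notify_switch_disabled",
    "notify_topology_update", "notify_link_status_change",
    "notify_metadata_changes",
    "load_topology", "setup", "shutdown",
]


def group_by_affix(names, min_size=3):
    """Group names by first and last word; keep groups of min_size or more."""
    groups = defaultdict(list)
    for name in names:
        parts = name.strip("_").split("_")
        if len(parts) < 2:
            continue  # single-word names carry no prefix/suffix signal
        groups[("prefix", parts[0])].append(name)
        groups[("suffix", parts[-1])].append(name)
    return {key: members for key, members in groups.items()
            if len(members) >= min_size}


candidates = group_by_affix(method_names)
for (kind, affix), members in sorted(candidates.items()):
    print(kind, affix, len(members))
```

On this subset the `metadata` suffix and the `notify` prefix form the largest groups, which would make them natural first candidates for Extract Class.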

"""Main module of kytos/topology Kytos Network Application.

Manage the network topology
"""
# pylint: disable=wrong-import-order

import time
from threading import Lock
from typing import List

from flask import jsonify, request
from pymongo.errors import AutoReconnect
from werkzeug.exceptions import BadRequest, UnsupportedMediaType

from kytos.core import KytosEvent, KytosNApp, log, rest
from kytos.core.exceptions import KytosLinkCreationError
from kytos.core.helpers import listen_to
from kytos.core.interface import Interface
from kytos.core.link import Link
from kytos.core.switch import Switch
from napps.kytos.topology import settings
from napps.kytos.topology.controllers import TopoController
from napps.kytos.topology.exceptions import RestoreError
from napps.kytos.topology.models import Topology

DEFAULT_LINK_UP_TIMER = 10


class Main(KytosNApp):  # pylint: disable=too-many-public-methods
    """Main class of kytos/topology NApp.

    This class is the entry point for this napp.
    """

    def setup(self):
        """Initialize the NApp's links list."""
        self.links = {}
        self.store_items = {}
        self.intf_available_tags = {}
        self.link_up_timer = getattr(settings, 'LINK_UP_TIMER',
                                     DEFAULT_LINK_UP_TIMER)

        self._lock = Lock()
        self._links_lock = Lock()
        self.topo_controller = TopoController()
        if self.ensure_db_or_core_shutdown():
            self.topo_controller.bootstrap_indexes()
            self.load_topology()

    def ensure_db_or_core_shutdown(self) -> bool:
        """Ensure the DB is reachable, or shut down the core.

        This should only be used on NApps like topology that are also
        responsible for part of the kytos core entities.

        It's meant to be used on KytosNApp.setup.
        """
        try:
            self.topo_controller.db.command("hello")
            return True
        except AutoReconnect as exc:
            log.error(f"Failed to ensure DB. Exception: {str(exc)}")
            self.controller.stop_controller()
            return False

    def execute(self):
        """Execute once when the napp is running."""
        pass

    def shutdown(self):
        """Log that the NApp is shutting down."""
        log.info('NApp kytos/topology shutting down.')

    @staticmethod
    def _get_metadata():
        """Return a JSON with metadata."""
        try:
            metadata = request.get_json()
            content_type = request.content_type
        except BadRequest as err:
            result = 'The request body is not a well-formed JSON.'
            raise BadRequest(result) from err
        if content_type is None:
            result = 'The request body is empty.'
            raise BadRequest(result)
        if metadata is None:
            if content_type != 'application/json':
                result = ('The content type must be application/json '
                          f'(received {content_type}).')
            else:
                result = 'Metadata is empty.'
            raise UnsupportedMediaType(result)
        return metadata

    def _get_link_or_create(self, endpoint_a, endpoint_b):
        """Get an existing link or create a new one.

        Returns:
            Tuple(Link, bool): Link and a boolean whether it has been created.
        """
        new_link = Link(endpoint_a, endpoint_b)

        for link in self.links.values():
            if new_link == link:
                return (link, False)

        self.links[new_link.id] = new_link
        return (new_link, True)

    def _get_switches_dict(self):
        """Return a dictionary with the known switches."""
        switches = {'switches': {}}
        for idx, switch in enumerate(self.controller.switches.values()):
            switch_data = switch.as_dict()
            if not all(key in switch_data['metadata']
                       for key in ('lat', 'lng')):
                # Switches are initialized somewhere in the ocean
                switch_data['metadata']['lat'] = str(0.0)
                switch_data['metadata']['lng'] = str(-30.0+idx*10.0)
            switches['switches'][switch.id] = switch_data
        return switches

    def _get_links_dict(self):
        """Return a dictionary with the known links."""
        return {'links': {link.id: link.as_dict() for link in
                          self.links.values()}}

    def _get_topology_dict(self):
        """Return a dictionary with the known topology."""
        return {'topology': {**self._get_switches_dict(),
                             **self._get_links_dict()}}

    def _get_topology(self):
        """Return an object representing the topology."""
        return Topology(self.controller.switches, self.links)

    def _get_link_from_interface(self, interface):
        """Return the link of the interface, or None if it does not exist."""
        for link in self.links.values():
            if interface in (link.endpoint_a, link.endpoint_b):
                return link
        return None

    def _load_link(self, link_att):
        """Load a link from its stored attributes."""
        endpoint_a = link_att['endpoint_a']['id']
        endpoint_b = link_att['endpoint_b']['id']
        link_str = link_att['id']
        log.info(f"Loading link: {link_str}")
        interface_a = self.controller.get_interface_by_id(endpoint_a)
        interface_b = self.controller.get_interface_by_id(endpoint_b)

        # No trailing punctuation here: the messages below append ", ...".
        error = f"Failed to load endpoints for link {link_str}"
        if not interface_a:
            raise RestoreError(f"{error}, endpoint_a {endpoint_a} not found")
        if not interface_b:
            raise RestoreError(f"{error}, endpoint_b {endpoint_b} not found")

        with self._links_lock:
            link, _ = self._get_link_or_create(interface_a, interface_b)

        if link_att['enabled']:
            link.enable()
        else:
            link.disable()

        link.extend_metadata(link_att["metadata"])
        interface_a.update_link(link)
        interface_b.update_link(link)
        interface_a.nni = True
        interface_b.nni = True

    def _load_switch(self, switch_id, switch_att):
        """Load a switch and its interfaces from stored attributes."""
        log.info(f'Loading switch dpid: {switch_id}')
        switch = self.controller.get_switch_or_create(switch_id)
        if switch_att['enabled']:
            switch.enable()
        else:
            switch.disable()
        switch.description['manufacturer'] = switch_att.get('manufacturer', '')
        switch.description['hardware'] = switch_att.get('hardware', '')
        switch.description['software'] = switch_att.get('software')
        switch.description['serial'] = switch_att.get('serial', '')
        switch.description['data_path'] = switch_att.get('data_path', '')
        switch.extend_metadata(switch_att["metadata"])

        for iface_id, iface_att in switch_att.get('interfaces', {}).items():
            log.info(f'Loading interface iface_id={iface_id}')
            interface = switch.update_or_create_interface(
                            port_no=iface_att['port_number'],
                            name=iface_att['name'],
                            address=iface_att.get('mac', None),
                            speed=iface_att.get('speed', None))
            if iface_att['enabled']:
                interface.enable()
            else:
                interface.disable()
            interface.lldp = iface_att['lldp']
            interface.extend_metadata(iface_att["metadata"])
            name = 'kytos/topology.port.created'
            event = KytosEvent(name=name, content={
                                              'switch': switch_id,
                                              'port': interface.port_number,
                                              'port_description': {
                                                  'alias': interface.name,
                                                  'mac': interface.address,
                                                  'state': interface.state
                                                  }
                                              })
            self.controller.buffers.app.put(event)

        intf_ids = [v["id"] for v in switch_att.get("interfaces", {}).values()]
        intf_details = self.topo_controller.get_interfaces_details(intf_ids)
        with self._links_lock:
            self.load_interfaces_available_tags(switch, intf_details)

    # pylint: disable=attribute-defined-outside-init
    def load_topology(self):
        """Load network topology from DB."""
        topology = self.topo_controller.get_topology()
        switches = topology["topology"]["switches"]
        links = topology["topology"]["links"]

        failed_switches = {}
        log.debug(f"_load_network_status switches={switches}")
        for switch_id, switch_att in switches.items():
            try:
                self._load_switch(switch_id, switch_att)
            # pylint: disable=broad-except
            except Exception as err:
                failed_switches[switch_id] = err
                log.error(f'Error loading switch: {err}')

        failed_links = {}
        log.debug(f"_load_network_status links={links}")
        for link_id, link_att in links.items():
            try:
                self._load_link(link_att)
            # pylint: disable=broad-except
            except Exception as err:
                failed_links[link_id] = err
                log.error(f'Error loading link {link_id}: {err}')

        name = 'kytos/topology.topology_loaded'
        event = KytosEvent(
            name=name,
            content={
                'topology': self._get_topology(),
                'failed_switches': failed_switches,
                'failed_links': failed_links
            })
        self.controller.buffers.app.put(event)

    @rest('v3/')
    def get_topology(self):
        """Return the latest known topology.

        This topology is updated when there are network events.
        """
        return jsonify(self._get_topology_dict())

    # Switch related methods
    @rest('v3/switches')
    def get_switches(self):
        """Return a json with all the switches in the topology."""
        return jsonify(self._get_switches_dict())

    @rest('v3/switches/<dpid>/enable', methods=['POST'])
    def enable_switch(self, dpid):
        """Administratively enable a switch in the topology."""
        try:
            switch = self.controller.switches[dpid]
            self.topo_controller.enable_switch(dpid)
            switch.enable()
        except KeyError:
            return jsonify("Switch not found"), 404

        self.notify_switch_enabled(dpid)
        self.notify_topology_update()
        return jsonify("Operation successful"), 201

    @rest('v3/switches/<dpid>/disable', methods=['POST'])
    def disable_switch(self, dpid):
        """Administratively disable a switch in the topology."""
        try:
            switch = self.controller.switches[dpid]
            self.topo_controller.disable_switch(dpid)
            switch.disable()
        except KeyError:
            return jsonify("Switch not found"), 404

        self.notify_switch_disabled(dpid)
        self.notify_topology_update()
        return jsonify("Operation successful"), 201

    @rest('v3/switches/<dpid>/metadata')
    def get_switch_metadata(self, dpid):
        """Get metadata from a switch."""
        try:
            return jsonify({"metadata":
                            self.controller.switches[dpid].metadata}), 200
        except KeyError:
            return jsonify("Switch not found"), 404

    @rest('v3/switches/<dpid>/metadata', methods=['POST'])
    def add_switch_metadata(self, dpid):
        """Add metadata to a switch."""
        metadata = self._get_metadata()

        try:
            switch = self.controller.switches[dpid]
        except KeyError:
            return jsonify("Switch not found"), 404

        self.topo_controller.add_switch_metadata(dpid, metadata)
        switch.extend_metadata(metadata)
        self.notify_metadata_changes(switch, 'added')
        return jsonify("Operation successful"), 201

    @rest('v3/switches/<dpid>/metadata/<key>', methods=['DELETE'])
    def delete_switch_metadata(self, dpid, key):
        """Delete metadata from a switch."""
        try:
            switch = self.controller.switches[dpid]
        except KeyError:
            return jsonify("Switch not found"), 404

        try:
            _ = switch.metadata[key]
        except KeyError:
            return jsonify("Metadata not found"), 404

        self.topo_controller.delete_switch_metadata_key(dpid, key)
        switch.remove_metadata(key)
        self.notify_metadata_changes(switch, 'removed')
        return jsonify("Operation successful"), 200

    # Interface related methods
    @rest('v3/interfaces')
    def get_interfaces(self):
        """Return a json with all the interfaces in the topology."""
        interfaces = {}
        switches = self._get_switches_dict()
        for switch in switches['switches'].values():
            for interface_id, interface in switch['interfaces'].items():
                interfaces[interface_id] = interface

        return jsonify({'interfaces': interfaces})

    @rest('v3/interfaces/switch/<dpid>/enable', methods=['POST'])
    @rest('v3/interfaces/<interface_enable_id>/enable', methods=['POST'])
    def enable_interface(self, interface_enable_id=None, dpid=None):
        """Administratively enable interfaces in the topology."""
        if dpid is None:
            dpid = ":".join(interface_enable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as exc:
            return jsonify(f"Switch not found: {exc}"), 404

        if interface_enable_id:
            interface_number = int(interface_enable_id.split(":")[-1])

            try:
                interface = switch.interfaces[interface_number]
                self.topo_controller.enable_interface(interface.id)
                interface.enable()
            except KeyError:
                msg = f"Switch {dpid} interface {interface_number} not found"
                return jsonify(msg), 404
        else:
            for interface in switch.interfaces.values():
                interface.enable()
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        self.notify_topology_update()
        return jsonify("Operation successful"), 200

    @rest('v3/interfaces/switch/<dpid>/disable', methods=['POST'])
    @rest('v3/interfaces/<interface_disable_id>/disable', methods=['POST'])
    def disable_interface(self, interface_disable_id=None, dpid=None):
        """Administratively disable interfaces in the topology."""
        if dpid is None:
            dpid = ":".join(interface_disable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as exc:
            return jsonify(f"Switch not found: {exc}"), 404

        if interface_disable_id:
            interface_number = int(interface_disable_id.split(":")[-1])

            try:
                interface = switch.interfaces[interface_number]
                self.topo_controller.disable_interface(interface.id)
                interface.disable()
            except KeyError:
                msg = f"Switch {dpid} interface {interface_number} not found"
                return jsonify(msg), 404
        else:
            for interface in switch.interfaces.values():
                interface.disable()
            self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        self.notify_topology_update()
        return jsonify("Operation successful"), 200

    @rest('v3/interfaces/<interface_id>/metadata')
    def get_interface_metadata(self, interface_id):
        """Get metadata from an interface."""
        switch_id = ":".join(interface_id.split(":")[:-1])
        interface_number = int(interface_id.split(":")[-1])
        try:
            switch = self.controller.switches[switch_id]
        except KeyError:
            return jsonify("Switch not found"), 404

        try:
            interface = switch.interfaces[interface_number]
        except KeyError:
            return jsonify("Interface not found"), 404

        return jsonify({"metadata": interface.metadata}), 200

    @rest('v3/interfaces/<interface_id>/metadata', methods=['POST'])
    def add_interface_metadata(self, interface_id):
        """Add metadata to an interface."""
        metadata = self._get_metadata()
        switch_id = ":".join(interface_id.split(":")[:-1])
        interface_number = int(interface_id.split(":")[-1])
        try:
            switch = self.controller.switches[switch_id]
        except KeyError:
            return jsonify("Switch not found"), 404

        try:
            interface = switch.interfaces[interface_number]
            self.topo_controller.add_interface_metadata(interface.id, metadata)
        except KeyError:
            return jsonify("Interface not found"), 404

        interface.extend_metadata(metadata)
        self.notify_metadata_changes(interface, 'added')
        return jsonify("Operation successful"), 201

    @rest('v3/interfaces/<interface_id>/metadata/<key>', methods=['DELETE'])
    def delete_interface_metadata(self, interface_id, key):
        """Delete metadata from an interface."""
        switch_id = ":".join(interface_id.split(":")[:-1])
        interface_number = int(interface_id.split(":")[-1])

        try:
            switch = self.controller.switches[switch_id]
        except KeyError:
            return jsonify("Switch not found"), 404

        try:
            interface = switch.interfaces[interface_number]
        except KeyError:
            return jsonify("Interface not found"), 404

        try:
            _ = interface.metadata[key]
        except KeyError:
            return jsonify("Metadata not found"), 404

        self.topo_controller.delete_interface_metadata_key(interface.id, key)
        interface.remove_metadata(key)
        self.notify_metadata_changes(interface, 'removed')
        return jsonify("Operation successful"), 200

    # Link related methods
    @rest('v3/links')
    def get_links(self):
        """Return a json with all the links in the topology.

        Links are connections between interfaces.
        """
        return jsonify(self._get_links_dict()), 200

    @rest('v3/links/<link_id>/enable', methods=['POST'])
    def enable_link(self, link_id):
        """Administratively enable a link in the topology."""
        try:
            with self._links_lock:
                link = self.links[link_id]
                self.topo_controller.enable_link(link_id)
                link.enable()
        except KeyError:
            return jsonify("Link not found"), 404
        self.notify_link_status_change(
            self.links[link_id],
            reason='link enabled'
        )
        self.notify_topology_update()
        return jsonify("Operation successful"), 201

    @rest('v3/links/<link_id>/disable', methods=['POST'])
    def disable_link(self, link_id):
        """Administratively disable a link in the topology."""
        try:
            with self._links_lock:
                link = self.links[link_id]
                self.topo_controller.disable_link(link_id)
                link.disable()
        except KeyError:
            return jsonify("Link not found"), 404
        self.notify_link_status_change(
            self.links[link_id],
            reason='link disabled'
        )
        self.notify_topology_update()
        return jsonify("Operation successful"), 201

    @rest('v3/links/<link_id>/metadata')
    def get_link_metadata(self, link_id):
        """Get metadata from a link."""
        try:
            return jsonify({"metadata": self.links[link_id].metadata}), 200
        except KeyError:
            return jsonify("Link not found"), 404

    @rest('v3/links/<link_id>/metadata', methods=['POST'])
    def add_link_metadata(self, link_id):
        """Add metadata to a link."""
        metadata = self._get_metadata()
        try:
            link = self.links[link_id]
        except KeyError:
            return jsonify("Link not found"), 404

        self.topo_controller.add_link_metadata(link_id, metadata)
        link.extend_metadata(metadata)
        self.notify_metadata_changes(link, 'added')
        return jsonify("Operation successful"), 201

    @rest('v3/links/<link_id>/metadata/<key>', methods=['DELETE'])
    def delete_link_metadata(self, link_id, key):
        """Delete metadata from a link."""
        try:
            link = self.links[link_id]
        except KeyError:
            return jsonify("Link not found"), 404

        try:
            _ = link.metadata[key]
        except KeyError:
            return jsonify("Metadata not found"), 404

        self.topo_controller.delete_link_metadata_key(link.id, key)
        link.remove_metadata(key)
        self.notify_metadata_changes(link, 'removed')
        return jsonify("Operation successful"), 200

    @listen_to("kytos/.*.link_available_tags")
    def on_link_available_tags(self, event):
        """Handle on_link_available_tags."""
        with self._links_lock:
            self.handle_on_link_available_tags(event.content.get("link"))

    def handle_on_link_available_tags(self, link):
        """Handle on_link_available_tags."""
        if link.id not in self.links:
            return
        endpoint_a = self.links[link.id].endpoint_a
        endpoint_b = self.links[link.id].endpoint_b
        values_a = [tag.value for tag in endpoint_a.available_tags]
        values_b = [tag.value for tag in endpoint_b.available_tags]
        ids_details = [
            (endpoint_a.id, {"_id": endpoint_a.id,
                             "available_vlans": values_a}),
            (endpoint_b.id, {"_id": endpoint_b.id,
                             "available_vlans": values_b})
        ]
        self.topo_controller.bulk_upsert_interface_details(ids_details)

    @listen_to('.*.switch.(new|reconnected)')
    def on_new_switch(self, event):
        """Create a new Device on the Topology.

        Handle the event of a newly created switch and update the topology
        with this new device. Also notify if the switch is enabled.
        """
        self.handle_new_switch(event)

    def handle_new_switch(self, event):
        """Create a new Device on the Topology."""
        switch = event.content['switch']
        switch.activate()
        self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        log.debug('Switch %s added to the Topology.', switch.id)
        self.notify_topology_update()
        if switch.is_enabled():
            self.notify_switch_enabled(switch.id)

    @listen_to('.*.connection.lost')
    def on_connection_lost(self, event):
        """Remove a Device from the topology.

        Remove the disconnected Device and every link that has one of its
        interfaces.
        """
        self.handle_connection_lost(event)

    def handle_connection_lost(self, event):
        """Remove a Device from the topology."""
        switch = event.content['source'].switch
        if switch:
            switch.deactivate()
            self.topo_controller.deactivate_switch(switch.id)
            log.debug('Switch %s removed from the Topology.', switch.id)
            self.notify_topology_update()

    def handle_interfaces_created(self, event):
        """Update the topology based on the interfaces created."""
        interfaces = event.content["interfaces"]
        if not interfaces:
            return
        switch = interfaces[0].switch
        self.topo_controller.upsert_switch(switch.id, switch.as_dict())
        name = "kytos/topology.switch.interface.created"
        for interface in interfaces:
            event = KytosEvent(name=name, content={'interface': interface})
            self.controller.buffers.app.put(event)

    def handle_interface_created(self, event):
        """Update the topology based on an interface created event.

        It's handled as a link_up in case a switch sends a
        created event again, since the interface may belong to a link.
        """
        interface = event.content['interface']
        self.handle_interface_link_up(interface)

    @listen_to('.*.topology.switch.interface.created')
    def on_interface_created(self, event):
        """Handle individual interface create event.

        It's handled as a link_up in case a switch sends a
        created event, since the interface may belong to an existing link.
        """
        self.handle_interface_created(event)

    @listen_to('.*.switch.interfaces.created')
    def on_interfaces_created(self, event):
        """Update the topology based on a list of created interfaces."""
        self.handle_interfaces_created(event)

643 1
    def handle_interface_down(self, event):
644
        """Update the topology based on a Port Modify event.
645
646
        The event notifies that an interface was changed to 'down'.
647
        """
648 1
        interface = event.content['interface']
649 1
        interface.deactivate()
650 1
        self.topo_controller.deactivate_interface(interface.id)
651 1
        self.handle_interface_link_down(interface)
652
653 1
    @listen_to('.*.switch.interface.deleted')
654
    def on_interface_deleted(self, event):
655
        """Update the topology based on a Port Delete event."""
656
        self.handle_interface_deleted(event)
657
658 1
    def handle_interface_deleted(self, event):
659
        """Update the topology based on a Port Delete event."""
660 1
        self.handle_interface_down(event)
661
    @listen_to('.*.switch.interface.link_up')
    def on_interface_link_up(self, event):
        """Update the topology based on a Port Modify event.

        The event notifies that an interface's link was changed to 'up'.
        """
        interface = event.content['interface']
        self.handle_interface_link_up(interface)

    def handle_interface_link_up(self, interface):
        """Update the topology based on a Port Modify event."""
        self.handle_link_up(interface)

    @listen_to('kytos/maintenance.end_switch')
    def on_switch_maintenance_end(self, event):
        """Handle the end of the maintenance of a switch."""
        self.handle_switch_maintenance_end(event)

    def handle_switch_maintenance_end(self, event):
        """Handle the end of the maintenance of a switch."""
        switches = event.content['switches']
        for switch in switches:
            switch.enable()
            switch.activate()
            for interface in switch.interfaces.values():
                interface.enable()
                self.handle_link_up(interface)

    def handle_link_up(self, interface):
        """Notify a link is up."""
        interface.activate()
        self.topo_controller.activate_interface(interface.id)
        self.notify_topology_update()
        with self._links_lock:
            link = self._get_link_from_interface(interface)
        if not link:
            return
        if link.endpoint_a == interface:
            other_interface = link.endpoint_b
        else:
            other_interface = link.endpoint_a
        if other_interface.is_active() is False:
            return
        if link.is_active() is False:
            link.update_metadata('last_status_change', time.time())
            link.activate()

            # As each run of this method uses a different thread,
            # there is no risk this sleep will lock the NApp.
            time.sleep(self.link_up_timer)

            last_status_change = link.get_metadata('last_status_change')
            now = time.time()
            if link.is_active() and \
                    now - last_status_change >= self.link_up_timer:
                link.update_metadata('last_status_is_active', True)
                self.topo_controller._update_link(
                    link.id,
                    {
                        "$set": {
                            "metadata.last_status_change": last_status_change,
                            "metadata.last_status_is_active": True,
                            "active": True,
                        }
                    },
                )
                self.notify_topology_update()
                self.notify_link_status_change(link, reason='link up')
        else:
            last_status_change = time.time()
            metadata = {'last_status_change': last_status_change,
                        'last_status_is_active': True}
            link.extend_metadata(metadata)
            self.topo_controller.add_link_metadata(link.id, metadata)
            self.topo_controller._update_link(
                link.id,
                {
                    "$set": {
                        "metadata.last_status_change": last_status_change,
                        "metadata.last_status_is_active": True,
                        "active": True,
                    }
                },
            )
            self.notify_topology_update()
            self.notify_link_status_change(link, reason='link up')

    @listen_to('.*.switch.interface.link_down')
    def on_interface_link_down(self, event):
        """Update the topology based on a Port Modify event.

        The event notifies that an interface's link was changed to 'down'.
        """
        interface = event.content['interface']
        self.handle_interface_link_down(interface)

    def handle_interface_link_down(self, interface):
        """Update the topology based on an interface."""
        self.handle_link_down(interface)

    @listen_to('kytos/maintenance.start_switch')
    def on_switch_maintenance_start(self, event):
        """Handle the start of the maintenance of a switch."""
        self.handle_switch_maintenance_start(event)

    def handle_switch_maintenance_start(self, event):
        """Handle the start of the maintenance of a switch."""
        switches = event.content['switches']
        for switch in switches:
            switch.disable()
            switch.deactivate()
            for interface in switch.interfaces.values():
                interface.disable()
                if interface.is_active():
                    self.handle_link_down(interface)

    def handle_link_down(self, interface):
        """Notify a link is down."""
        link = self._get_link_from_interface(interface)
        if link and link.is_active():
            link.deactivate()
            last_status_change = time.time()
            metadata = {
                "last_status_change": last_status_change,
                "last_status_is_active": False,
            }
            link.extend_metadata(metadata)
            update_expr = {
                "$set": {
                    "active": False,
                    "metadata.last_status_change": last_status_change,
                    "metadata.last_status_is_active": False,
                }
            }
            self.topo_controller._update_link(link.id, update_expr)
            self.notify_link_status_change(link, reason="link down")
        if link and not link.is_active():
            with self._links_lock:
                last_status = link.get_metadata('last_status_is_active')
                last_status_change = link.get_metadata('last_status_change')
                metadata = {
                    "last_status_change": last_status_change,
                    "last_status_is_active": last_status,
                }
                if last_status:
                    link.extend_metadata(metadata)
                    update_expr = {
                        "$set": {
                            "active": False,
                            "metadata.last_status_change": last_status_change,
                            "metadata.last_status_is_active": last_status,
                        }
                    }
                    self.topo_controller._update_link(link.id, update_expr)
                    self.notify_link_status_change(link, reason='link down')
        interface.deactivate()
        self.topo_controller.deactivate_interface(interface.id)
        self.notify_topology_update()

    @listen_to('.*.interface.is.nni')
    def on_add_links(self, event):
        """Update the topology with links related to the NNI interfaces."""
        self.add_links(event)

    def add_links(self, event):
        """Update the topology with links related to the NNI interfaces."""
        interface_a = event.content['interface_a']
        interface_b = event.content['interface_b']

        try:
            with self._links_lock:
                link, created = self._get_link_or_create(interface_a,
                                                         interface_b)
                interface_a.update_link(link)
                interface_b.update_link(link)

                link.endpoint_a = interface_a
                link.endpoint_b = interface_b

                interface_a.nni = True
                interface_b.nni = True

        except KytosLinkCreationError as err:
            log.error(f'Error creating link: {err}.')
            return

        if created:
            link.update_metadata('last_status_is_active', True)
            self.notify_link_status_change(link, reason='link up')
            self.notify_topology_update()
            self.topo_controller.upsert_link(link.id, link.as_dict())

    @listen_to('.*.of_lldp.network_status.updated')
    def on_lldp_status_updated(self, event):
        """Handle of_lldp.network_status.updated from of_lldp."""
        self.handle_lldp_status_updated(event)

    def handle_lldp_status_updated(self, event) -> None:
        """Handle .*.network_status.updated events from of_lldp."""
        content = event.content
        interface_ids = content["interface_ids"]
        for interface_id in interface_ids:
            if content["state"] == "disabled":
                self.topo_controller.disable_interface_lldp(interface_id)
            elif content["state"] == "enabled":
                self.topo_controller.enable_interface_lldp(interface_id)

    def notify_switch_enabled(self, dpid):
        """Send an event to notify that a switch is enabled."""
        name = 'kytos/topology.switch.enabled'
        event = KytosEvent(name=name, content={'dpid': dpid})
        self.controller.buffers.app.put(event)

    def notify_switch_disabled(self, dpid):
        """Send an event to notify that a switch is disabled."""
        name = 'kytos/topology.switch.disabled'
        event = KytosEvent(name=name, content={'dpid': dpid})
        self.controller.buffers.app.put(event)

    def notify_topology_update(self):
        """Send an event to notify about updates on the topology."""
        name = 'kytos/topology.updated'
        event = KytosEvent(name=name, content={'topology':
                                               self._get_topology()})
        self.controller.buffers.app.put(event)

    def notify_link_status_change(self, link, reason='not given'):
        """Send an event to notify about a status change on a link."""
        name = 'kytos/topology.'
        if link.is_active() and link.is_enabled():
            status = 'link_up'
        else:
            status = 'link_down'
        event = KytosEvent(
            name=name+status,
            content={
                'link': link,
                'reason': reason
            })
        self.controller.buffers.app.put(event)

    def notify_metadata_changes(self, obj, action):
        """Send an event to notify about metadata changes."""
        if isinstance(obj, Switch):
            entity = 'switch'
            entities = 'switches'
        elif isinstance(obj, Interface):
            entity = 'interface'
            entities = 'interfaces'
        elif isinstance(obj, Link):
            entity = 'link'
            entities = 'links'
        else:
            raise ValueError(
                'Invalid object, supported: Switch, Interface, Link'
            )

        name = f'kytos/topology.{entities}.metadata.{action}'
        event = KytosEvent(name=name, content={entity: obj,
                                               'metadata': obj.metadata})
        self.controller.buffers.app.put(event)
        log.debug(f'Metadata from {obj.id} was {action}.')

    @listen_to('.*.switch.port.created')
    def on_notify_port_created(self, event):
        """Notify when a port is created."""
        self.notify_port_created(event)

    def notify_port_created(self, event):
        """Notify when a port is created."""
        name = 'kytos/topology.port.created'
        event = KytosEvent(name=name, content=event.content)
        self.controller.buffers.app.put(event)

    @staticmethod
    def load_interfaces_available_tags(switch: Switch,
                                       interfaces_details: List[dict]) -> None:
        """Load interfaces available tags (vlans)."""
        if not interfaces_details:
            return
        for interface_details in interfaces_details:
            available_vlans = interface_details["available_vlans"]
            if not available_vlans:
                continue
            log.debug(f"Interface id {interface_details['id']} loading "
                      f"{len(interface_details['available_vlans'])} "
                      "available tags")
            port_number = int(interface_details["id"].split(":")[-1])
            interface = switch.interfaces[port_number]
            interface.set_available_tags(interface_details['available_vlans'])

    @listen_to('kytos/maintenance.start_link')
    def on_link_maintenance_start(self, event):
        """Deals with the start of links maintenance."""
        with self._links_lock:
            self.handle_link_maintenance_start(event)

    def handle_link_maintenance_start(self, event):
        """Deals with the start of links maintenance."""
        notify_links = []
        maintenance_links = event.content['links']
        for maintenance_link in maintenance_links:
            try:
                link = self.links[maintenance_link.id]
            except KeyError:
                continue
            notify_links.append(link)
        for link in notify_links:
            link.disable()
            link.deactivate()
            link.endpoint_a.deactivate()
            link.endpoint_b.deactivate()
            link.endpoint_a.disable()
            link.endpoint_b.disable()
            self.notify_link_status_change(link, reason='maintenance')

    @listen_to('kytos/maintenance.end_link')
    def on_link_maintenance_end(self, event):
        """Deals with the end of links maintenance."""
        with self._links_lock:
            self.handle_link_maintenance_end(event)

    def handle_link_maintenance_end(self, event):
        """Deals with the end of links maintenance."""
        notify_links = []
        maintenance_links = event.content['links']
        for maintenance_link in maintenance_links:
            try:
                link = self.links[maintenance_link.id]
            except KeyError:
                continue
            notify_links.append(link)
        for link in notify_links:
            link.enable()
            link.activate()
            link.endpoint_a.activate()
            link.endpoint_b.activate()
            link.endpoint_a.enable()
            link.endpoint_b.enable()
            self.notify_link_status_change(link, reason='maintenance')