Test Failed
Pull Request — master (#123)
by Humberto
02:32
created

build.main.Main._restore_status()   B

Complexity

Conditions 7

Size

Total Lines 33
Code Lines 25

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 7.6773

Importance

Changes 0
Metric Value
cc 7
eloc 25
nop 1
dl 0
loc 33
ccs 19
cts 25
cp 0.76
crap 7.6773
rs 7.8799
c 0
b 0
f 0
1
"""Main module of kytos/topology Kytos Network Application.
2
3
Manage the network topology
4
"""
5 1
import time
6
7 1
from flask import jsonify, request
8
9 1
from kytos.core import KytosEvent, KytosNApp, log, rest
10 1
from kytos.core.exceptions import KytosLinkCreationError
11 1
from kytos.core.helpers import listen_to
12 1
from kytos.core.interface import Interface
13 1
from kytos.core.link import Link
14 1
from kytos.core.switch import Switch
15 1
from napps.kytos.topology import settings
16 1
from napps.kytos.topology.models import Topology
17 1
from napps.kytos.topology.storehouse import StoreHouse
18
19 1
# Fallback for settings.LINK_UP_TIMER: seconds handle_link_up sleeps before
# announcing that a link is up.
DEFAULT_LINK_UP_TIMER = 10
# Fallback for settings.INTERFACE_RESTORE_TIMER: seconds slept between checks
# for a switch's interfaces while restoring its state.
DEFAULT_INTERFACE_RESTORE_TIMER = 2
# Maximum number of those checks before the restore gives up.
RESTORE_INTERFACE_ATTEMPTS = 20
22 1
23
24
class RestoreError(Exception):
    """Raised when an entity's stored state cannot be restored."""
26
27
28 1
class Main(KytosNApp):  # pylint: disable=too-many-public-methods
29
    """Main class of kytos/topology NApp.
30 1
31 1
    This class is the entry point for this napp.
32 1
    """
33 1
34 1
    def setup(self):
        """Initialize the NApp's state.

        Creates the empty link/state containers, reads the timers from
        ``settings`` (falling back to the module defaults), calls
        ``verify_storehouse`` for switches, interfaces and links, and
        instantiates the StoreHouse helper.
        """
        self.links = {}
        self.store_items = {}
        self.switches_state = {}
        self.interfaces_state = {}
        self.links_state = {}
        self._verified_links = []

        self.link_up_timer = getattr(
            settings, 'LINK_UP_TIMER', DEFAULT_LINK_UP_TIMER)
        self.interface_restore = getattr(
            settings, 'INTERFACE_RESTORE_TIMER',
            DEFAULT_INTERFACE_RESTORE_TIMER)

        for entities in ('switches', 'interfaces', 'links'):
            self.verify_storehouse(entities)

        self.storehouse = StoreHouse(self.controller)
52 1
53
    def execute(self):
        """Load the network status saved in storehouse when the NApp runs."""
        self._load_network_status()
56
57
    def shutdown(self):
        """Log that the NApp is shutting down; no other cleanup is done."""
        log.info('NApp kytos/topology shutting down.')
60
61 1
    def _get_link_or_create(self, endpoint_a, endpoint_b):
        """Return the known link for the endpoints, creating it if needed.

        A candidate Link is built from the endpoints; if an equal link is
        already in ``self.links`` that one is returned, otherwise the
        candidate is registered under its id and returned.
        """
        candidate = Link(endpoint_a, endpoint_b)
        known = next((link for link in self.links.values()
                      if candidate == link), None)
        if known is not None:
            return known

        self.links[candidate.id] = candidate
        return candidate
70
71 1
    def _get_switches_dict(self):
        """Build the dictionary describing every known switch.

        Switches whose metadata lacks 'lat' or 'lng' get placeholder
        coordinates: latitude 0.0 and a longitude spaced by switch index.
        """
        known = {}
        for position, switch in enumerate(self.controller.switches.values()):
            data = switch.as_dict()
            metadata = data['metadata']
            if 'lat' not in metadata or 'lng' not in metadata:
                # Switches are initialized somewhere in the ocean
                metadata['lat'] = str(0.0)
                metadata['lng'] = str(-30.0 + position * 10.0)
            known[switch.id] = data
        return {'switches': known}
83
84 1
    def _get_links_dict(self):
        """Build the dictionary describing every known link, keyed by id."""
        links = {}
        for link in self.links.values():
            links[link.id] = link.as_dict()
        return {'links': links}
88 1
89
    def _get_topology_dict(self):
        """Build the dictionary combining the known switches and links."""
        topology = self._get_switches_dict()
        topology.update(self._get_links_dict())
        return {'topology': topology}
93 1
94
    def _get_topology(self):
        """Build a Topology object from the known switches and links."""
        switches = self.controller.switches
        return Topology(switches, self.links)
97 1
98 1
    def _get_link_from_interface(self, interface):
        """Find the link that has the interface as one of its endpoints.

        Returns None when no known link uses the interface.
        """
        return next(
            (link for link in self.links.values()
             if interface in (link.endpoint_a, link.endpoint_b)),
            None)
104
105 1
    def _restore_link(self, link_id):
        """Restore a link's administrative state from the loaded status.

        Args:
            link_id: id of the link to enable or disable.

        Raises:
            RestoreError: if no state was loaded for the link, the stored
                state has no 'enabled' entry, or the link is not in
                ``self.links``.
        """
        try:
            state = self.links_state[link_id]
        except KeyError as exc:
            error = (f'The link {link_id} has no stored '
                     'administrative state to be restored.')
            raise RestoreError(error) from exc

        # Each lookup gets its own handler so that the message names the
        # lookup that actually failed.
        try:
            link = self.links[link_id]
        except KeyError as exc:
            error = ('Error restoring link status. '
                     f'The link {link_id} does not exist.')
            raise RestoreError(error) from exc

        try:
            enabled = state['enabled']
        except KeyError as exc:
            error = ('Error restoring link status. The stored state of '
                     f"link {link_id} has no 'enabled' entry.")
            raise RestoreError(error) from exc

        if enabled:
            link.enable()
        else:
            link.disable()
        log.info(f'The state of link {link.id} has been restored.')
        self.notify_topology_update()
        self.update_instance_metadata(link)
127 1
128
    def _restore_switch(self, switch_id):
        """Restore the administrative state of a switch and its interfaces.

        The switch is enabled or disabled according to the loaded state.
        The method then waits, for at most RESTORE_INTERFACE_ATTEMPTS
        sleeps of ``self.interface_restore`` seconds, until the switch has
        interfaces, and restores every interface that has a stored state.

        Args:
            switch_id: id of the switch to restore.

        Raises:
            RestoreError: if the switch has no stored state, is unknown to
                the controller, or still has no interfaces after waiting.
        """
        try:
            state = self.switches_state[switch_id]
        except KeyError as exc:
            error = (f'The switch {switch_id} has no stored'
                     ' administrative state to be restored.')
            raise RestoreError(error) from exc

        try:
            switch = self.controller.switches[switch_id]
        except KeyError as exc:
            # Maybe we should remove the switch from switches_state here
            error = ('Error while restoring switches status. The '
                     f'switch {switch_id} does not exist.')
            raise RestoreError(error) from exc

        if state:
            switch.enable()
        else:
            switch.disable()

        log.debug('Waiting to restore administrative state of switch '
                  f'{switch_id} interfaces.')
        # wait to restore interfaces
        attempts = 0
        while not switch.interfaces and attempts < RESTORE_INTERFACE_ATTEMPTS:
            time.sleep(self.interface_restore)
            attempts += 1
        if not switch.interfaces:
            error = ('Error restoring administrative state of switch '
                     f'{switch_id} interfaces.')
            raise RestoreError(error)

        # restore interfaces
        for interface_id in switch.interfaces:
            iface_id = ":".join([switch_id, str(interface_id)])
            # restore only the administrative state of saved interfaces
            if iface_id not in self.interfaces_state:
                error = ("The stored topology is different from the current "
                         f"topology. The interface {iface_id} hasn't been "
                         "stored.")
                log.info(error)
                continue
            iface_status, lldp_status = self.interfaces_state[iface_id]
            try:
                interface = switch.interfaces[int(interface_id)]
            except KeyError:
                log.error('Error restoring interface status: '
                          '%s does not exist.', iface_id)
                continue

            if iface_status:
                interface.enable()
            else:
                interface.disable()
            interface.lldp = lldp_status
            self.update_instance_metadata(interface)

        log.info(f'The state of switch {switch_id} has been restored.')
190
191 1
    # pylint: disable=attribute-defined-outside-init
192
    def _load_network_status(self):
        """Populate the state attributes from the status in storehouse.

        Fills ``switches_state``, ``interfaces_state`` and ``links_state``.
        When the storehouse data is missing or empty, only a log entry is
        produced.
        """
        try:
            status = self.storehouse.get_data()
        except FileNotFoundError as error:
            log.info(error)
            return

        if not status:
            log.info('There is no status saved to restore.')
            return

        switches = status['network_status']['switches']
        self.links_state = status['network_status']['links']

        for switch_id, switch_att in switches.items():
            # get switches status
            self.switches_state[switch_id] = switch_att['enabled']
            # get interface status
            for iface_id, iface_att in switch_att['interfaces'].items():
                self.interfaces_state[iface_id] = (iface_att['enabled'],
                                                   iface_att['lldp'])
217 1
218 1
    @rest('v3/')
    def get_topology(self):
        """Return the latest known topology as JSON.

        This topology is updated when there are network events.
        """
        topology = self._get_topology_dict()
        return jsonify(topology)
225 1
226 1
    def restore_network_status(self, obj):
        """Restore the stored administrative status of a switch or link.

        Each link is restored at most once (tracked in
        ``self._verified_links``). Objects of other types are ignored and
        a RestoreError is only logged at debug level.
        """
        try:
            if isinstance(obj, Switch):
                self._restore_switch(obj.id)
                return
            if isinstance(obj, Link) and obj.id not in self._verified_links:
                self._verified_links.append(obj.id)
                self._restore_link(obj.id)
        except RestoreError as exc:
            log.debug(exc)
237 1
238 1
    # Switch related methods
239
    @rest('v3/switches')
    def get_switches(self):
        """Return a JSON response listing every switch in the topology."""
        switches = self._get_switches_dict()
        return jsonify(switches)
243 1
244
    @rest('v3/switches/<dpid>/enable', methods=['POST'])
    def enable_switch(self, dpid):
        """Administratively enable a switch and persist the new status.

        Responds 404 when the dpid is unknown, 201 otherwise.
        """
        try:
            self.controller.switches[dpid].enable()
        except KeyError:
            return jsonify("Switch not found"), 404
        else:
            log.info("Storing administrative state from switch "
                     f"{dpid} to enabled.")
            self.save_status_on_storehouse()
            return jsonify("Operation successful"), 201
256 1
257
    @rest('v3/switches/<dpid>/disable', methods=['POST'])
    def disable_switch(self, dpid):
        """Administratively disable a switch and persist the new status.

        Responds 404 when the dpid is unknown, 201 otherwise.
        """
        try:
            self.controller.switches[dpid].disable()
        except KeyError:
            return jsonify("Switch not found"), 404
        else:
            log.info("Storing administrative state from switch "
                     f"{dpid} to disabled.")
            self.save_status_on_storehouse()
            return jsonify("Operation successful"), 201
269 1
270
    @rest('v3/switches/<dpid>/metadata')
    def get_switch_metadata(self, dpid):
        """Return a switch's metadata as JSON, or 404 if it is unknown."""
        try:
            metadata = self.controller.switches[dpid].metadata
            return jsonify({"metadata": metadata}), 200
        except KeyError:
            return jsonify("Switch not found"), 404
278
279
    @rest('v3/switches/<dpid>/metadata', methods=['POST'])
    def add_switch_metadata(self, dpid):
        """Extend a switch's metadata with the request's JSON payload.

        Responds 404 when the dpid is unknown, 201 otherwise.
        """
        payload = request.get_json()
        try:
            target = self.controller.switches[dpid]
        except KeyError:
            return jsonify("Switch not found"), 404

        target.extend_metadata(payload)
        self.notify_metadata_changes(target, 'added')
        return jsonify("Operation successful"), 201
291 1
292
    @rest('v3/switches/<dpid>/metadata/<key>', methods=['DELETE'])
    def delete_switch_metadata(self, dpid, key):
        """Delete a metadata key from a switch.

        Responds 404 when the switch is unknown or when the key could not
        be removed, matching the interface and link delete handlers, and
        200 otherwise.
        """
        try:
            switch = self.controller.switches[dpid]
        except KeyError:
            return jsonify("Switch not found"), 404

        # The interface and link handlers treat a False return from
        # remove_metadata as "key not found"; do the same here so that a
        # removal which never happened is not announced.
        if switch.remove_metadata(key) is False:
            return jsonify("Metadata not found"), 404

        self.notify_metadata_changes(switch, 'removed')
        return jsonify("Operation successful"), 200
303 1
304 1
    # Interface related methods
305 1
    @rest('v3/interfaces')
    def get_interfaces(self):
        """Return a JSON response with the interfaces of every switch."""
        collected = {}
        for switch in self._get_switches_dict()['switches'].values():
            collected.update(switch['interfaces'])
        return jsonify({'interfaces': collected})
315 1
316 1 View Code Duplication
    @rest('v3/interfaces/switch/<dpid>/enable', methods=['POST'])
    @rest('v3/interfaces/<interface_enable_id>/enable', methods=['POST'])
    def enable_interface(self, interface_enable_id=None, dpid=None):
        """Administratively enable one interface or all of a switch's.

        Args:
            interface_enable_id: id in the form ``<dpid>:<port number>``;
                when given, only that interface is enabled.
            dpid: switch id; when given without an interface id, every
                interface of the switch is enabled.

        Responds 404 if the switch is unknown, 409 naming the interface
        that could not be enabled, and 200 (after persisting the status)
        otherwise.
        """
        error_list = []  # List of interfaces that were not activated.
        msg_error = "Some interfaces couldn't be found and activated: "
        if dpid is None:
            dpid = ":".join(interface_enable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as exc:
            return jsonify(f"Switch not found: {exc}"), 404

        if interface_enable_id:
            try:
                # A non-numeric port suffix is reported like a missing
                # interface instead of escaping as an unhandled ValueError.
                interface_number = int(interface_enable_id.split(":")[-1])
                switch.interfaces[interface_number].enable()
            except (KeyError, ValueError) as exc:
                error_list.append(f"Switch {dpid} Interface {exc}")
        else:
            for interface in switch.interfaces.values():
                interface.enable()
        if not error_list:
            log.info("Storing administrative state for enabled interfaces.")
            self.save_status_on_storehouse()
            return jsonify("Operation successful"), 200
        return jsonify({msg_error:
                        error_list}), 409
345 1
346 1 View Code Duplication
    @rest('v3/interfaces/switch/<dpid>/disable', methods=['POST'])
    @rest('v3/interfaces/<interface_disable_id>/disable', methods=['POST'])
    def disable_interface(self, interface_disable_id=None, dpid=None):
        """Administratively disable one interface or all of a switch's.

        Args:
            interface_disable_id: id in the form ``<dpid>:<port number>``;
                when given, only that interface is disabled.
            dpid: switch id; when given without an interface id, every
                interface of the switch is disabled.

        Responds 404 if the switch is unknown, 409 naming the interface
        that could not be disabled, and 200 (after persisting the status)
        otherwise.
        """
        error_list = []  # List of interfaces that were not deactivated.
        msg_error = "Some interfaces couldn't be found and deactivated: "
        if dpid is None:
            dpid = ":".join(interface_disable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as exc:
            return jsonify(f"Switch not found: {exc}"), 404

        if interface_disable_id:
            try:
                # A non-numeric port suffix is reported like a missing
                # interface instead of escaping as an unhandled ValueError.
                interface_number = int(interface_disable_id.split(":")[-1])
                switch.interfaces[interface_number].disable()
            except (KeyError, ValueError) as exc:
                error_list.append(f"Switch {dpid} Interface {exc}")
        else:
            for interface in switch.interfaces.values():
                interface.disable()
        if not error_list:
            log.info("Storing administrative state for disabled interfaces.")
            self.save_status_on_storehouse()
            return jsonify("Operation successful"), 200
        return jsonify({msg_error:
                        error_list}), 409
375 1
376 1
    @rest('v3/interfaces/<interface_id>/metadata')
    def get_interface_metadata(self, interface_id):
        """Return an interface's metadata as JSON.

        ``interface_id`` has the form ``<switch id>:<port number>``.
        Responds 404 when the switch or the interface cannot be found,
        including when the port part is not a number.
        """
        switch_id, _, port = interface_id.rpartition(":")
        try:
            switch = self.controller.switches[switch_id]
        except KeyError:
            return jsonify("Switch not found"), 404

        try:
            # int() is inside the handler so that a malformed port yields
            # a 404 rather than an unhandled ValueError.
            interface = switch.interfaces[int(port)]
        except (KeyError, ValueError):
            return jsonify("Interface not found"), 404

        return jsonify({"metadata": interface.metadata}), 200
392 1
393
    @rest('v3/interfaces/<interface_id>/metadata', methods=['POST'])
    def add_interface_metadata(self, interface_id):
        """Extend an interface's metadata with the request's JSON payload.

        ``interface_id`` has the form ``<switch id>:<port number>``.
        Responds 404 when the switch or the interface cannot be found,
        including when the port part is not a number, and 201 otherwise.
        """
        metadata = request.get_json()

        switch_id, _, port = interface_id.rpartition(":")
        try:
            switch = self.controller.switches[switch_id]
        except KeyError:
            return jsonify("Switch not found"), 404

        try:
            # int() is inside the handler so that a malformed port yields
            # a 404 rather than an unhandled ValueError.
            interface = switch.interfaces[int(port)]
        except (KeyError, ValueError):
            return jsonify("Interface not found"), 404

        interface.extend_metadata(metadata)
        self.notify_metadata_changes(interface, 'added')
        return jsonify("Operation successful"), 201
413 1
414 1
    @rest('v3/interfaces/<interface_id>/metadata/<key>', methods=['DELETE'])
    def delete_interface_metadata(self, interface_id, key):
        """Delete a metadata key from an interface.

        ``interface_id`` has the form ``<switch id>:<port number>``.
        Responds 404 when the switch, the interface (including a
        non-numeric port part) or the metadata key cannot be found, and
        200 otherwise.
        """
        switch_id, _, port = interface_id.rpartition(":")

        try:
            switch = self.controller.switches[switch_id]
        except KeyError:
            return jsonify("Switch not found"), 404

        try:
            # int() is inside the handler so that a malformed port yields
            # a 404 rather than an unhandled ValueError.
            interface = switch.interfaces[int(port)]
        except (KeyError, ValueError):
            return jsonify("Interface not found"), 404

        if interface.remove_metadata(key) is False:
            return jsonify("Metadata not found"), 404

        self.notify_metadata_changes(interface, 'removed')
        return jsonify("Operation successful"), 200
435 1
436
    # Link related methods
437 1
    @rest('v3/links')
    def get_links(self):
        """Return a JSON response with every link in the topology.

        Links are connections between interfaces.
        """
        links = self._get_links_dict()
        return jsonify(links), 200
444 1
445
    @rest('v3/links/<link_id>/enable', methods=['POST'])
    def enable_link(self, link_id):
        """Administratively enable a link and persist the new status.

        Responds 404 when the link id is unknown, 201 otherwise.
        """
        try:
            self.links[link_id].enable()
        except KeyError:
            return jsonify("Link not found"), 404
        else:
            self.save_status_on_storehouse()
            return jsonify("Operation successful"), 201
454 1
455 1
    @rest('v3/links/<link_id>/disable', methods=['POST'])
    def disable_link(self, link_id):
        """Administratively disable a link and persist the new status.

        Responds 404 when the link id is unknown, 201 otherwise.
        """
        try:
            self.links[link_id].disable()
        except KeyError:
            return jsonify("Link not found"), 404
        else:
            self.save_status_on_storehouse()
            return jsonify("Operation successful"), 201
464 1
465
    @rest('v3/links/<link_id>/metadata')
    def get_link_metadata(self, link_id):
        """Return a link's metadata as JSON, or 404 if it is unknown."""
        try:
            metadata = self.links[link_id].metadata
            return jsonify({"metadata": metadata}), 200
        except KeyError:
            return jsonify("Link not found"), 404
472 1
473 1
    @rest('v3/links/<link_id>/metadata', methods=['POST'])
    def add_link_metadata(self, link_id):
        """Extend a link's metadata with the request's JSON payload.

        Responds 404 when the link id is unknown, 201 otherwise.
        """
        payload = request.get_json()
        try:
            target = self.links[link_id]
        except KeyError:
            return jsonify("Link not found"), 404

        target.extend_metadata(payload)
        self.notify_metadata_changes(target, 'added')
        return jsonify("Operation successful"), 201
485 1
486 1
    @rest('v3/links/<link_id>/metadata/<key>', methods=['DELETE'])
    def delete_link_metadata(self, link_id, key):
        """Delete a metadata key from a link.

        Responds 404 when the link is unknown or ``remove_metadata``
        returns False, 200 otherwise.
        """
        try:
            target = self.links[link_id]
        except KeyError:
            return jsonify("Link not found"), 404

        removed = target.remove_metadata(key)
        if removed is False:
            return jsonify("Metadata not found"), 404

        self.notify_metadata_changes(target, 'removed')
        return jsonify("Operation successful"), 200
499
500 1
    @listen_to('.*.switch.(new|reconnected)')
    def handle_new_switch(self, event):
        """Activate a new or reconnected switch and publish the change.

        After activating the switch found in the event content, a topology
        update is notified, ``update_instance_metadata`` is called for the
        switch and its stored administrative status is restored.
        """
        new_switch = event.content['switch']
        new_switch.activate()
        log.debug('Switch %s added to the Topology.', new_switch.id)
        self.notify_topology_update()
        self.update_instance_metadata(new_switch)
        self.restore_network_status(new_switch)
513 1
514
    @listen_to('.*.connection.lost')
    def handle_connection_lost(self, event):
        """Deactivate the switch whose connection was lost.

        Nothing happens when the event source has no switch attached;
        otherwise the switch is deactivated and a topology update is
        notified.
        """
        switch = event.content['source'].switch
        if not switch:
            return
        switch.deactivate()
        log.debug('Switch %s removed from the Topology.', switch.id)
        self.notify_topology_update()
526 1
527 1
    def handle_interface_up(self, event):
        """Activate the interface of a Port Modify event.

        The event notifies that an interface was changed to 'up'.
        """
        port = event.content['interface']
        port.activate()
        self.notify_topology_update()
        self.update_instance_metadata(port)
536 1
537 1
    @listen_to('.*.switch.interface.created')
    def handle_interface_created(self, event):
        """Treat a Port Create event like an interface 'up' event."""
        self.handle_interface_up(event)
541
542 1
    def handle_interface_down(self, event):
        """Deactivate the interface of a Port Modify event.

        The event notifies that an interface was changed to 'down'. The
        same event is then handled as a link-down event before the
        topology update is notified.
        """
        port = event.content['interface']
        port.deactivate()
        self.handle_interface_link_down(event)
        self.notify_topology_update()
551
552 1
    @listen_to('.*.switch.interface.deleted')
    def handle_interface_deleted(self, event):
        """Treat a Port Delete event like an interface 'down' event."""
        self.handle_interface_down(event)
556
557
    @listen_to('.*.switch.interface.link_up')
    def handle_interface_link_up(self, event):
        """Handle a Port Modify event reporting an interface's link as 'up'.

        The interface from the event content is passed to
        ``handle_link_up``.
        """
        port = event.content['interface']
        self.handle_link_up(port)
565 1
566 1
    @listen_to('kytos/maintenance.end_switch')
    def handle_switch_maintenance_end(self, event):
        """Re-enable the switches leaving maintenance and their interfaces.

        Each switch in the event content is enabled and activated; each of
        its interfaces is enabled and passed to ``handle_link_up``.
        """
        for switch in event.content['switches']:
            switch.enable()
            switch.activate()
            for interface in switch.interfaces.values():
                interface.enable()
                self.handle_link_up(interface)
576
577 1
    def handle_link_up(self, interface):
        """Activate the interface's link once both endpoints are active.

        When the link becomes active, the method sleeps
        ``self.link_up_timer`` seconds and only notifies the change if the
        link is still active and its 'last_status_change' metadata is at
        least that old.
        """
        link = self._get_link_from_interface(interface)
        if not link:
            return

        if link.endpoint_a == interface:
            other_interface = link.endpoint_b
        else:
            other_interface = link.endpoint_a

        interface.activate()
        if other_interface.is_active() is False:
            return
        if link.is_active() is not False:
            return

        link.update_metadata('last_status_change', time.time())
        link.activate()

        # As each run of this method uses a different thread,
        # there is no risk this sleep will lock the NApp.
        time.sleep(self.link_up_timer)

        last_status_change = link.get_metadata('last_status_change')
        now = time.time()
        still_up = link.is_active() and \
            now - last_status_change >= self.link_up_timer
        if still_up:
            self.notify_topology_update()
            self.update_instance_metadata(link)
            self.notify_link_status_change(link)
604 1
605 1
    @listen_to('.*.switch.interface.link_down')
    def handle_interface_link_down(self, event):
        """Handle a Port Modify event reporting an interface's link as 'down'.

        The interface from the event content is passed to
        ``handle_link_down``.
        """
        port = event.content['interface']
        self.handle_link_down(port)
613 1
614 1
    @listen_to('kytos/maintenance.start_switch')
    def handle_switch_maintenance_start(self, event):
        """Disable the switches entering maintenance and their interfaces.

        Each switch in the event content is disabled and deactivated; each
        of its interfaces is disabled and, if still active, passed to
        ``handle_link_down``.
        """
        for switch in event.content['switches']:
            switch.disable()
            switch.deactivate()
            for interface in switch.interfaces.values():
                interface.disable()
                if interface.is_active():
                    self.handle_link_down(interface)
625
626
    def handle_link_down(self, interface):
        """Deactivate the interface's active link and notify the change.

        Nothing happens when the interface has no link or the link is
        already inactive.
        """
        link = self._get_link_from_interface(interface)
        if not link or not link.is_active():
            return
        link.deactivate()
        link.update_metadata('last_status_change', time.time())
        self.notify_topology_update()
        self.notify_link_status_change(link)
634
635
    @listen_to('.*.interface.is.nni')
    def add_links(self, event):
        """Register the link between the two NNI interfaces of the event.

        Both interfaces are attached to the link and flagged as NNI, then
        the topology update is notified and the link's stored status is
        restored. A KytosLinkCreationError is logged and ends the handler.
        """
        endpoints = (event.content['interface_a'],
                     event.content['interface_b'])

        try:
            link = self._get_link_or_create(*endpoints)
        except KytosLinkCreationError as err:
            log.error(f'Error creating link: {err}.')
            return

        for endpoint in endpoints:
            endpoint.update_link(link)
        for endpoint in endpoints:
            endpoint.nni = True

        self.notify_topology_update()
        self.restore_network_status(link)
655 1
656
    # def add_host(self, event):
657 1
    #    """Update the topology with a new Host."""
658
659 1
    #    interface = event.content['port']
660 1
    #    mac = event.content['reachable_mac']
661 1
662
    #    host = Host(mac)
663
    #    link = self.topology.get_link(interface.id)
664 1
    #    if link is not None:
665 1
    #        return
666
667 1
    #    self.topology.add_link(interface.id, host.id)
668
    #    self.topology.add_device(host)
669 1
670 1
    #    if settings.DISPLAY_FULL_DUPLEX_LINKS:
671 1
    #        self.topology.add_link(host.id, interface.id)
672 1
673 1
    # pylint: disable=unused-argument
674 1
    @listen_to('.*.network_status.updated')
    def save_status_on_storehouse(self, event=None):
        """Save the switches and links status using storehouse.

        When triggered by an event, its 'attribute', 'state' and
        'interface_ids' content entries are logged first.
        """
        status = self._get_switches_dict()
        status['id'] = 'network_status'
        if event:
            content = event.content
            attribute = content['attribute']
            state = content['state']
            interface_ids = content['interface_ids']
            log.info(f"Storing the administrative state of the {attribute}"
                     f" attribute to {state} in the interfaces"
                     f" {interface_ids}")
        status.update(self._get_links_dict())
        self.storehouse.save_status(status)
687
688 1
    def notify_topology_update(self):
        """Put a 'kytos/topology.updated' event in the app buffer."""
        content = {'topology': self._get_topology()}
        event = KytosEvent(name='kytos/topology.updated', content=content)
        self.controller.buffers.app.put(event)
694
695 1
    def notify_link_status_change(self, link):
        """Put a link_up or link_down event for the link in the app buffer.

        The event name is 'kytos/topology.link_up' when the link is active
        and 'kytos/topology.link_down' otherwise.
        """
        status = 'link_up' if link.is_active() else 'link_down'
        event = KytosEvent(name='kytos/topology.' + status,
                           content={'link': link})
        self.controller.buffers.app.put(event)
704 1
705 1
    def notify_metadata_changes(self, obj, action):
        """Send an event to notify about metadata changes.

        Args:
            obj: the Switch, Interface or Link whose metadata changed.
            action: suffix of the event name; this module passes 'added'
                and 'removed'.

        Raises:
            ValueError: if ``obj`` is not a Switch, Interface or Link.
        """
        if isinstance(obj, Switch):
            entity = 'switch'
            entities = 'switches'
        elif isinstance(obj, Interface):
            entity = 'interface'
            entities = 'interfaces'
        elif isinstance(obj, Link):
            entity = 'link'
            entities = 'links'
        else:
            # Without this branch the names used below would be unbound
            # and the caller would get an opaque UnboundLocalError.
            raise ValueError(
                'Invalid object, supported: Switch, Interface, Link')

        name = f'kytos/topology.{entities}.metadata.{action}'
        event = KytosEvent(name=name, content={entity: obj,
                                               'metadata': obj.metadata})
        self.controller.buffers.app.put(event)
        log.debug(f'Metadata from {obj.id} was {action}.')
722
723
    @listen_to('.*.switch.port.created')
    def notify_port_created(self, original_event):
        """Republish a port-created event as 'kytos/topology.port.created'.

        The new event carries the original event's content unchanged.
        """
        event = KytosEvent(name='kytos/topology.port.created',
                           content=original_event.content)
        self.controller.buffers.app.put(event)
729 1
730 1
    @listen_to('kytos/topology.*.metadata.*')
    def save_metadata_on_store(self, event):
        """Send the updated metadata of an entity to storehouse.

        The event content is expected to carry the changed object under a
        'switch', 'interface' or 'link' key. An event with none of those
        keys, or one arriving while the matching store is not in
        ``self.store_items``, is logged and ignored instead of failing
        with an unbound-variable or attribute error.
        """
        name = 'kytos.storehouse.update'
        # (content key, store_items key) pairs, checked in this order.
        known_entities = (('switch', 'switches'),
                          ('interface', 'interfaces'),
                          ('link', 'links'))
        for entity, entities in known_entities:
            if entity in event.content:
                break
        else:
            log.error('Metadata event has no switch, interface or link; '
                      'nothing was sent to storehouse.')
            return

        store = self.store_items.get(entities)
        if store is None:
            log.error(f'No store available for {entities}; '
                      'metadata was not sent to storehouse.')
            return

        obj = event.content.get(entity)
        namespace = f'kytos.topology.{entities}.metadata'

        store.data[obj.id] = obj.metadata
        content = {'namespace': namespace,
                   'box_id': store.box_id,
                   'data': store.data,
                   'callback': self.update_instance}

        update_event = KytosEvent(name=name, content=content)
        self.controller.buffers.app.put(update_event)
755 1
756
    @staticmethod
    def update_instance(event, _data, error):
        """Log the outcome of a storehouse update request.

        The entity group named in the log message is the second-to-last
        dot-separated segment of the ``'namespace'`` value in the event
        content. ``_data`` is not used.
        """
        namespace = event.content.get('namespace', '')
        entities = namespace.split('.')[-2]
        if not error:
            log.debug(f'Storehouse update to entities: {entities}.')
            return
        log.error(f'Error trying to update storehouse {entities}.')
764 1
765
    def verify_storehouse(self, entities):
        """Ask storehouse to list the boxes saved for an entity group.

        Publishes a ``kytos.storehouse.list`` event for the namespace
        ``kytos.topology.<entities>.metadata`` with
        ``self.request_retrieve_entities`` as callback.
        """
        list_request = KytosEvent(
            name='kytos.storehouse.list',
            content={
                'namespace': f'kytos.topology.{entities}.metadata',
                'callback': self.request_retrieve_entities,
            },
        )
        self.controller.buffers.app.put(list_request)
        log.info(f'verify data in storehouse for {entities}.')
773
774
    def request_retrieve_entities(self, event, data, _error):
        """Request creation of a box or retrieval of an existing one.

        When ``data`` is empty a ``kytos.storehouse.create`` event is sent;
        otherwise a ``kytos.storehouse.retrieve`` event is sent for the first
        item of ``data``. In both cases the namespace is copied from the
        incoming event and ``self.load_from_store`` is set as callback.
        ``_error`` is not used.
        """
        payload = {'namespace': event.content.get('namespace'),
                   'callback': self.load_from_store,
                   'data': {}}

        if data:
            name = 'kytos.storehouse.retrieve'
            payload['box_id'] = data[0]
            msg = 'Retrieve data from storehouse.'
        else:
            name = 'kytos.storehouse.create'
            msg = 'Create new box in storehouse'

        request = KytosEvent(name=name, content=payload)
        self.controller.buffers.app.put(request)
        log.debug(msg)
792 1
793 1
    def load_from_store(self, event, box, error):
        """Keep the box returned by storehouse in ``self.store_items``.

        The box is stored under the second-to-last dot-separated segment of
        the ``'namespace'`` value in the event content. On error nothing is
        stored and an error is logged instead.
        """
        namespace = event.content.get('namespace', '')
        entities = namespace.split('.')[-2]
        if error:
            log.error('Error while get a box from storehouse.')
            return
        self.store_items[entities] = box
        log.debug('Data updated')
801 1
802 1
    def update_instance_metadata(self, obj):
        """Extend ``obj`` with the metadata saved for it, if there is any.

        The box is looked up in ``self.store_items`` according to the type of
        ``obj`` (Interface, Switch or Link, checked in that order) and the
        saved metadata is read from the box data using ``obj.id``. Nothing
        happens when the type is not supported, the box is missing or empty,
        or no metadata is saved for the object.
        """
        # Checked in order; only the first matching type is considered.
        store_keys = (
            (Interface, 'interfaces'),
            (Switch, 'switches'),
            (Link, 'links'),
        )
        saved = None
        for kind, key in store_keys:
            if not isinstance(obj, kind):
                continue
            box = self.store_items.get(key, None)
            if box:
                saved = box.data.get(obj.id)
            break

        if not saved:
            return
        obj.extend_metadata(saved)
        log.debug(f'Metadata to {obj.id} was updated')
821 1
822 1
    @listen_to('kytos/maintenance.start_link')
    def handle_link_maintenance_start(self, event):
        """Disable and deactivate the links that entered maintenance.

        Links listed in ``event.content['links']`` are matched by id against
        ``self.links``; ids not found there are ignored. Each matched link
        and both of its endpoints are disabled and deactivated, and a link
        status change is notified for it.
        """
        affected = []
        for requested in event.content['links']:
            try:
                affected.append(self.links[requested.id])
            except KeyError:
                # Link not present in self.links: skip it.
                continue

        for link in affected:
            link.disable()
            link.deactivate()
            link.endpoint_a.deactivate()
            link.endpoint_b.deactivate()
            link.endpoint_a.disable()
            link.endpoint_b.disable()
            self.notify_link_status_change(link)
841
842
    @listen_to('kytos/maintenance.end_link')
    def handle_link_maintenance_end(self, event):
        """Enable and activate the links that left maintenance.

        Links listed in ``event.content['links']`` are matched by id against
        ``self.links``; ids not found there are ignored. Each matched link
        and both of its endpoints are enabled and activated, and a link
        status change is notified for it.
        """
        restored = []
        for requested in event.content['links']:
            try:
                restored.append(self.links[requested.id])
            except KeyError:
                # Link not present in self.links: skip it.
                continue

        for link in restored:
            link.enable()
            link.activate()
            link.endpoint_a.activate()
            link.endpoint_b.activate()
            link.endpoint_a.enable()
            link.endpoint_b.enable()
            self.notify_link_status_change(link)
861