Test Failed
Branch fix/tox_py37_scrutinizer (c6e8bb)
by Vinicius
03:52
created

build.main.Main.notify_metadata_changes()   A

Complexity

Conditions 4

Size

Total Lines 23
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 18
nop 3
dl 0
loc 23
rs 9.5
c 0
b 0
f 0
1
"""Main module of kytos/topology Kytos Network Application.
2
3
Manage the network topology
4
"""
5
import time
6
from threading import Lock
7
8
from flask import jsonify, request
9
from werkzeug.exceptions import BadRequest, UnsupportedMediaType
10
11
from kytos.core import KytosEvent, KytosNApp, log, rest
12
from kytos.core.exceptions import KytosLinkCreationError
13
from kytos.core.helpers import listen_to
14
from kytos.core.interface import Interface
15
from kytos.core.link import Link
16
from kytos.core.switch import Switch
17
from napps.kytos.topology import settings
18
from napps.kytos.topology.exceptions import RestoreError
19
from napps.kytos.topology.models import Topology
20
from napps.kytos.topology.storehouse import StoreHouse
21
22
DEFAULT_LINK_UP_TIMER = 10
23
24
25
class Main(KytosNApp):  # pylint: disable=too-many-public-methods
26
    """Main class of kytos/topology NApp.
27
28
    This class is the entry point for this napp.
29
    """
30
31
    def setup(self):
        """Create the NApp state: link registry, tag cache, timer and locks.

        ``link_up_timer`` is read from ``settings.LINK_UP_TIMER`` and falls
        back to ``DEFAULT_LINK_UP_TIMER`` when the setting is absent.
        """
        self.links = {}
        self.store_items = {}
        self.intf_available_tags = {}
        self.link_up_timer = getattr(
            settings, 'LINK_UP_TIMER', DEFAULT_LINK_UP_TIMER
        )

        # Same order as before: switches, interfaces, then links.
        for box_name in ('switches', 'interfaces', 'links'):
            self.verify_storehouse(box_name)

        self.storehouse = StoreHouse(self.controller)

        self._lock = Lock()
        self._links_lock = Lock()
47
48
    # pylint: disable=unused-argument,arguments-differ
49
    @listen_to('kytos/storehouse.loaded')
    def execute(self, event=None):
        """Load the stored network status while holding ``self._lock``.

        Registered for the ``kytos/storehouse.loaded`` event; the ``event``
        argument itself is not used.
        """
        with self._lock:
            self._load_network_status()
54
55
    def shutdown(self):
        """Log that the NApp is shutting down; no other cleanup is done."""
        log.info('NApp kytos/topology shutting down.')
58
59
    @staticmethod
    def _get_metadata():
        """Return the JSON payload of the current Flask request.

        Raises:
            BadRequest: the body is not well-formed JSON, or the request has
                no content type (treated as an empty body).
            UnsupportedMediaType: the parsed payload is ``None``; the message
                tells whether the content type was wrong or the metadata was
                empty.
        """
        try:
            payload = request.get_json()
            mime = request.content_type
        except BadRequest as err:
            raise BadRequest(
                'The request body is not a well-formed JSON.'
            ) from err

        if mime is None:
            raise BadRequest('The request body is empty.')
        if payload is not None:
            return payload

        if mime == 'application/json':
            reason = 'Metadata is empty.'
        else:
            reason = ('The content type must be application/json '
                      f'(received {mime}).')
        raise UnsupportedMediaType(reason)
79
80
    def _get_link_or_create(self, endpoint_a, endpoint_b):
        """Return the known link between two endpoints, registering a new one if needed.

        Returns:
            Tuple(Link, bool): the link and whether it was created by this
            call. A new link is stored in ``self.links`` under its ``id``.
        """
        candidate = Link(endpoint_a, endpoint_b)

        known = next(
            (link for link in self.links.values() if candidate == link),
            None,
        )
        if known is not None:
            return (known, False)

        self.links[candidate.id] = candidate
        return (candidate, True)
94
95
    def _get_switches_dict(self):
        """Return ``{'switches': {switch.id: switch.as_dict(), ...}}``.

        A switch whose metadata lacks ``lat`` or ``lng`` gets placeholder
        coordinates: latitude ``0.0`` and a longitude spaced by its position
        in the controller's switch collection.
        """
        known = {}
        for position, switch in enumerate(self.controller.switches.values()):
            data = switch.as_dict()
            metadata = data['metadata']
            if 'lat' not in metadata or 'lng' not in metadata:
                # Switches are initialized somewhere in the ocean
                metadata['lat'] = str(0.0)
                metadata['lng'] = str(-30.0 + position * 10.0)
            known[switch.id] = data
        return {'switches': known}
107
108
    def _get_links_dict(self):
        """Return ``{'links': {link.id: link.as_dict(), ...}}``."""
        serialized = {
            link.id: link.as_dict() for link in self.links.values()
        }
        return {'links': serialized}
112
113
    def _get_links_dict_with_tags(self):
        """Return the links dict with stored ``available_tags`` added.

        Each endpoint whose id is a key of ``self.intf_available_tags`` gets
        an ``available_tags`` entry holding the stored value.
        """
        stored_tags = self.intf_available_tags
        result = {}
        for link in self.links.values():
            link_dict = link.as_dict()
            endpoints = (link_dict["endpoint_a"], link_dict["endpoint_b"])
            for endpoint in endpoints:
                endpoint_id = endpoint["id"]
                if endpoint_id in stored_tags:
                    endpoint["available_tags"] = stored_tags[endpoint_id]
            result[link.id] = link_dict
        return {"links": result}
127
128
    def _get_topology_dict(self):
        """Return switches and links merged under a single ``'topology'`` key."""
        topology = {}
        topology.update(self._get_switches_dict())
        topology.update(self._get_links_dict())
        return {'topology': topology}
132
133
    def _get_topology(self):
        """Build a ``Topology`` from the controller's switches and known links."""
        switches = self.controller.switches
        return Topology(switches, self.links)
136
137
    def _get_link_from_interface(self, interface):
        """Return the first known link having *interface* as an endpoint.

        Returns ``None`` when no link in ``self.links`` uses the interface.
        """
        return next(
            (link for link in self.links.values()
             if interface in (link.endpoint_a, link.endpoint_b)),
            None,
        )
143
144
    @staticmethod
    def _load_intf_available_tags(interface, intf_dict):
        """Apply ``intf_dict['available_tags']`` to *interface* when present.

        Returns:
            bool: ``True`` if the key existed and the tags were set,
            ``False`` otherwise.
        """
        if "available_tags" not in intf_dict:
            return False
        interface.set_available_tags(intf_dict["available_tags"])
        return True
151
152
    def _load_link(self, link_att):
        """Restore one link from its stored attributes.

        Args:
            link_att (dict): must provide ``endpoint_a`` and ``endpoint_b``
                (each with ``switch`` and ``port_number``) and ``enabled``.

        Raises:
            RestoreError: when a switch or interface of either endpoint
                cannot be resolved on the controller.
        """
        dpid_a = link_att['endpoint_a']['switch']
        dpid_b = link_att['endpoint_b']['switch']
        port_a = link_att['endpoint_a']['port_number']
        port_b = link_att['endpoint_b']['port_number']
        link_str = f'{dpid_a}:{port_a}-{dpid_b}:{port_b}'
        log.info(f'Loading link from storehouse {link_str}')

        try:
            switch_a = self.controller.switches[dpid_a]
            switch_b = self.controller.switches[dpid_b]
            interface_a = switch_a.interfaces[port_a]
            interface_b = switch_b.interfaces[port_b]
        except Exception as err:
            error = f'Fail to load endpoints for link {link_str}: {err}'
            raise RestoreError(error) from err

        with self._links_lock:
            link = self._get_link_or_create(interface_a, interface_b)[0]

        # Apply the stored administrative state.
        set_admin_state = link.enable if link_att['enabled'] else link.disable
        set_admin_state()

        interface_a.update_link(link)
        interface_b.update_link(link)
        interface_a.nni = True
        interface_b.nni = True

        stored_endpoints = (link_att["endpoint_a"], link_att["endpoint_b"])
        for interface, interface_dict in zip((interface_a, interface_b),
                                             stored_endpoints):
            if self._load_intf_available_tags(interface, interface_dict):
                log.info(
                    f"Loaded {len(interface.available_tags)}"
                    f" available tags for {interface.id}"
                )
        self.update_instance_metadata(link)
191
192
    def _load_switch(self, switch_id, switch_att):
        """Restore one switch and its interfaces from stored attributes.

        Args:
            switch_id: id used to get or create the switch on the controller.
            switch_att (dict): stored attributes. ``enabled`` is required;
                the description fields default to an empty string and
                ``interfaces`` defaults to no interfaces.

        For every restored interface a ``kytos/topology.port.created`` event
        is put on the controller's app buffer.
        """
        log.info(f'Loading switch from storehouse dpid={switch_id}')
        switch = self.controller.get_switch_or_create(switch_id)
        if switch_att['enabled']:
            switch.enable()
        else:
            switch.disable()

        # Every description field falls back to '' when it was not stored
        # ('software' used to fall back to None, unlike its siblings).
        for field in ('manufacturer', 'hardware', 'software', 'serial',
                      'data_path'):
            switch.description[field] = switch_att.get(field, '')
        self.update_instance_metadata(switch)

        for iface_id, iface_att in switch_att.get('interfaces', {}).items():
            log.info(f'Loading interface iface_id={iface_id}')
            interface = switch.update_or_create_interface(
                port_no=iface_att['port_number'],
                name=iface_att['name'],
                address=iface_att.get('mac', None),
                speed=iface_att.get('speed', None))
            if iface_att['enabled']:
                interface.enable()
            else:
                interface.disable()
            interface.lldp = iface_att['lldp']
            self.update_instance_metadata(interface)

            content = {
                'switch': switch_id,
                'port': interface.port_number,
                'port_description': {
                    'alias': interface.name,
                    'mac': interface.address,
                    'state': interface.state,
                },
            }
            event = KytosEvent(name='kytos/topology.port.created',
                               content=content)
            self.controller.buffers.app.put(event)
230
231
    # pylint: disable=attribute-defined-outside-init
232
    def _load_network_status(self):
        """Restore switches and links from the status saved in storehouse.

        Returns early when the stored data cannot be read
        (``FileNotFoundError``) or is empty. Failures while loading a single
        switch or link are logged and collected, and the final
        ``kytos/topology.topology_loaded`` event carries them in
        ``failed_switches`` and ``failed_links``.
        """
        try:
            status = self.storehouse.get_data()
        except FileNotFoundError as error:
            log.error(f'Fail to load network status from storehouse: {error}')
            return

        if not status:
            log.info('There is no status saved to restore.')
            return

        network_status = status['network_status']
        switches = network_status['switches']
        links = network_status['links']

        failed_switches = {}
        log.debug(f"_load_network_status switches={switches}")
        for switch_id, switch_att in switches.items():
            try:
                self._load_switch(switch_id, switch_att)
            # pylint: disable=broad-except
            except Exception as err:
                failed_switches[switch_id] = err
                log.error(f'Error loading switch: {err}')

        failed_links = {}
        log.debug(f"_load_network_status links={links}")
        for link_id, link_att in links.items():
            try:
                self._load_link(link_att)
            # pylint: disable=broad-except
            except Exception as err:
                failed_links[link_id] = err
                log.error(f'Error loading link {link_id}: {err}')

        content = {
            'topology': self._get_topology(),
            'failed_switches': failed_switches,
            'failed_links': failed_links,
        }
        event = KytosEvent(name='kytos/topology.topology_loaded',
                           content=content)
        self.controller.buffers.app.put(event)
276
277
    @rest('v3/')
    def get_topology(self):
        """Return the currently known topology (switches and links) as JSON."""
        topology = self._get_topology_dict()
        return jsonify(topology)
284
285
    # Switch related methods
286
    @rest('v3/switches')
    def get_switches(self):
        """Return every known switch as JSON."""
        switches = self._get_switches_dict()
        return jsonify(switches)
290
291
    @rest('v3/switches/<dpid>/enable', methods=['POST'])
    def enable_switch(self, dpid):
        """Administratively enable the switch identified by *dpid*.

        Returns 404 when the switch is unknown. Otherwise the status is
        saved, the change is notified and 201 is returned.
        """
        try:
            self.controller.switches[dpid].enable()
        except KeyError:
            response = jsonify("Switch not found"), 404
        else:
            log.info(f"Storing administrative state from switch {dpid}"
                     " to enabled.")
            self.save_status_on_storehouse()
            self.notify_switch_enabled(dpid)
            self.notify_topology_update()
            response = jsonify("Operation successful"), 201
        return response
305
306
    @rest('v3/switches/<dpid>/disable', methods=['POST'])
    def disable_switch(self, dpid):
        """Administratively disable the switch identified by *dpid*.

        Returns 404 when the switch is unknown. Otherwise the status is
        saved, the change is notified and 201 is returned.
        """
        try:
            self.controller.switches[dpid].disable()
        except KeyError:
            response = jsonify("Switch not found"), 404
        else:
            log.info(f"Storing administrative state from switch {dpid}"
                     " to disabled.")
            self.save_status_on_storehouse()
            self.notify_switch_disabled(dpid)
            self.notify_topology_update()
            response = jsonify("Operation successful"), 201
        return response
320
321
    @rest('v3/switches/<dpid>/metadata')
    def get_switch_metadata(self, dpid):
        """Return the metadata of a switch, or 404 when it is unknown."""
        try:
            switch = self.controller.switches[dpid]
            body = jsonify({"metadata": switch.metadata})
        except KeyError:
            return jsonify("Switch not found"), 404
        return body, 200
329
330
    @rest('v3/switches/<dpid>/metadata', methods=['POST'])
    def add_switch_metadata(self, dpid):
        """Extend a switch's metadata with the request's JSON payload.

        The payload is validated before the switch lookup, so a malformed
        request fails even when the switch does not exist. Returns 404 for
        an unknown switch and 201 on success.
        """
        metadata = self._get_metadata()

        try:
            switch = self.controller.switches[dpid]
        except KeyError:
            response = jsonify("Switch not found"), 404
        else:
            switch.extend_metadata(metadata)
            self.notify_metadata_changes(switch, 'added')
            response = jsonify("Operation successful"), 201
        return response
343
344
    @rest('v3/switches/<dpid>/metadata/<key>', methods=['DELETE'])
    def delete_switch_metadata(self, dpid, key):
        """Delete one metadata key from a switch.

        Returns 404 when the switch is unknown or when ``remove_metadata``
        reports (by returning ``False``) that the key did not exist. This
        matches the interface and link delete endpoints. Returns 200 on
        success.
        """
        try:
            switch = self.controller.switches[dpid]
        except KeyError:
            return jsonify("Switch not found"), 404

        # Do not announce a removal that did not happen.
        if switch.remove_metadata(key) is False:
            return jsonify("Metadata not found"), 404

        self.notify_metadata_changes(switch, 'removed')
        return jsonify("Operation successful"), 200
355
356
    # Interface related methods
357
    @rest('v3/interfaces')
    def get_interfaces(self):
        """Return the interfaces of every known switch in one JSON mapping."""
        interfaces = {}
        for switch in self._get_switches_dict()['switches'].values():
            interfaces.update(switch['interfaces'])

        return jsonify({'interfaces': interfaces})
367
368 View Code Duplication
    @rest('v3/interfaces/switch/<dpid>/enable', methods=['POST'])
    @rest('v3/interfaces/<interface_enable_id>/enable', methods=['POST'])
    def enable_interface(self, interface_enable_id=None, dpid=None):
        """Administratively enable one interface or all interfaces of a switch.

        Args:
            interface_enable_id: ``<dpid>:<port_number>`` of a single
                interface. When given without *dpid*, the switch id is
                everything before the last ``:``.
            dpid: switch id; when *interface_enable_id* is empty, every
                interface of that switch is enabled.

        Returns 404 for an unknown switch or interface, 400 when the port
        part of the interface id is not an integer, and 200 on success.
        """
        if dpid is None:
            dpid = ":".join(interface_enable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as exc:
            return jsonify(f"Switch not found: {exc}"), 404

        if interface_enable_id:
            try:
                interface_number = int(interface_enable_id.split(":")[-1])
            except ValueError:
                # A non-numeric port used to escape as an unhandled error.
                msg = f"Invalid interface id: {interface_enable_id}"
                return jsonify(msg), 400

            try:
                switch.interfaces[interface_number].enable()
            except KeyError:
                msg = f"Switch {dpid} interface {interface_number} not found"
                return jsonify(msg), 404
        else:
            for interface in switch.interfaces.values():
                interface.enable()
        log.info("Storing administrative state for enabled interfaces.")
        self.save_status_on_storehouse()
        self.notify_topology_update()
        return jsonify("Operation successful"), 200
394
395 View Code Duplication
    @rest('v3/interfaces/switch/<dpid>/disable', methods=['POST'])
    @rest('v3/interfaces/<interface_disable_id>/disable', methods=['POST'])
    def disable_interface(self, interface_disable_id=None, dpid=None):
        """Administratively disable one interface or all interfaces of a switch.

        Args:
            interface_disable_id: ``<dpid>:<port_number>`` of a single
                interface. When given without *dpid*, the switch id is
                everything before the last ``:``.
            dpid: switch id; when *interface_disable_id* is empty, every
                interface of that switch is disabled.

        Returns 404 for an unknown switch or interface, 400 when the port
        part of the interface id is not an integer, and 200 on success.
        """
        if dpid is None:
            dpid = ":".join(interface_disable_id.split(":")[:-1])
        try:
            switch = self.controller.switches[dpid]
        except KeyError as exc:
            return jsonify(f"Switch not found: {exc}"), 404

        if interface_disable_id:
            try:
                interface_number = int(interface_disable_id.split(":")[-1])
            except ValueError:
                # A non-numeric port used to escape as an unhandled error.
                msg = f"Invalid interface id: {interface_disable_id}"
                return jsonify(msg), 400

            try:
                switch.interfaces[interface_number].disable()
            except KeyError:
                msg = f"Switch {dpid} interface {interface_number} not found"
                return jsonify(msg), 404
        else:
            for interface in switch.interfaces.values():
                interface.disable()
        log.info("Storing administrative state for disabled interfaces.")
        self.save_status_on_storehouse()
        self.notify_topology_update()
        return jsonify("Operation successful"), 200
421
422
    @rest('v3/interfaces/<interface_id>/metadata')
    def get_interface_metadata(self, interface_id):
        """Return the metadata of the interface ``<dpid>:<port_number>``.

        Returns 400 when the port part of *interface_id* is not an integer,
        404 when the switch or the interface is unknown, and 200 with the
        metadata otherwise.
        """
        switch_id = ":".join(interface_id.split(":")[:-1])
        try:
            interface_number = int(interface_id.split(":")[-1])
        except ValueError:
            # A non-numeric port used to escape as an unhandled error.
            return jsonify(f"Invalid interface id: {interface_id}"), 400

        try:
            switch = self.controller.switches[switch_id]
        except KeyError:
            return jsonify("Switch not found"), 404

        try:
            interface = switch.interfaces[interface_number]
        except KeyError:
            return jsonify("Interface not found"), 404

        return jsonify({"metadata": interface.metadata}), 200
438
439 View Code Duplication
    @rest('v3/interfaces/<interface_id>/metadata', methods=['POST'])
    def add_interface_metadata(self, interface_id):
        """Extend an interface's metadata with the request's JSON payload.

        The payload is validated first. Returns 400 when the port part of
        *interface_id* is not an integer, 404 when the switch or the
        interface is unknown, and 201 on success.
        """
        metadata = self._get_metadata()
        switch_id = ":".join(interface_id.split(":")[:-1])
        try:
            interface_number = int(interface_id.split(":")[-1])
        except ValueError:
            # A non-numeric port used to escape as an unhandled error.
            return jsonify(f"Invalid interface id: {interface_id}"), 400

        try:
            switch = self.controller.switches[switch_id]
        except KeyError:
            return jsonify("Switch not found"), 404

        try:
            interface = switch.interfaces[interface_number]
        except KeyError:
            return jsonify("Interface not found"), 404

        interface.extend_metadata(metadata)
        self.notify_metadata_changes(interface, 'added')
        return jsonify("Operation successful"), 201
458
459 View Code Duplication
    @rest('v3/interfaces/<interface_id>/metadata/<key>', methods=['DELETE'])
    def delete_interface_metadata(self, interface_id, key):
        """Delete one metadata key from the interface ``<dpid>:<port_number>``.

        Returns 400 when the port part of *interface_id* is not an integer,
        404 when the switch, the interface or the key is unknown, and 200 on
        success.
        """
        switch_id = ":".join(interface_id.split(":")[:-1])
        try:
            interface_number = int(interface_id.split(":")[-1])
        except ValueError:
            # A non-numeric port used to escape as an unhandled error.
            return jsonify(f"Invalid interface id: {interface_id}"), 400

        try:
            switch = self.controller.switches[switch_id]
        except KeyError:
            return jsonify("Switch not found"), 404

        try:
            interface = switch.interfaces[interface_number]
        except KeyError:
            return jsonify("Interface not found"), 404

        if interface.remove_metadata(key) is False:
            return jsonify("Metadata not found"), 404

        self.notify_metadata_changes(interface, 'removed')
        return jsonify("Operation successful"), 200
480
481
    # Link related methods
482
    @rest('v3/links')
    def get_links(self):
        """Return every known link as JSON with status 200.

        Links are connections between interfaces.
        """
        links = self._get_links_dict()
        return jsonify(links), 200
489
490
    @rest('v3/links/<link_id>/enable', methods=['POST'])
    def enable_link(self, link_id):
        """Administratively enable the link identified by *link_id*.

        Returns 404 when the link is unknown. Otherwise the status is saved,
        the change is notified and 201 is returned.
        """
        try:
            with self._links_lock:
                # Keep the reference: a second lookup after releasing the
                # lock could raise if the link is removed in the meantime.
                link = self.links[link_id]
                link.enable()
        except KeyError:
            return jsonify("Link not found"), 404
        self.save_status_on_storehouse()
        self.notify_link_status_change(link, reason='link enabled')
        self.notify_topology_update()
        return jsonify("Operation successful"), 201
505
506
    @rest('v3/links/<link_id>/disable', methods=['POST'])
    def disable_link(self, link_id):
        """Administratively disable the link identified by *link_id*.

        Returns 404 when the link is unknown. Otherwise the status is saved,
        the change is notified and 201 is returned.
        """
        try:
            with self._links_lock:
                # Keep the reference: a second lookup after releasing the
                # lock could raise if the link is removed in the meantime.
                link = self.links[link_id]
                link.disable()
        except KeyError:
            return jsonify("Link not found"), 404
        self.save_status_on_storehouse()
        self.notify_link_status_change(link, reason='link disabled')
        self.notify_topology_update()
        return jsonify("Operation successful"), 201
521
522
    @rest('v3/links/<link_id>/metadata')
    def get_link_metadata(self, link_id):
        """Return the metadata of a link, or 404 when it is unknown."""
        try:
            body = jsonify({"metadata": self.links[link_id].metadata})
        except KeyError:
            return jsonify("Link not found"), 404
        return body, 200
529
530
    @rest('v3/links/<link_id>/metadata', methods=['POST'])
    def add_link_metadata(self, link_id):
        """Extend a link's metadata with the request's JSON payload.

        The payload is validated before the link lookup. Returns 404 for an
        unknown link and 201 on success.
        """
        metadata = self._get_metadata()
        try:
            link = self.links[link_id]
        except KeyError:
            response = jsonify("Link not found"), 404
        else:
            link.extend_metadata(metadata)
            self.notify_metadata_changes(link, 'added')
            response = jsonify("Operation successful"), 201
        return response
542
543
    @rest('v3/links/<link_id>/metadata/<key>', methods=['DELETE'])
    def delete_link_metadata(self, link_id, key):
        """Delete one metadata key from a link.

        Returns 404 when the link is unknown or when ``remove_metadata``
        returns ``False``, and 200 on success.
        """
        try:
            link = self.links[link_id]
        except KeyError:
            return jsonify("Link not found"), 404

        removed = link.remove_metadata(key)
        if removed is False:
            return jsonify("Metadata not found"), 404

        self.notify_metadata_changes(link, 'removed')
        return jsonify("Operation successful"), 200
556
557
    @listen_to("kytos/.*.link_available_tags")
    def on_link_available_tags(self, event):
        """Pass the event's ``link`` to the handler while holding the links lock."""
        link = event.content.get("link")
        with self._links_lock:
            self.handle_on_link_available_tags(link)
562
563
    def handle_on_link_available_tags(self, link):
        """Cache the available tags of both endpoints of *link* and save them.

        Nothing happens when *link* is ``None`` (the listener obtains it
        with ``dict.get``) or when its id is not in ``self.links``.
        Otherwise the tag values of each endpoint are stored in
        ``self.intf_available_tags`` under the endpoint id, and the status
        is saved on storehouse.
        """
        if link is None or link.id not in self.links:
            return
        known_link = self.links[link.id]
        endpoint_a = known_link.endpoint_a
        endpoint_b = known_link.endpoint_b
        tags_a = [tag.value for tag in endpoint_a.available_tags]
        tags_b = [tag.value for tag in endpoint_b.available_tags]
        self.intf_available_tags[endpoint_a.id] = tags_a
        self.intf_available_tags[endpoint_b.id] = tags_b

        self.save_status_on_storehouse()
575
576
    @listen_to('.*.switch.(new|reconnected)')
    def on_new_switch(self, event):
        """Listener for new or reconnected switches.

        All the work is delegated to ``handle_new_switch``.
        """
        self.handle_new_switch(event)
584
585
    def handle_new_switch(self, event):
        """Activate the event's switch and announce it.

        The topology update is notified, the switch's instance metadata is
        refreshed, and, if the switch is enabled, the switch-enabled
        notification is sent.
        """
        switch = event.content['switch']
        switch.activate()
        log.debug('Switch %s added to the Topology.', switch.id)
        self.notify_topology_update()
        self.update_instance_metadata(switch)
        if not switch.is_enabled():
            return
        self.notify_switch_enabled(switch.id)
594
595
    @listen_to('.*.connection.lost')
    def on_connection_lost(self, event):
        """Listener for lost connections.

        All the work is delegated to ``handle_connection_lost``, which
        deactivates the switch of the lost connection.
        """
        self.handle_connection_lost(event)
603
604
    def handle_connection_lost(self, event):
        """Deactivate the switch behind the lost connection, if there is one.

        The switch is read from ``event.content['source'].switch``; when it
        is falsy nothing is done.
        """
        switch = event.content['source'].switch
        if not switch:
            return
        switch.deactivate()
        log.debug('Switch %s removed from the Topology.', switch.id)
        self.notify_topology_update()
611
612
    def handle_interface_created(self, event):
        """Update the topology after a port-created event.

        Cached available tags for the interface, if any, are applied under
        the links lock. The interface is then treated as a link-up, since a
        switch may send the created event again for a port that already
        belongs to a link.
        """
        interface = event.content['interface']

        with self._links_lock:
            cached_tags = self.intf_available_tags
            if interface.id in cached_tags:
                interface_dict = {"available_tags": cached_tags[interface.id]}
                if self._load_intf_available_tags(interface, interface_dict):
                    log.info(
                        f"Loaded {len(interface.available_tags)}"
                        f" available tags for {interface.id}"
                    )

        self.update_instance_metadata(interface)
        self.handle_interface_link_up(interface)
632
633
    @listen_to('.*.switch.interface.created')
    def on_interface_created(self, event):
        """Listener for interface-created events.

        All the work is delegated to ``handle_interface_created``.
        """
        self.handle_interface_created(event)
641
642
    def handle_interface_down(self, event):
        """Deactivate the event's interface and process its link as down."""
        interface = event.content['interface']
        interface.deactivate()
        self.handle_interface_link_down(interface)
650
651
    @listen_to('.*.switch.interface.deleted')
    def on_interface_deleted(self, event):
        """Listener for interface-deleted events; delegates to the handler."""
        self.handle_interface_deleted(event)
655
656
    def handle_interface_deleted(self, event):
        """Treat a deleted interface exactly like an interface going down."""
        self.handle_interface_down(event)
659
660
    @listen_to('.*.switch.interface.link_up')
    def on_interface_link_up(self, event):
        """Listener for interface link-up events.

        The interface is taken from the event content and handed to
        ``handle_interface_link_up``.
        """
        self.handle_interface_link_up(event.content['interface'])
668
669
    def handle_interface_link_up(self, interface):
        """Process *interface* as up by delegating to ``handle_link_up``."""
        self.handle_link_up(interface)
672
673
    @listen_to('kytos/maintenance.end_switch')
    def on_switch_maintenance_end(self, event):
        """Listener for the end of a switch maintenance; delegates to the handler."""
        self.handle_switch_maintenance_end(event)
677
678
    def handle_switch_maintenance_end(self, event):
        """Re-enable switches leaving maintenance.

        Every switch in ``event.content['switches']`` is enabled and
        activated; each of its interfaces is enabled and processed as a
        link-up.
        """
        for switch in event.content['switches']:
            switch.enable()
            switch.activate()
            for interface in switch.interfaces.values():
                interface.enable()
                self.handle_link_up(interface)
687
688
    def handle_link_up(self, interface):
        """Activate *interface* and, when possible, report its link as up.

        Nothing more is done when the interface has no known link or when
        the other endpoint is not active. For a link that was inactive, the
        link is activated and the method sleeps for ``self.link_up_timer``
        seconds; the 'link up' notification is sent only if the link is
        still active and its ``last_status_change`` is at least that old.
        For any other link the notification is sent right away.
        """
        interface.activate()
        self.notify_topology_update()
        with self._links_lock:
            link = self._get_link_from_interface(interface)
        if not link:
            return

        if link.endpoint_a == interface:
            peer = link.endpoint_b
        else:
            peer = link.endpoint_a
        if peer.is_active() is False:
            return

        was_inactive = link.is_active() is False
        link.update_metadata('last_status_change', time.time())
        if was_inactive:
            link.activate()

            # As each run of this method uses a different thread,
            # there is no risk this sleep will lock the NApp.
            time.sleep(self.link_up_timer)

            changed_at = link.get_metadata('last_status_change')
            now = time.time()
            # The elapsed time is only evaluated for a still-active link.
            stable = (link.is_active()
                      and now - changed_at >= self.link_up_timer)
            if stable:
                self.update_instance_metadata(link)
                self.notify_topology_update()
                self.notify_link_status_change(link, reason='link up')
        else:
            self.update_instance_metadata(link)
            self.notify_topology_update()
            self.notify_link_status_change(link, reason='link up')
        link.update_metadata('last_status_is_active', True)
723
724
    @listen_to('.*.switch.interface.link_down')
    def on_interface_link_down(self, event):
        """Listener for interface link-down events.

        The interface is taken from the event content and handed to
        ``handle_interface_link_down``.
        """
        self.handle_interface_link_down(event.content['interface'])
732
733
    def handle_interface_link_down(self, interface):
        """Process *interface* as down by delegating to ``handle_link_down``."""
        self.handle_link_down(interface)
736
737
    @listen_to('kytos/maintenance.start_switch')
    def on_switch_maintenance_start(self, event):
        """Listener for the start of a switch maintenance; delegates to the handler."""
        self.handle_switch_maintenance_start(event)
741
742
    def handle_switch_maintenance_start(self, event):
        """Put the switches listed in the event under maintenance.

        Every switch in ``event.content['switches']`` is disabled and
        deactivated. Each of its interfaces is disabled too, and the ones
        still reported as active are passed to :meth:`handle_link_down`.
        """
        for target in event.content['switches']:
            target.disable()
            target.deactivate()
            for port in target.interfaces.values():
                port.disable()
                if not port.is_active():
                    continue
                self.handle_link_down(port)
752
753
    def handle_link_down(self, interface):
        """Mark the link attached to ``interface`` as down and notify it.

        An active link is deactivated, gets its ``last_status_change`` and
        ``last_status_is_active`` metadata refreshed and a 'link down'
        notification is sent. A link that is (or has just become) inactive
        is then re-checked while holding ``_links_lock``: if its metadata
        still flags it as active, the metadata is fixed and the notification
        is sent. In every case the interface is deactivated and a topology
        update is published.
        """
        down = 'link down'
        link = self._get_link_from_interface(interface)
        if link:
            if link.is_active():
                link.deactivate()
                link.update_metadata('last_status_change', time.time())
                link.update_metadata('last_status_is_active', False)
                self.notify_link_status_change(link, reason=down)
            # Evaluated again on purpose: the branch above may have just
            # deactivated the link.
            if not link.is_active():
                with self._links_lock:
                    if link.get_metadata('last_status_is_active'):
                        link.update_metadata('last_status_is_active', False)
                        link.update_metadata('last_status_change',
                                             time.time())
                        self.notify_link_status_change(link, reason=down)
        interface.deactivate()
        self.notify_topology_update()
770
771
    @listen_to('.*.interface.is.nni')
772
    def on_add_links(self, event):
        """Receive an 'interface.is.nni' event.

        The event is forwarded as is to :meth:`add_links`.
        """
        self.add_links(event)
775
776
    def add_links(self, event):
        """Register the link between the two NNI interfaces of the event.

        While holding ``_links_lock`` the link for
        ``event.content['interface_a']`` / ``['interface_b']`` is fetched or
        created, attached to both interfaces, given them as endpoints, and
        both interfaces are flagged as NNI. A ``KytosLinkCreationError`` is
        logged and aborts the method. Only a newly created link triggers the
        metadata reload and the 'link up' / topology notifications.
        """
        content = event.content
        side_a = content['interface_a']
        side_b = content['interface_b']

        try:
            with self._links_lock:
                link, created = self._get_link_or_create(side_a, side_b)
                for side in (side_a, side_b):
                    side.update_link(link)

                link.endpoint_a, link.endpoint_b = side_a, side_b

                for side in (side_a, side_b):
                    side.nni = True

        except KytosLinkCreationError as err:
            log.error(f'Error creating link: {err}.')
            return

        if not created:
            return

        self.update_instance_metadata(link)
        link.update_metadata('last_status_is_active', True)
        self.notify_link_status_change(link, reason='link up')
        self.notify_topology_update()
803
804
    # def add_host(self, event):
805
    #    """Update the topology with a new Host."""
806
807
    #    interface = event.content['port']
808
    #    mac = event.content['reachable_mac']
809
810
    #    host = Host(mac)
811
    #    link = self.topology.get_link(interface.id)
812
    #    if link is not None:
813
    #        return
814
815
    #    self.topology.add_link(interface.id, host.id)
816
    #    self.topology.add_device(host)
817
818
    #    if settings.DISPLAY_FULL_DUPLEX_LINKS:
819
    #        self.topology.add_link(host.id, interface.id)
820
821
    @listen_to('.*.network_status.updated')
822
    def on_network_status_updated(self, event):
        """Log a *.network_status.updated event and trigger its handling.

        These events come specially from of_lldp. The attribute, state and
        interface ids carried by ``event.content`` are logged before
        :meth:`handle_network_status_updated` is called.
        """
        details = event.content
        attribute = details['attribute']
        state = details['state']
        interface_ids = details['interface_ids']
        log.info(f"Storing the administrative state of the"
                 f" {attribute} attribute to"
                 f" {state} in the interfaces"
                 f" {interface_ids}")
        self.handle_network_status_updated()
830
831
    def handle_network_status_updated(self) -> None:
        """Persist the network status after a network_status.updated event.

        Delegates to :meth:`save_status_on_storehouse`.
        """
        self.save_status_on_storehouse()
834
835
    def save_status_on_storehouse(self):
        """Store the network administrative status through storehouse.

        While holding ``_lock`` the switches dict is tagged with the id
        'network_status', merged with the links dict (with tags) and handed
        to ``self.storehouse.save_status``.
        """
        with self._lock:
            snapshot = self._get_switches_dict()
            snapshot['id'] = 'network_status'
            links = self._get_links_dict_with_tags()
            snapshot.update(links)
            self.storehouse.save_status(snapshot)
842
843
    def notify_switch_enabled(self, dpid):
        """Queue a 'kytos/topology.switch.enabled' event for ``dpid``."""
        notification = KytosEvent(name='kytos/topology.switch.enabled',
                                  content={'dpid': dpid})
        self.controller.buffers.app.put(notification)
848
849
    def notify_switch_disabled(self, dpid):
        """Queue a 'kytos/topology.switch.disabled' event for ``dpid``."""
        notification = KytosEvent(name='kytos/topology.switch.disabled',
                                  content={'dpid': dpid})
        self.controller.buffers.app.put(notification)
854
855
    def notify_topology_update(self):
        """Queue a 'kytos/topology.updated' event with the current topology.

        The topology put in the event content is the value returned by
        ``self._get_topology()``.
        """
        content = {'topology': self._get_topology()}
        notification = KytosEvent(name='kytos/topology.updated',
                                  content=content)
        self.controller.buffers.app.put(notification)
861
862
    def notify_link_status_change(self, link, reason='not given'):
        """Queue an event describing the current status of ``link``.

        The event is named 'kytos/topology.link_up' when the link is both
        active and enabled, 'kytos/topology.link_down' otherwise, and its
        content carries the link and the given ``reason``.
        """
        is_up = link.is_active() and link.is_enabled()
        suffix = 'link_up' if is_up else 'link_down'
        content = {'link': link, 'reason': reason}
        notification = KytosEvent(name='kytos/topology.' + suffix,
                                  content=content)
        self.controller.buffers.app.put(notification)
876
877
    def notify_metadata_changes(self, obj, action):
        """Persist and announce a metadata change of a topology object.

        The metadata is first sent to the store, then an event named
        ``kytos/topology.<entities>.metadata.<action>`` carrying the object
        and its metadata is queued.

        Raises:
            ValueError: if ``obj`` is not a Switch, an Interface or a Link.
        """
        supported = (
            (Switch, 'switch', 'switches'),
            (Interface, 'interface', 'interfaces'),
            (Link, 'link', 'links'),
        )
        # The first matching type wins, in the order listed above.
        for kind, entity, entities in supported:
            if isinstance(obj, kind):
                break
        else:
            raise ValueError(
                'Invalid object, supported: Switch, Interface, Link'
            )

        self.save_metadata_on_store(obj, entities)

        name = f'kytos/topology.{entities}.metadata.{action}'
        content = {entity: obj, 'metadata': obj.metadata}
        self.controller.buffers.app.put(KytosEvent(name=name,
                                                   content=content))
        log.debug(f'Metadata from {obj.id} was {action}.')
900
901
    @listen_to('.*.switch.port.created')
902
    def on_notify_port_created(self, event):
        """Receive a 'switch.port.created' event.

        The event is forwarded as is to :meth:`notify_port_created`.
        """
        self.notify_port_created(event)
905
906
    def notify_port_created(self, event):
        """Re-publish a port creation as 'kytos/topology.port.created'.

        The new event reuses the content of the received ``event``.
        """
        notification = KytosEvent(name='kytos/topology.port.created',
                                  content=event.content)
        self.controller.buffers.app.put(notification)
911
912
    def save_metadata_on_store(self, obj, entities):
        """Send the updated metadata of ``obj`` to storehouse.

        The cached box for ``entities`` receives ``obj.metadata`` under
        ``obj.id`` and a 'kytos.storehouse.update' event carrying the whole
        box data is queued, with :meth:`update_instance` as callback.

        Args:
            obj: object exposing ``id`` and ``metadata``.
            entities (str): key of the box in ``self.store_items``; it is
                also used to build the storehouse namespace.

        When no box is cached for ``entities`` nothing is queued and an
        error is logged.
        """
        store = self.store_items.get(entities)
        if store is None:
            # store_items is only filled by load_from_store on success, so
            # the box may be missing; without this guard the access to
            # store.data below raises AttributeError.
            log.error(f'Storehouse box for {entities} is not loaded, '
                      f'metadata from {obj.id} was not saved.')
            return

        store.data[obj.id] = obj.metadata
        content = {'namespace': f'kytos.topology.{entities}.metadata',
                   'box_id': store.box_id,
                   'data': store.data,
                   'callback': self.update_instance}

        event = KytosEvent(name='kytos.storehouse.update', content=content)
        self.controller.buffers.app.put(event)
926
927
    @staticmethod
928
    def update_instance(event, _data, error):
        """Log whether the storehouse update succeeded.

        Args:
            event: event whose ``content['namespace']`` identifies the
                updated entities ('kytos.topology.<entities>.metadata').
            _data: unused.
            error: truthy when the update failed.
        """
        # The entities name is the second-to-last namespace component. A
        # missing or dotless namespace has no such component, so use a
        # placeholder instead of raising IndexError in a logging callback.
        parts = event.content.get('namespace', '').split('.')
        entities = parts[-2] if len(parts) > 1 else 'unknown'
        if error:
            log.error(f'Error trying to update storehouse {entities}.')
        else:
            log.debug(f'Storehouse update to entities: {entities}.')
935
936
    def verify_storehouse(self, entities):
        """Ask storehouse for the boxes saved for the given entities.

        A 'kytos.storehouse.list' event is queued for the namespace
        ``kytos.topology.<entities>.metadata`` with
        :meth:`request_retrieve_entities` as callback.
        """
        request = KytosEvent(
            name='kytos.storehouse.list',
            content={'namespace': f'kytos.topology.{entities}.metadata',
                     'callback': self.request_retrieve_entities})
        self.controller.buffers.app.put(request)
        log.info(f'verify data in storehouse for {entities}.')
944
945
    def request_retrieve_entities(self, event, data, _error):
        """Request the creation of a box or the retrieval of an existing one.

        With an empty ``data`` a 'kytos.storehouse.create' event is queued;
        otherwise a 'kytos.storehouse.retrieve' event is queued for the
        first box id in ``data``. :meth:`load_from_store` is the callback in
        both cases.
        """
        payload = {'namespace': event.content.get('namespace'),
                   'callback': self.load_from_store,
                   'data': {}}

        if data:
            name = 'kytos.storehouse.retrieve'
            payload['box_id'] = data[0]
            msg = 'Retrieve data from storehouse.'
        else:
            name = 'kytos.storehouse.create'
            msg = 'Create new box in storehouse'

        self.controller.buffers.app.put(KytosEvent(name=name,
                                                   content=payload))
        log.debug(msg)
963
964
    def load_from_store(self, event, box, error):
        """Save the box retrieved from storehouse.

        Args:
            event: event whose ``content['namespace']`` identifies the
                entities ('kytos.topology.<entities>.metadata').
            box: box to be cached in ``self.store_items``.
            error: truthy when storehouse could not provide the box.
        """
        # Check the failure first so it is always reported, even when the
        # event carries no usable namespace.
        if error:
            log.error('Error while get a box from storehouse.')
            return

        # The entities name is the second-to-last namespace component; a
        # missing or dotless namespace has none and used to raise
        # IndexError here.
        parts = event.content.get('namespace', '').split('.')
        if len(parts) < 2:
            log.error('Invalid namespace in storehouse event, '
                      'the box was not loaded.')
            return

        self.store_items[parts[-2]] = box
        log.debug('Data updated')
972
973
    def update_instance_metadata(self, obj):
        """Extend ``obj`` with the metadata saved for it, if any.

        The box is looked up in ``self.store_items`` under 'interfaces',
        'switches' or 'links' according to the type of ``obj``. Nothing
        happens for other types, for a missing box or for empty metadata.
        """
        boxes = (
            (Interface, 'interfaces'),
            (Switch, 'switches'),
            (Link, 'links'),
        )
        saved = None
        # The first matching type wins, in the order listed above.
        for kind, key in boxes:
            if isinstance(obj, kind):
                box = self.store_items.get(key, None)
                if box:
                    saved = box.data.get(obj.id)
                break
        if saved:
            obj.extend_metadata(saved)
            log.debug(f'Metadata to {obj.id} was updated')
991
992
    @listen_to('kytos/maintenance.start_link')
993
    def on_link_maintenance_start(self, event):
        """Receive a link maintenance-start event.

        :meth:`handle_link_maintenance_start` is run while holding
        ``_links_lock``.
        """
        with self._links_lock:
            self.handle_link_maintenance_start(event)
997
998
    def handle_link_maintenance_start(self, event):
        """Take the links listed in the event out of service.

        Links in ``event.content['links']`` whose id is unknown to
        ``self.links`` are ignored. Each known link is disabled and
        deactivated, both of its endpoints are deactivated and then
        disabled, and a 'maintenance' status notification is sent.
        """
        known = self.links
        affected = [known[item.id]
                    for item in event.content['links']
                    if item.id in known]
        for link in affected:
            link.disable()
            link.deactivate()
            endpoints = (link.endpoint_a, link.endpoint_b)
            for endpoint in endpoints:
                endpoint.deactivate()
            for endpoint in endpoints:
                endpoint.disable()
            self.notify_link_status_change(link, reason='maintenance')
1016
1017
    @listen_to('kytos/maintenance.end_link')
1018
    def on_link_maintenance_end(self, event):
        """Receive a link maintenance-end event.

        :meth:`handle_link_maintenance_end` is run while holding
        ``_links_lock``.
        """
        with self._links_lock:
            self.handle_link_maintenance_end(event)
1022
1023
    def handle_link_maintenance_end(self, event):
        """Put the links listed in the event back in service.

        Links in ``event.content['links']`` whose id is unknown to
        ``self.links`` are ignored. Each known link is enabled and
        activated, both of its endpoints are activated and then enabled,
        and a 'maintenance' status notification is sent.
        """
        known = self.links
        restored = [known[item.id]
                    for item in event.content['links']
                    if item.id in known]
        for link in restored:
            link.enable()
            link.activate()
            endpoints = (link.endpoint_a, link.endpoint_b)
            for endpoint in endpoints:
                endpoint.activate()
            for endpoint in endpoints:
                endpoint.enable()
            self.notify_link_status_change(link, reason='maintenance')
1041