Passed
Push — master ( 5fbef4...646846 )
by Humberto
01:33 queued 11s
created

build.main.Main.handle_interface_link_up()   B

Complexity

Conditions 7

Size

Total Lines 32
Code Lines 23

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 7

Importance

Changes 0
Metric Value
cc 7
eloc 23
nop 2
dl 0
loc 32
ccs 9
cts 9
cp 1
crap 7
rs 7.9279
c 0
b 0
f 0
1
"""Main module of kytos/topology Kytos Network Application.
2
3
Manage the network topology
4
"""
5 1
import time
6
7 1
from flask import jsonify, request
8 1
9 1
from kytos.core import KytosEvent, KytosNApp, log, rest
10 1
from kytos.core.helpers import listen_to
11 1
from kytos.core.interface import Interface
12
from kytos.core.link import Link
13 1
from kytos.core.switch import Switch
14
from napps.kytos.topology import settings
15
from napps.kytos.topology.models import Topology
16 1
17
DEFAULT_LINK_UP_TIMER = 10
18
19
20
class Main(KytosNApp):  # pylint: disable=too-many-public-methods
21
    """Main class of kytos/topology NApp.
22 1
23
    This class is the entry point for this napp.
24 1
    """
25 1
26
    def setup(self):
27 1
        """Initialize the NApp's links list."""
28 1
        self.links = {}
29 1
        self.store_items = {}
30
        self.link_up_timer = getattr(settings, 'LINK_UP_TIMER',
31 1
                                     DEFAULT_LINK_UP_TIMER)
32
33
        self.verify_storehouse('switches')
34 1
        self.verify_storehouse('interfaces')
35
        self.verify_storehouse('links')
36
37
    def execute(self):
38 1
        """Do nothing."""
39
40
    def shutdown(self):
41
        """Do nothing."""
42
        log.info('NApp kytos/topology shutting down.')
43
44
    def _get_link_or_create(self, endpoint_a, endpoint_b):
45
        new_link = Link(endpoint_a, endpoint_b)
46
47
        for link in self.links.values():
48 1
            if new_link == link:
49
                return link
50
51
        self.links[new_link.id] = new_link
52
        return new_link
53 1
54
    def _get_switches_dict(self):
55
        """Return a dictionary with the known switches."""
56
        return {'switches': {s.id: s.as_dict() for s in
57
                             self.controller.switches.values()}}
58 1
59
    def _get_links_dict(self):
60
        """Return a dictionary with the known links."""
61
        return {'links': {l.id: l.as_dict() for l in
62
                          self.links.values()}}
63 1
64
    def _get_topology_dict(self):
65 1
        """Return a dictionary with the known topology."""
66
        return {'topology': {**self._get_switches_dict(),
67 1
                             **self._get_links_dict()}}
68
69
    def _get_topology(self):
70
        """Return an object representing the topology."""
71
        return Topology(self.controller.switches, self.links)
72
73
    def _get_link_from_interface(self, interface):
74 1
        """Return the link of the interface, or None if it does not exist."""
75
        for link in self.links.values():
76
            if interface in (link.endpoint_a, link.endpoint_b):
77
                return link
78
        return None
79
80
    @rest('v3/')
81
    def get_topology(self):
82
        """Return the latest known topology.
83 1
84
        This topology is updated when there are network events.
85
        """
86
        return jsonify(self._get_topology_dict())
87
88 1
    # Switch related methods
89
    @rest('v3/switches')
90
    def get_switches(self):
91
        """Return a json with all the switches in the topology."""
92
        return jsonify(self._get_switches_dict())
93
94
    @rest('v3/switches/<dpid>/enable', methods=['POST'])
95
    def enable_switch(self, dpid):
96
        """Administratively enable a switch in the topology."""
97 1
        try:
98
            self.controller.switches[dpid].enable()
99
            return jsonify("Operation successful"), 201
100
        except KeyError:
101
            return jsonify("Switch not found"), 404
102
103
    @rest('v3/switches/<dpid>/disable', methods=['POST'])
104
    def disable_switch(self, dpid):
105
        """Administratively disable a switch in the topology."""
106 1
        try:
107
            self.controller.switches[dpid].disable()
108
            return jsonify("Operation successful"), 201
109
        except KeyError:
110
            return jsonify("Switch not found"), 404
111
112
    @rest('v3/switches/<dpid>/metadata')
113
    def get_switch_metadata(self, dpid):
114
        """Get metadata from a switch."""
115 1
        try:
116
            return jsonify({"metadata":
117
                            self.controller.switches[dpid].metadata}), 200
118
        except KeyError:
119
            return jsonify("Switch not found"), 404
120
121
    @rest('v3/switches/<dpid>/metadata', methods=['POST'])
122
    def add_switch_metadata(self, dpid):
123
        """Add metadata to a switch."""
124
        metadata = request.get_json()
125
        try:
126
            switch = self.controller.switches[dpid]
127
        except KeyError:
128 1
            return jsonify("Switch not found"), 404
129
130
        switch.extend_metadata(metadata)
131
        self.notify_metadata_changes(switch, 'added')
132
        return jsonify("Operation successful"), 201
133
134
    @rest('v3/switches/<dpid>/metadata/<key>', methods=['DELETE'])
135
    def delete_switch_metadata(self, dpid, key):
136
        """Delete metadata from a switch."""
137
        try:
138
            switch = self.controller.switches[dpid]
139
        except KeyError:
140
            return jsonify("Switch not found"), 404
141 1
142
        switch.remove_metadata(key)
143
        self.notify_metadata_changes(switch, 'removed')
144
        return jsonify("Operation successful"), 200
145
146
    # Interface related methods
147
    @rest('v3/interfaces')
148
    def get_interfaces(self):
149
        """Return a json with all the interfaces in the topology."""
150
        interfaces = {}
151
        switches = self._get_switches_dict()
152 1
        for switch in switches['switches'].values():
153
            for interface_id, interface in switch['interfaces'].items():
154
                interfaces[interface_id] = interface
155
156
        return jsonify({'interfaces': interfaces})
157
158 View Code Duplication
    @rest('v3/interfaces/switch/<dpid>/enable', methods=['POST'])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
159
    @rest('v3/interfaces/<interface_enable_id>/enable', methods=['POST'])
160
    def enable_interface(self, interface_enable_id=None, dpid=None):
161
        """Administratively enable interfaces in the topology."""
162
        error_list = []  # List of interfaces that were not activated.
163
        msg_error = "Some interfaces couldn't be found and activated: "
164
        if dpid is None:
165
            dpid = ":".join(interface_enable_id.split(":")[:-1])
166
        try:
167
            switch = self.controller.switches[dpid]
168
        except KeyError as exc:
169
            return jsonify(f"Switch not found: {exc}"), 404
170 1
171
        if interface_enable_id:
172
            interface_number = int(interface_enable_id.split(":")[-1])
173
174
            try:
175
                switch.interfaces[interface_number].enable()
176
            except KeyError as exc:
177
                error_list.append(f"Switch {dpid} Interface {exc}")
178
        else:
179
            for interface in switch.interfaces.values():
180
                interface.enable()
181
        if not error_list:
182
            return jsonify("Operation successful"), 200
183
        return jsonify({msg_error:
184
                        error_list}), 409
185
186 View Code Duplication
    @rest('v3/interfaces/switch/<dpid>/disable', methods=['POST'])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
187
    @rest('v3/interfaces/<interface_disable_id>/disable', methods=['POST'])
188 1
    def disable_interface(self, interface_disable_id=None, dpid=None):
189
        """Administratively disable interfaces in the topology."""
190
        error_list = []  # List of interfaces that were not deactivated.
191
        msg_error = "Some interfaces couldn't be found and deactivated: "
192
        if dpid is None:
193
            dpid = ":".join(interface_disable_id.split(":")[:-1])
194
        try:
195
            switch = self.controller.switches[dpid]
196
        except KeyError as exc:
197
            return jsonify(f"Switch not found: {exc}"), 404
198
199
        if interface_disable_id:
200
            interface_number = int(interface_disable_id.split(":")[-1])
201
202
            try:
203
                switch.interfaces[interface_number].disable()
204
            except KeyError as exc:
205 1
                error_list.append(f"Switch {dpid} Interface {exc}")
206
        else:
207
            for interface in switch.interfaces.values():
208
                interface.disable()
209
        if not error_list:
210
            return jsonify("Operation successful"), 200
211
        return jsonify({msg_error:
212
                        error_list}), 409
213
214
    @rest('v3/interfaces/<interface_id>/metadata')
215
    def get_interface_metadata(self, interface_id):
216
        """Get metadata from an interface."""
217
        switch_id = ":".join(interface_id.split(":")[:-1])
218
        interface_number = int(interface_id.split(":")[-1])
219
        try:
220
            switch = self.controller.switches[switch_id]
221
        except KeyError:
222
            return jsonify("Switch not found"), 404
223
224
        try:
225
            interface = switch.interfaces[interface_number]
226 1
        except KeyError:
227
            return jsonify("Interface not found"), 404
228
229
        return jsonify({"metadata": interface.metadata}), 200
230
231
    @rest('v3/interfaces/<interface_id>/metadata', methods=['POST'])
232
    def add_interface_metadata(self, interface_id):
233
        """Add metadata to an interface."""
234
        metadata = request.get_json()
235
236
        switch_id = ":".join(interface_id.split(":")[:-1])
237
        interface_number = int(interface_id.split(":")[-1])
238
        try:
239
            switch = self.controller.switches[switch_id]
240
        except KeyError:
241
            return jsonify("Switch not found"), 404
242
243
        try:
244
            interface = switch.interfaces[interface_number]
245
        except KeyError:
246
            return jsonify("Interface not found"), 404
247
248
        interface.extend_metadata(metadata)
249 1
        self.notify_metadata_changes(interface, 'added')
250
        return jsonify("Operation successful"), 201
251
252
    @rest('v3/interfaces/<interface_id>/metadata/<key>', methods=['DELETE'])
253
    def delete_interface_metadata(self, interface_id, key):
254
        """Delete metadata from an interface."""
255
        switch_id = ":".join(interface_id.split(":")[:-1])
256
        interface_number = int(interface_id.split(":")[-1])
257 1
258
        try:
259
            switch = self.controller.switches[switch_id]
260
        except KeyError:
261
            return jsonify("Switch not found"), 404
262
263
        try:
264
            interface = switch.interfaces[interface_number]
265
        except KeyError:
266
            return jsonify("Interface not found"), 404
267 1
268
        if interface.remove_metadata(key) is False:
269
            return jsonify("Metadata not found"), 404
270
271
        self.notify_metadata_changes(interface, 'removed')
272
        return jsonify("Operation successful"), 200
273
274
    # Link related methods
275
    @rest('v3/links')
276
    def get_links(self):
277 1
        """Return a json with all the links in the topology.
278
279
        Links are connections between interfaces.
280
        """
281
        return jsonify(self._get_links_dict()), 200
282
283
    @rest('v3/links/<link_id>/enable', methods=['POST'])
284
    def enable_link(self, link_id):
285 1
        """Administratively enable a link in the topology."""
286
        try:
287
            self.links[link_id].enable()
288
        except KeyError:
289
            return jsonify("Link not found"), 404
290
291
        return jsonify("Operation successful"), 201
292
293
    @rest('v3/links/<link_id>/disable', methods=['POST'])
294
    def disable_link(self, link_id):
295
        """Administratively disable a link in the topology."""
296
        try:
297
            self.links[link_id].disable()
298 1
        except KeyError:
299
            return jsonify("Link not found"), 404
300
301
        return jsonify("Operation successful"), 201
302
303
    @rest('v3/links/<link_id>/metadata')
304
    def get_link_metadata(self, link_id):
305
        """Get metadata from a link."""
306
        try:
307
            return jsonify({"metadata": self.links[link_id].metadata}), 200
308
        except KeyError:
309
            return jsonify("Link not found"), 404
310
311
    @rest('v3/links/<link_id>/metadata', methods=['POST'])
312 1
    def add_link_metadata(self, link_id):
313
        """Add metadata to a link."""
314
        metadata = request.get_json()
315
        try:
316
            link = self.links[link_id]
317
        except KeyError:
318
            return jsonify("Link not found"), 404
319 1
320 1
        link.extend_metadata(metadata)
321 1
        self.notify_metadata_changes(link, 'added')
322 1
        return jsonify("Operation successful"), 201
323 1
324
    @rest('v3/links/<link_id>/metadata/<key>', methods=['DELETE'])
325 1
    def delete_link_metadata(self, link_id, key):
326
        """Delete metadata from a link."""
327
        try:
328
            link = self.links[link_id]
329
        except KeyError:
330
            return jsonify("Link not found"), 404
331
332 1
        if link.remove_metadata(key) is False:
333 1
            return jsonify("Metadata not found"), 404
334 1
335 1
        self.notify_metadata_changes(link, 'removed')
336 1
        return jsonify("Operation successful"), 200
337
338 1
    @listen_to('.*.switch.(new|reconnected)')
339
    def handle_new_switch(self, event):
340
        """Create a new Device on the Topology.
341
342
        Handle the event of a new created switch and update the topology with
343 1
        this new device.
344 1
        """
345 1
        switch = event.content['switch']
346 1
        switch.activate()
347
        log.debug('Switch %s added to the Topology.', switch.id)
348 1
        self.notify_topology_update()
349
        self.update_instance_metadata(switch)
350
351 1
    @listen_to('.*.connection.lost')
352
    def handle_connection_lost(self, event):
353 1
        """Remove a Device from the topology.
354
355
        Remove the disconnected Device and every link that has one of its
356
        interfaces.
357
        """
358 1
        switch = event.content['source'].switch
359 1
        if switch:
360 1
            switch.deactivate()
361 1
            log.debug('Switch %s removed from the Topology.', switch.id)
362
            self.notify_topology_update()
363 1
364
    def handle_interface_up(self, event):
365
        """Update the topology based on a Port Modify event.
366 1
367
        The event notifies that an interface was changed to 'up'.
368 1
        """
369
        interface = event.content['interface']
370
        interface.activate()
371
        self.notify_topology_update()
372
        self.update_instance_metadata(interface)
373
374 1
    @listen_to('.*.switch.interface.created')
375 1
    def handle_interface_created(self, event):
376 1
        """Update the topology based on a Port Create event."""
377 1
        self.handle_interface_up(event)
378 1
379 1
    def handle_interface_down(self, event):
380 1
        """Update the topology based on a Port Modify event.
381
382 1
        The event notifies that an interface was changed to 'down'.
383
        """
384
        interface = event.content['interface']
385
        interface.deactivate()
386
        self.handle_interface_link_down(event)
387
        self.notify_topology_update()
388 1
389 1
    @listen_to('.*.switch.interface.deleted')
390 1
    def handle_interface_deleted(self, event):
391 1
        """Update the topology based on a Port Delete event."""
392 1
        self.handle_interface_down(event)
393 1
394
    @listen_to('.*.switch.interface.link_up')
395 1
    def handle_interface_link_up(self, event):
396
        """Update the topology based on a Port Modify event.
397
398 1
        The event notifies that an interface's link was changed to 'up'.
399 1
        """
400
        interface = event.content['interface']
401 1
        link = self._get_link_from_interface(interface)
402 1
        if not link:
403 1
            return
404
        if link.endpoint_a == interface:
405 1
            other_interface = link.endpoint_b
406 1
        else:
407
            other_interface = link.endpoint_a
408 1
        interface.activate()
409
        if other_interface.is_active() is False:
410
            return
411
        if link.is_active() is False:
412
            link.update_metadata('last_status_change', time.time())
413
            link.activate()
414
415
            # As each run of this method uses a different thread,
416
            # there is no risk this sleep will lock the NApp.
417
            time.sleep(self.link_up_timer)
418
419
            last_status_change = link.get_metadata('last_status_change')
420
            now = time.time()
421
            if link.is_active() and \
422
                    now - last_status_change >= self.link_up_timer:
423
                self.notify_topology_update()
424
                self.update_instance_metadata(link)
425
                self.notify_link_status_change(link)
426
427 1
    @listen_to('.*.switch.interface.link_down')
428
    def handle_interface_link_down(self, event):
429 1
        """Update the topology based on a Port Modify event.
430 1
431
        The event notifies that an interface's link was changed to 'down'.
432 1
        """
433
        interface = event.content['interface']
434 1
        link = self._get_link_from_interface(interface)
435
        if link and link.is_active():
436 1
            link.deactivate()
437 1
            link.update_metadata('last_status_change', time.time())
438 1
            self.notify_topology_update()
439
            self.notify_link_status_change(link)
440
441 1
    @listen_to('.*.interface.is.nni')
442 1
    def add_links(self, event):
443
        """Update the topology with links related to the NNI interfaces."""
444 1
        interface_a = event.content['interface_a']
445
        interface_b = event.content['interface_b']
446 1
447 1
        link = self._get_link_or_create(interface_a, interface_b)
448 1
        interface_a.update_link(link)
449
        interface_b.update_link(link)
450
451
        interface_a.nni = True
452
        interface_b.nni = True
453
454
        self.notify_topology_update()
455
456 1
    # def add_host(self, event):
457 1
    #    """Update the topology with a new Host."""
458
459 1
    #    interface = event.content['port']
460 1
    #    mac = event.content['reachable_mac']
461
462 1
    #    host = Host(mac)
463
    #    link = self.topology.get_link(interface.id)
464
    #    if link is not None:
465 1
    #        return
466 1
467 1
    #    self.topology.add_link(interface.id, host.id)
468
    #    self.topology.add_device(host)
469 1
470
    #    if settings.DISPLAY_FULL_DUPLEX_LINKS:
471
    #        self.topology.add_link(host.id, interface.id)
472
473
    def notify_topology_update(self):
474
        """Send an event to notify about updates on the topology."""
475
        name = 'kytos/topology.updated'
476
        event = KytosEvent(name=name, content={'topology':
477
                                               self._get_topology()})
478
        self.controller.buffers.app.put(event)
479
480
    def notify_link_status_change(self, link):
481
        """Send an event to notify about a status change on a link."""
482
        name = 'kytos/topology.'
483
        if link.is_active():
484
            status = 'link_up'
485
        else:
486
            status = 'link_down'
487
        event = KytosEvent(name=name+status, content={'link': link})
488
        self.controller.buffers.app.put(event)
489
490
    def notify_metadata_changes(self, obj, action):
491
        """Send an event to notify about metadata changes."""
492
        if isinstance(obj, Switch):
493
            entity = 'switch'
494
            entities = 'switches'
495 1
        elif isinstance(obj, Interface):
496
            entity = 'interface'
497
            entities = 'interfaces'
498
        elif isinstance(obj, Link):
499
            entity = 'link'
500
            entities = 'links'
501
502
        name = f'kytos/topology.{entities}.metadata.{action}'
503
        event = KytosEvent(name=name, content={entity: obj,
0 ignored issues
show
introduced by
The variable entity does not seem to be defined for all execution paths.
Loading history...
504 1
                                               'metadata': obj.metadata})
505
        self.controller.buffers.app.put(event)
506 1
        log.debug(f'Metadata from {obj.id} was {action}.')
507 1
508
    @listen_to('.*.switch.port.created')
509 1
    def notify_port_created(self, original_event):
510 1
        """Notify when a port is created."""
511 1
        name = 'kytos/topology.port.created'
512
        event = KytosEvent(name=name, content=original_event.content)
513 1
        self.controller.buffers.app.put(event)
514
515
    @listen_to('kytos/topology.*.metadata.*')
516
    def save_metadata_on_store(self, event):
517
        """Send to storehouse the data updated."""
518
        name = 'kytos.storehouse.update'
519
        if 'switch' in event.content:
520
            store = self.store_items.get('switches')
521
            obj = event.content.get('switch')
522
            namespace = 'kytos.topology.switches.metadata'
523
        elif 'interface' in event.content:
524
            store = self.store_items.get('interfaces')
525
            obj = event.content.get('interface')
526
            namespace = 'kytos.topology.iterfaces.metadata'
527
        elif 'link' in event.content:
528
            store = self.store_items.get('links')
529
            obj = event.content.get('link')
530
            namespace = 'kytos.topology.links.metadata'
531
532 1
        store.data[obj.id] = obj.metadata
0 ignored issues
show
introduced by
The variable obj does not seem to be defined for all execution paths.
Loading history...
introduced by
The variable store does not seem to be defined for all execution paths.
Loading history...
533
        content = {'namespace': namespace,
0 ignored issues
show
introduced by
The variable namespace does not seem to be defined for all execution paths.
Loading history...
534
                   'box_id': store.box_id,
535
                   'data': store.data,
536
                   'callback': self.update_instance}
537
538
        event = KytosEvent(name=name, content=content)
539
        self.controller.buffers.app.put(event)
540
541 1
    @staticmethod
542
    def update_instance(event, _data, error):
543
        """Display in Kytos console if the data was updated."""
544
        entities = event.content.get('namespace', '').split('.')[-2]
545
        if error:
546
            log.error(f'Error trying to update storehouse {entities}.')
547
        else:
548
            log.debug(f'Storehouse update to entities: {entities}.')
549
550
    def verify_storehouse(self, entities):
551
        """Request a list of box saved by specific entity."""
552
        name = 'kytos.storehouse.list'
553
        content = {'namespace': f'kytos.topology.{entities}.metadata',
554
                   'callback': self.request_retrieve_entities}
555
        event = KytosEvent(name=name, content=content)
556
        self.controller.buffers.app.put(event)
557
        log.info(f'verify data in storehouse for {entities}.')
558
559
    def request_retrieve_entities(self, event, data, _error):
560
        """Create a box or retrieve an existent box from storehouse."""
561
        msg = ''
562
        content = {'namespace': event.content.get('namespace'),
563
                   'callback': self.load_from_store,
564
                   'data': {}}
565
566
        if not data:
567
            name = 'kytos.storehouse.create'
568
            msg = 'Create new box in storehouse'
569
        else:
570
            name = 'kytos.storehouse.retrieve'
571
            content['box_id'] = data[0]
572
            msg = 'Retrieve data from storeohouse.'
573
574
        event = KytosEvent(name=name, content=content)
575
        self.controller.buffers.app.put(event)
576
        log.debug(msg)
577
578
    def load_from_store(self, event, box, error):
579
        """Save the data retrived from storehouse."""
580
        entities = event.content.get('namespace', '').split('.')[-2]
581
        if error:
582
            log.error('Error while get a box from storehouse.')
583
        else:
584
            self.store_items[entities] = box
585
            log.debug('Data updated')
586
587
    def update_instance_metadata(self, obj):
588
        """Update object instance with saved metadata."""
589
        metadata = None
590
        if isinstance(obj, Interface):
591
            all_metadata = self.store_items.get('interfaces', None)
592
            if all_metadata:
593
                metadata = all_metadata.data.get(obj.id)
594
        elif isinstance(obj, Switch):
595
            all_metadata = self.store_items.get('switches', None)
596
            if all_metadata:
597
                metadata = all_metadata.data.get(obj.id)
598
        elif isinstance(obj, Link):
599
            all_metadata = self.store_items.get('links', None)
600
            if all_metadata:
601
                metadata = all_metadata.data.get(obj.id)
602
603
        if metadata:
604
            obj.extend_metadata(metadata)
605
            log.debug(f'Metadata to {obj.id} was updated')
606