002_recover_vlans.get_range()   B
last analyzed

Complexity

Conditions 8

Size

Total Lines 27
Code Lines 22

Duplication

Lines 27
Ratio 100 %

Importance

Changes 0
Metric Value
cc 8
eloc 22
nop 2
dl 27
loc 27
rs 7.3333
c 0
b 0
f 0
1
from collections import defaultdict
2
from kytos.core.tag_ranges import range_difference
3
4
# Change to False so this script makes changes
5
DRY_RUN = True
6
# Modify with VLAN used in of_lldp
7
OF_LLDP_VLAN = 3799
8
# Change to False to not print the missing VLANs
9
PRINT_MISSING = True
10
11
def get_cookie(dpid: str) -> int:
12
    """Return the cookie integer given a dpid."""
13
    COOKIE_PREFIX = 0xab
14
    return (0x0000FFFFFFFFFFFF & int(dpid.replace(":", ""), 16) | (COOKIE_PREFIX << 56))
15
16 View Code Duplication
def get_range(vlans, avoid) -> list[list[int]]:
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
17
    """Convert available_vlans to available_tags.
18
    From list[int] to list[list[int]]"""
19
    result = []
20
    if not vlans:
21
        return result
22
    vlans.sort()
23
    i = 0
24
    while i < len(vlans):
25
        if vlans[i] in avoid:
26
            i += 1
27
        else:
28
            break
29
    if not vlans[i:]:
30
        return result
31
32
    start = end = vlans[i]
33
    for tag in vlans[i+1:]:
34
        if tag in avoid:
35
            continue
36
        if tag == end + 1:
37
            end = tag
38
        else:
39
            result.append([start, end])
40
            start = end = tag
41
    result.append([start, end])
42
    return result
43
44
mef_eline = controller.napps[('kytos', 'mef_eline')]
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable controller does not seem to be defined.
Loading history...
45
evcs = {evc_id: evc.as_dict() for evc_id, evc in mef_eline.circuits.items() if not evc.archived}
46
47
in_use_tags = defaultdict(set)
48
for evc_id, evc in evcs.items():
49
    for link in evc["current_path"]:
50
        svlan = link["metadata"]["s_vlan"]["value"]
51
        intfa = link["endpoint_a"]["id"]
52
        intfb = link["endpoint_b"]["id"]
53
        in_use_tags[intfa].add(svlan)
54
        in_use_tags[intfb].add(svlan)
55
    for link in evc["failover_path"]:
56
        svlan = link["metadata"]["s_vlan"]["value"]
57
        intfa = link["endpoint_a"]["id"]
58
        intfb = link["endpoint_b"]["id"]
59
        in_use_tags[intfa].add(svlan)
60
        in_use_tags[intfb].add(svlan)
61
    for uni in ("uni_a", "uni_z"):
62
        intf_id = evc[uni]["interface_id"]
63 View Code Duplication
        if (
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
64
            "tag" in evc[uni]
65
            and evc[uni]["tag"]
66
            and "tag_type" in evc[uni]["tag"]
67
            and evc[uni]["tag"]["tag_type"] in ("vlan", 1)
68
        ):
69
            tag = evc[uni]["tag"]["value"]
70
            if isinstance(tag, int):
71
                in_use_tags[intf_id].add(tag)
72
            elif isinstance(tag, list):
73
                for tag_item in tag:
74
                    if isinstance(tag_item, int):
75
                        in_use_tags[intf_id].add(tag_item)
76
                    elif isinstance(tag_item, list) and len(tag_item) == 1:
77
                        in_use_tags[intf_id].add(tag_item[0])
78
                    elif isinstance(tag_item, list) and len(tag_item) == 2:
79
                        for val in range(tag_item[0], tag_item[1]+1):
80
                            in_use_tags[intf_id].add(val)
81
82
switch_rm_flows = {}
83
flow_manager = controller.napps[('kytos', 'flow_manager')]
84
for dpid in list(controller.switches.keys()):
85
    switch = controller.get_switch_by_dpid(dpid)
86
    of_lldp_cookie = get_cookie(switch.id)
87
    switch_lldp_flow = flow_manager.flow_controller.get_flows_by_cookie_ranges(
88
        [dpid], [(of_lldp_cookie, of_lldp_cookie)]
89
    )
90
    of_lldp_flow_flag = bool(switch_lldp_flow[dpid])
91
92
    for interface in switch.interfaces.copy().values():
93
        intf_id = interface.id
94
        vlans = in_use_tags.get(intf_id, set())
95
        if OF_LLDP_VLAN and switch.is_enabled() and of_lldp_flow_flag:
96
            vlans.add(OF_LLDP_VLAN)
97
        vlans = get_range(sorted(list(vlans)), set())
98
        intf = controller.get_interface_by_id(intf_id)
99
        tag_range = intf.tag_ranges["vlan"]
100
        available_tags = range_difference(tag_range, vlans)
101
        if intf.available_tags["vlan"] != available_tags:
102
            print(f"Inconsistent available tags in interface {intf_id}:\n"
103
                  f"WRONG -> {intf.available_tags['vlan']}\n"
104
                  f"CORRECT -> {available_tags}")
105
            if PRINT_MISSING:
106
                print(f"AVAILABLE MISSING -> {range_difference(available_tags, intf.available_tags['vlan'])}")
107
            if not DRY_RUN:
108
                intf.make_tags_available(controller, available_tags, 'vlan')
109
            print("\n")
110