Passed
Push — master ( 575e05...d500d0 )
by Aldo
02:40 queued 15s
created

002_recover_vlans   A

Complexity

Total Complexity 8

Size/Duplication

Total Lines 99
Duplicated Lines 45.45 %

Importance

Changes 0
Metric Value
wmc 8
eloc 83
dl 45
loc 99
rs 10
c 0
b 0
f 0

1 Function

Rating   Name   Duplication   Size   Complexity  
B get_range() 27 27 8

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.

Common duplication problems and their corresponding solutions are:

1
from collections import defaultdict
2
from kytos.core.tag_ranges import range_difference
3
4
# Safety switch: while True the script only prints the differences it finds.
# Set it to False to let the script call make_tags_available() on every
# interface whose available VLAN tags are wrong.
DRY_RUN = True
# VLAN used by of_lldp. It is counted as in use on every interface.
# A falsy value (0 or None) skips this step.
OF_LLDP_VLAN = 3799
# When True, an extra "MISSING" line is printed for each mismatching
# interface. Set it to False to omit that line.
PRINT_MISSING = True
10
11 View Code Duplication
def get_range(vlans, avoid) -> list[list[int]]:
    """Collapse individual VLAN ids into inclusive ``[start, end]`` ranges.

    From list[int] to list[list[int]], e.g. ``[1, 2, 3, 7]`` becomes
    ``[[1, 3], [7, 7]]``.

    Args:
        vlans: VLAN ids, in any order. The argument is not modified.
        avoid: Container of ids that must be left out of the result.

    Returns:
        Ranges in ascending order; an empty list when ``vlans`` is empty
        or every id is in ``avoid``.
    """
    result = []
    if not vlans:
        return result

    # Work on a sorted, de-duplicated copy: the caller's list stays intact
    # and repeated ids cannot produce overlapping ranges.
    tags = sorted({tag for tag in vlans if tag not in avoid})
    if not tags:
        return result

    start = end = tags[0]
    for tag in tags[1:]:
        if tag == end + 1:
            # Still consecutive: extend the range being built.
            end = tag
        else:
            # Gap found: close the current range and open a new one.
            result.append([start, end])
            start = end = tag
    result.append([start, end])
    return result
38
39
# NOTE(review): `controller` is not defined in this file; it is presumably
# provided by the environment that runs the script (e.g. the Kytos
# console) - confirm before running it elsewhere.
mef_eline = controller.napps[('kytos', 'mef_eline')]

# Dict form of every circuit that has not been archived, keyed by EVC id.
evcs = {
    evc_id: evc.as_dict()
    for evc_id, evc in mef_eline.circuits.items()
    if not evc.archived
}

# Every interface of every switch, keyed by the interface's own id.
interfaces = {}
for switch in controller.switches.values():
    for interface in switch.interfaces.values():
        interfaces[interface.id] = interface
45
46
47
# Map each interface id to the set of VLAN ids used on it by the EVCs
# collected above.
in_use_tags = defaultdict(set)
for evc_id, evc in evcs.items():
    # Path links: the s_vlan stored in each link's metadata is used on both
    # endpoints of that link, for the current and the failover path alike.
    for path_name in ("current_path", "failover_path"):
        for link in evc[path_name]:
            svlan = link["metadata"]["s_vlan"]["value"]
            in_use_tags[link["endpoint_a"]["id"]].add(svlan)
            in_use_tags[link["endpoint_b"]["id"]].add(svlan)

    # UNIs: only tags of type "vlan" (or its numeric form, 1) are counted.
    for uni in ("uni_a", "uni_z"):
        intf_id = evc[uni]["interface_id"]
        uni_tag = evc[uni].get("tag")
        if not uni_tag or uni_tag.get("tag_type") not in ("vlan", 1):
            continue
        tag = uni_tag["value"]
        if isinstance(tag, int):
            # Store the bare VLAN id. A (tag, evc_id) tuple here would make
            # the later sorted() call fail when mixed with integer ids.
            in_use_tags[intf_id].add(tag)
        elif isinstance(tag, list):
            # A list value may hold single ids, [id] and [first, last].
            for tag_item in tag:
                if isinstance(tag_item, int):
                    in_use_tags[intf_id].add(tag_item)
                elif isinstance(tag_item, list) and len(tag_item) == 1:
                    in_use_tags[intf_id].add(tag_item[0])
                elif isinstance(tag_item, list) and len(tag_item) == 2:
                    # [first, last] is inclusive on both ends.
                    in_use_tags[intf_id].update(
                        range(tag_item[0], tag_item[1] + 1)
                    )
81
82
# For every known interface, rebuild the VLAN ranges that should be
# available (its full tag range minus the VLANs in use) and report any
# difference from what the interface currently holds. The interface is
# only changed when DRY_RUN is False.
for intf_id in interfaces:
    used = in_use_tags.get(intf_id, set())
    if OF_LLDP_VLAN:
        used.add(OF_LLDP_VLAN)
    used_ranges = get_range(sorted(used), set())

    intf = controller.get_interface_by_id(intf_id)
    expected = range_difference(intf.tag_ranges["vlan"], used_ranges)
    if intf.available_tags["vlan"] == expected:
        # Nothing to report for this interface.
        continue

    print(f"Missing available tags in interface {intf_id}:\n"
          f"WRONG -> {intf.available_tags['vlan']}\n"
          f"CORRECT -> {expected}")
    if PRINT_MISSING:
        missing = range_difference(expected, intf.available_tags['vlan'])
        print(f"MISSING -> {missing}")
    print("\n")
    if not DRY_RUN:
        intf.make_tags_available(controller, expected, 'vlan')
99