001_retire_vlans.disable_evc()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 13
Code Lines 12

Duplication

Lines 13
Ratio 100 %

Importance

Changes 0
Metric Value
cc 4
eloc 12
nop 1
dl 13
loc 13
rs 9.8
c 0
b 0
f 0
1
from collections import defaultdict
2
import concurrent.futures
3
4
import kytos.core.tag_ranges as range_helpers
5
6
# Safety switch: while True, every mutating step below is skipped and the
# script only reports what it finds.
# Change to False so this script makes changes
DRY_RUN = True
# Modify with VLAN ranges being retired from use.
# Each entry is a [first, last] pair; a single VLAN is written [n, n].
RETIRED_VLANS = [[4095, 4095]]
10
11 View Code Duplication
def disable_evc(evc_id):
    """Disable one EVC through the mef_eline REST API.

    Sends ``PATCH <MEF_ELINE_URL>/evc/<evc_id>`` with ``{"enabled": False}``.
    A timeout, a 5xx response, or a 400/404/424 status is reported with
    ``print``; nothing is returned.

    Args:
        evc_id: Identifier of the EVC; interpolated into the request URL.
    """
    import httpx

    MEF_ELINE_URL = 'http://localhost:8181/api/kytos/mef_eline/v2'
    url = f"{MEF_ELINE_URL}/evc/{evc_id}"
    data = {
        "enabled": False
    }
    try:
        res = httpx.request("PATCH", url, json=data, timeout=30)
    except httpx.TimeoutException:
        print(f"Timeout while disabling EVC {evc_id}")
        # No response exists on this path; falling through to the status
        # check would raise UnboundLocalError on `res`.
        return
    if res.is_server_error or res.status_code in {424, 404, 400}:
        print(f"Error disabling EVC {evc_id}: {res.text}")
24
25 View Code Duplication
def enable_evc(evc_id):
    """Enable one EVC through the mef_eline REST API.

    Sends ``PATCH <MEF_ELINE_URL>/evc/<evc_id>`` with ``{"enabled": True}``.
    A timeout, a 5xx response, or a 400/404/424 status is reported with
    ``print``; nothing is returned.

    Args:
        evc_id: Identifier of the EVC; interpolated into the request URL.
    """
    import httpx

    MEF_ELINE_URL = 'http://localhost:8181/api/kytos/mef_eline/v2'
    url = f"{MEF_ELINE_URL}/evc/{evc_id}"
    data = {
        "enabled": True
    }
    try:
        res = httpx.request("PATCH", url, json=data, timeout=30)
    except httpx.TimeoutException:
        print(f"Timeout while enabling EVC {evc_id}")
        # No response exists on this path; falling through to the status
        # check would raise UnboundLocalError on `res`.
        return
    if res.is_server_error or res.status_code in {424, 404, 400}:
        print(f"Error enabling EVC {evc_id}: {res.text}")
38
39
# Handles to the NApps this script works with. `controller` is not defined
# in this file; it is expected to be provided by the environment the script
# runs in.
mef_eline, of_lldp, topology = (
    controller.napps[("kytos", napp_name)]
    for napp_name in ("mef_eline", "of_lldp", "topology")
)
42
43
# Dict form of every circuit that is not archived, keyed by EVC id.
evcs = {}
for circuit_id, circuit in mef_eline.circuits.items():
    if circuit.archived:
        continue
    evcs[circuit_id] = circuit.as_dict()
48
49
# interface id -> list of (vlan, evc_id) pairs recorded for that interface.
in_use_tags = defaultdict(list)
for evc_id, evc in evcs.items():
    # Path links: the link's s_vlan is recorded on both of its endpoints.
    for path_key in ("current_path", "failover_path"):
        for link in evc[path_key]:
            svlan = link["metadata"]["s_vlan"]["value"]
            endpoint_ids = [
                link[side]["id"] for side in ("endpoint_a", "endpoint_b")
            ]
            for endpoint_id in endpoint_ids:
                in_use_tags[endpoint_id].append((svlan, evc_id))

    # UNIs: only tags whose tag_type is "vlan" (or 1) are considered.
    for uni in ("uni_a", "uni_z"):
        intf_id = evc[uni]["interface_id"]
        tag_info = evc[uni].get("tag")
        if not tag_info:
            continue
        if tag_info.get("tag_type") not in ("vlan", 1):
            continue

        tag = tag_info["value"]
        if isinstance(tag, int):
            vlan_values = [tag]
        elif isinstance(tag, list):
            # Items may be a bare int, a one-element list, or a
            # [first, last] pair that is expanded inclusively.
            vlan_values = []
            for tag_item in tag:
                if isinstance(tag_item, int):
                    vlan_values.append(tag_item)
                elif isinstance(tag_item, list) and len(tag_item) == 1:
                    vlan_values.append(tag_item[0])
                elif isinstance(tag_item, list) and len(tag_item) == 2:
                    vlan_values.extend(range(tag_item[0], tag_item[1] + 1))
        else:
            # Any other value type contributes no entries.
            vlan_values = []

        for val in vlan_values:
            in_use_tags[intf_id].append((val, evc_id))
84
85
# Ids of the EVCs that use at least one VLAN listed in RETIRED_VLANS.
evc_disable_set = set()

# Verb used in the report line: "WOULD" in a dry run, "WILL" otherwise.
dry_run_key = "WOULD" if DRY_RUN else "WILL"

print("Checking EVCs for vlan usage...")

# Find all evcs to temporarily disable
for tagged_evcs in in_use_tags.values():
    for tag, evc_id in tagged_evcs:
        # One report per EVC is enough; skip ids that are already selected.
        if evc_id not in evc_disable_set:
            intersect = list(
                range_helpers.range_intersection([[tag, tag]], RETIRED_VLANS)
            )
            if intersect:
                print(
                    f"EVC {evc_id} is using s_vlan {intersect} which is "
                    f"pending retirement, {dry_run_key} temporarily "
                    f"disable it..."
                )
                evc_disable_set.add(evc_id)
103
104
# Disable EVCs
if not DRY_RUN and evc_disable_set:
    print("Disabling EVCs...")
    executor = concurrent.futures.ThreadPoolExecutor(10, "script:disable_evc")
    # Leaving the `with` block waits for every submitted call to finish.
    with executor:
        disable_futures = {
            executor.submit(disable_evc, evc_id): evc_id
            for evc_id in evc_disable_set
        }
    # The results of executor.map() were previously never read, so an
    # exception raised inside a worker was silently dropped. Report each one
    # without aborting, so the later phases of the script still run.
    for future, evc_id in disable_futures.items():
        error = future.exception()
        if error is not None:
            print(f"Unexpected error disabling EVC {evc_id}: {error!r}")
110
111
# Retire the given vlan ranges

print(
    "Checking interfaces for vlan usage..."
    if DRY_RUN
    else "Clearing vlan from tag_ranges and available_tags"
)

for dpid in list(controller.switches.keys()):
    switch = controller.get_switch_by_dpid(dpid)
    for intf_id, intf in switch.interfaces.copy().items():
        # The interface's tag lock is held for the whole read-modify-write.
        with intf._tag_lock:
            # NOTE(review): the comments below assume range_difference(a, b)
            # returns the parts of `a` not covered by `b` - confirm against
            # kytos.core.tag_ranges.
            old_range = intf.tag_ranges['vlan']
            used_tags = range_helpers.range_difference(
                old_range, intf.available_tags['vlan']
            )
            new_range = range_helpers.range_difference(
                old_range, RETIRED_VLANS
            )
            # Used tags that fall outside the reduced range.
            missing = range_helpers.range_difference(used_tags, new_range)
            if missing:
                if DRY_RUN:
                    print(
                        f"Interface {dpid} {intf_id} has the vlans {missing} in use."
                    )
                else:
                    print(
                        f"WARNING: Interface {dpid} {intf_id} still has the vlans {missing} in use. Can't retire vlans."
                    )
                continue

            change = range_helpers.range_difference(old_range, new_range)
            if DRY_RUN or not change:
                # Dry run, or this interface's range is unaffected.
                continue

            intf.available_tags['vlan'] = range_helpers.range_difference(
                new_range, used_tags
            )
            intf.tag_ranges['vlan'] = new_range
            # Hand the updated interface to the topology NApp
            # (presumably to publish/persist the new ranges - confirm).
            topology.handle_on_interface_tags(intf)
152
153
# Re-enable the evcs
if not DRY_RUN and evc_disable_set:
    print("Re-enabling EVCs...")
    executor = concurrent.futures.ThreadPoolExecutor(10, "script:enable_evc")
    # Leaving the `with` block waits for every submitted call to finish.
    with executor:
        enable_futures = {
            executor.submit(enable_evc, evc_id): evc_id
            for evc_id in evc_disable_set
        }
    # The results of executor.map() were previously never read, so an
    # exception raised inside a worker was silently dropped. An EVC that
    # failed to come back up must be visible to the operator.
    for future, evc_id in enable_futures.items():
        error = future.exception()
        if error is not None:
            print(f"Unexpected error enabling EVC {evc_id}: {error!r}")

print("Finished!")
162