Passed
Pull Request — master (#160)
by
unknown
06:25
created

vlan_pool   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 101
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 79
dl 0
loc 101
rs 10
c 0
b 0
f 0
wmc 13

2 Functions

Rating   Name   Duplication   Size   Complexity  
A generate_ranges() 0 16 5
B get_range() 0 27 8
1
import datetime
2
from collections import defaultdict
3
from kytos.core.db import mongo_client
4
5
# Inclusive [start, end] pairs. Written as the "vlan" tag_ranges of every
# interface document this script touches, and used by generate_ranges() as
# the result when there is nothing to avoid.
DEFAULT_TAG_RANGES = [[1, 4095]]
6
7
def get_range(vlans, avoid) -> list[list[int]]:
    """Convert available_vlans to available_tags.

    From list[int] to list[list[int]]: the ids in ``vlans`` that are not in
    ``avoid`` are collapsed into sorted, inclusive ``[start, end]`` ranges of
    consecutive values.

    Args:
        vlans: VLAN ids in any order. ``None`` or an empty list yields an
            empty result. The list is not modified.
        avoid: Collection of ids that must be left out of the result
            (``None`` is treated as empty).

    Returns:
        A new list of ``[start, end]`` pairs in ascending order. Ranges never
        overlap, even when ``vlans`` contains duplicated ids.
    """
    if not vlans:
        return []

    # A set difference drops avoided ids and duplicates in one step; sorting
    # the result (instead of the argument) leaves the caller's list untouched.
    usable = sorted(set(vlans).difference(avoid or ()))

    result = []
    for tag in usable:
        if result and tag == result[-1][1] + 1:
            # Consecutive id: extend the range currently being built.
            result[-1][1] = tag
        else:
            # Gap (or first id): open a new single-element range.
            result.append([tag, tag])
    return result
34
35
def generate_ranges(avoid):
    """Build the available tag ranges left after removing ``avoid``.

    Starting from the full 1..4095 span, every id in ``avoid`` is cut out and
    the remaining ids are returned as inclusive ``[start, end]`` ranges.

    Args:
        avoid: Tag ids that are in use. ``None`` or empty means nothing is
            excluded. The argument is not modified. All values are assumed
            to be integers; mixed types would make ``sorted`` raise
            ``TypeError``.

    Returns:
        A new list of ``[start, end]`` pairs in ascending order. When
        ``avoid`` is empty this is a copy of ``DEFAULT_TAG_RANGES``.
    """
    if not avoid:
        # Return a copy so callers can never mutate the shared constant.
        return [list(tag_range) for tag_range in DEFAULT_TAG_RANGES]

    first_tag, last_tag = 1, 4095
    ranges = []
    start = first_tag

    # sorted() works on a copy, leaving the caller's list in its own order.
    for num in sorted(avoid):
        if num < first_tag or num > last_tag:
            # Ids outside the span cannot remove anything from it; letting
            # them through would produce ranges below 1 or above 4095.
            continue
        if num > start:
            ranges.append([start, num - 1])
        start = num + 1

    if start <= last_tag:
        ranges.append([start, last_tag])
    return ranges
51
52
# Migration: replace each interface's flat "available_vlans" list with the
# range-based "available_tags" / "tag_ranges" fields, taking the tags that
# EVCs reference on their UNIs out of the available ranges.
client = mongo_client()
intf_collection = client["napps"]["interface_details"]
evc_collection = client["napps"]["evcs"]
intf_documents = intf_collection.find()
evc_documents = evc_collection.find()

# interface id -> set of tag values referenced by EVC UNIs on that interface
evc_intf_dict = defaultdict(set)

for evc in evc_documents:
    tag_a = evc["uni_a"].get("tag")
    if tag_a:
        intf_id = evc["uni_a"]["interface_id"]
        evc_intf_dict[intf_id].add(tag_a["value"])

    tag_z = evc["uni_z"].get("tag")
    if tag_z:
        intf_id = evc["uni_z"]["interface_id"]
        evc_intf_dict[intf_id].add(tag_z["value"])

for document in intf_documents:
    # Pop before the skip check: every interface that already has a document
    # must leave evc_intf_dict, otherwise the insert loop below would try to
    # create a second document for it (a duplicate "_id" for documents this
    # script inserted on a previous run), making the script fail on re-run.
    avoid_tags = evc_intf_dict.pop(document["id"], set())
    if document.get("available_vlans") is None:
        # Nothing to convert; the existing document is left untouched.
        continue
    ranges = get_range(document["available_vlans"], avoid_tags)
    intf_collection.update_one(
        {"id": document["id"]},
        {
            "$set":
            {
                "available_tags": {"vlan": ranges},
                "tag_ranges": {"vlan": DEFAULT_TAG_RANGES}
            },
            "$unset": {"available_vlans": ""}
        }
    )

# Whatever is left belongs to interfaces referenced by EVCs that have no
# interface_details document at all: create one for each of them.
for intf_id, avoid_tags in evc_intf_dict.items():
    available_tags = generate_ranges(list(avoid_tags))
    utc_now = datetime.datetime.utcnow()
    intf_collection.insert_one({
        "_id": intf_id,
        "id": intf_id,
        "inserted_at": utc_now,
        "updated_at": utc_now,
        "available_tags": {"vlan": available_tags},
        "tag_ranges": {"vlan": DEFAULT_TAG_RANGES},
    })

print("Finnished adding available_tags and tag_ranges")