Test Failed
Pull Request — master (#160)
by
unknown
02:43
created

vlan_pool   A

Complexity

Total Complexity 38

Size/Duplication

Total Lines 210
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 158
dl 0
loc 210
rs 9.36
c 0
b 0
f 0
wmc 38

4 Functions

Rating   Name   Duplication   Size   Complexity  
A generate_ranges() 0 17 5
B get_range() 0 27 8
C update_database() 0 67 10
F aggregate_outdated_interfaces() 0 71 15
1
import datetime
2
import sys
3
import os
4
from collections import defaultdict
5
from kytos.core.db import Mongo
6
7
# Default tag ranges stored under ``tag_ranges.vlan``: a list of inclusive
# [first, last] pairs. Also what generate_ranges() returns when there is
# nothing to avoid.
DEFAULT_TAG_RANGES = [[1, 4095]]
8
9
def get_range(vlans, avoid) -> list[list[int]]:
    """Convert available_vlans to available_tags.

    From list[int] to list[list[int]].

    Args:
        vlans: Iterable of VLAN numbers; may be empty or None. It is not
            modified.
        avoid: Container of VLAN numbers that must be left out of the result.

    Returns:
        A sorted list of inclusive ``[start, end]`` ranges covering every
        value of ``vlans`` that is not in ``avoid``. Runs of consecutive
        values are merged into a single range. Empty when nothing is left.
    """
    if not vlans:
        return []

    # Work on a sorted, de-duplicated copy: the caller's list stays untouched
    # and a repeated value cannot open a second, overlapping range.
    tags = sorted({tag for tag in vlans if tag not in avoid})
    if not tags:
        return []

    result = []
    start = end = tags[0]
    for tag in tags[1:]:
        if tag == end + 1:
            # Still inside the current run of consecutive values.
            end = tag
        else:
            result.append([start, end])
            start = end = tag
    result.append([start, end])
    return result
36
37
def generate_ranges(avoid) -> list[list[int]]:
    """Generate available_tags only from avoid.

    Args:
        avoid: Iterable of VLAN numbers that must not be available. It is
            not modified. Values outside 1..4095 are ignored.

    Returns:
        A sorted list of inclusive ``[start, end]`` ranges covering every
        number in 1..4095 that is not in ``avoid``. When ``avoid`` is empty
        a copy of ``DEFAULT_TAG_RANGES`` is returned.
    """
    if not avoid:
        # Hand out a copy so callers cannot alter the shared default.
        return [list(tag_range) for tag_range in DEFAULT_TAG_RANGES]

    ranges = []
    start = 1
    for num in sorted(avoid):
        if num < start or num > 4095:
            # Duplicate of a value already handled, or outside the valid
            # span; either way it must not move ``start``.
            continue
        if num > start:
            ranges.append([start, num - 1])
        start = num + 1

    if start <= 4095:
        ranges.append([start, 4095])
    return ranges
54
55
def update_database(mongo: Mongo):
    """Update database.

    Reads every document of ``evcs`` and ``interface_details`` and then:

    * for each interface document that still has ``available_vlans``,
      sets ``available_tags`` (built with ``get_range`` from
      ``available_vlans`` minus the tags used by EVCs on that interface)
      and ``tag_ranges``, and unsets ``available_vlans``;
    * for each interface referenced by an EVC tag but without an
      ``interface_details`` document, inserts a new document whose
      ``available_tags`` come from ``generate_ranges``.

    Prints the number of modified and inserted documents. If two EVC UNIs
    use the same tag value on the same interface, an error naming both EVCs
    is printed and the process exits with status 1 before any write.

    Args:
        mongo: Object exposing ``client`` and ``db_name``.
    """
    db = mongo.client[mongo.db_name]
    intf_documents = db.interface_details.find()
    evc_documents = db.evcs.find()

    # interface id -> tag values used by EVCs on that interface
    evc_tags = defaultdict(set)
    # (interface id, tag value) -> id of the EVC that uses it, so that a
    # duplicate can be reported against the EVC that really owns the tag.
    tag_owner = {}

    for evc in evc_documents:
        for uni in ("uni_a", "uni_z"):
            tag = evc[uni].get("tag")
            if not tag:
                continue
            intf_id = evc[uni]["interface_id"]
            key = (intf_id, tag["value"])
            if key in tag_owner:
                print(f"Error: Detected duplicated {tag['value']} TAG"
                      f" in EVCs {evc['id']} and {tag_owner[key]}"
                      f" in interface {intf_id}")
                sys.exit(1)
            tag_owner[key] = evc["id"]
            evc_tags[intf_id].add(tag["value"])

    intf_count = 0
    for document in intf_documents:
        # Pop even when the document is skipped below, so that whatever is
        # left in ``evc_tags`` afterwards has no interface document at all.
        avoid_tags = evc_tags.pop(document["id"], set())
        if document.get("available_vlans") is None:
            continue
        ranges = get_range(document["available_vlans"], avoid_tags)
        result = db.interface_details.update_one(
            {"id": document["id"]},
            {
                "$set": {
                    "available_tags": {"vlan": ranges},
                    "tag_ranges": {"vlan": DEFAULT_TAG_RANGES},
                },
                "$unset": {"available_vlans": ""},
            },
        )
        intf_count += result.modified_count

    evc_intf_count = 0
    for intf_id, avoid_tags in evc_tags.items():
        available_tags = generate_ranges(list(avoid_tags))
        utc_now = datetime.datetime.utcnow()
        result = db.interface_details.insert_one({
            "_id": intf_id,
            "id": intf_id,
            "inserted_at": utc_now,
            "updated_at": utc_now,
            "available_tags": {"vlan": available_tags},
            "tag_ranges": {"vlan": DEFAULT_TAG_RANGES},
        })
        if result:
            evc_intf_count += 1

    print(f"{intf_count} documents modified. {evc_intf_count} documents inserted")
122
123
def aggregate_outdated_interfaces(mongo: Mongo):
    """Aggregate outdated interface details.

    Read-only report; it only queries the database and prints:

    * every ``interface_details`` document for which the aggregation
      yields a minimum/maximum of ``available_vlans``;
    * a warning for each tag value used more than once on the same
      interface by EVC UNIs;
    * the interfaces referenced by EVC tags that have no
      ``interface_details`` document, with the tags they use.

    Args:
        mongo: Object exposing ``client`` and ``db_name``.
    """
    db = mongo.client[mongo.db_name]
    document_ids = set()
    result = db.interface_details.aggregate(
        [
            {"$sort": {"_id": 1}},
            {"$project": {
                "_id": 0,
                "id": 1,
                "max_number": {"$max": "$available_vlans"},
                "min_number": {"$min": "$available_vlans"},
            }}
        ]
    )

    outdated = []
    for document in result:
        document_ids.add(document["id"])
        # The projection does not carry ``available_vlans`` itself, so the
        # projected maximum is what tells whether the field holds values.
        # NOTE(review): an empty ``available_vlans`` list presumably also
        # yields no maximum and is therefore not listed - confirm.
        if document.get("max_number") is None:
            continue
        outdated.append(str(document))

    if outdated:
        print("Here are the outdated interfaces. 'available_vlans' have a massive"
              " amount of items, minimum and maximum items will be shown only")
        print("\n".join(outdated) + "\n")
        print()

    evc_documents = db.evcs.find()
    # interface id -> tag values used by EVCs on that interface
    evc_tags = defaultdict(set)
    # (interface id, tag value) -> id of the first EVC seen using it, so a
    # duplicate is reported against the EVC that really shares the tag.
    tag_owner = {}

    for evc in evc_documents:
        for uni in ("uni_a", "uni_z"):
            tag = evc[uni].get("tag")
            if not tag:
                continue
            intf_id = evc[uni]["interface_id"]
            key = (intf_id, tag["value"])
            if key in tag_owner:
                print(f"WARNING: Detected duplicated {tag['value']} TAG"
                      f" in EVCs {evc['id']} and {tag_owner[key]}"
                      f" in interface {intf_id}")
                print()
            tag_owner.setdefault(key, evc["id"])
            evc_tags[intf_id].add(tag["value"])

    # Interfaces that already have a document would not be created.
    for id_ in document_ids:
        evc_tags.pop(id_, None)

    if evc_tags:
        print("New documents are going to be created. From the next interfaces,"
              " these tags should be avoided")

    for intf, avoid_tags in evc_tags.items():
        aux = {"id": intf, "avoid_tags": avoid_tags}
        print(aux)

    if not evc_tags and not outdated:
        print("There is nothing to update or add")
194
195
196
if __name__ == "__main__":
    # Entry point: run the command named by the CMD environment variable.
    mongo = Mongo()
    cmds = {
        "aggregate_outdated_interfaces": aggregate_outdated_interfaces,
        "update_database": update_database,
    }
    try:
        # Only the lookup is guarded: a KeyError raised while the command
        # itself runs must surface as-is instead of being reported as a
        # missing or invalid CMD.
        command = cmds[os.environ["CMD"]]
    except KeyError:
        print(
            f"Please set the 'CMD' env var. \nIt has to be one of these: {list(cmds.keys())}"
        )
        sys.exit(1)
    command(mongo)
210