000_vlan_pool.aggregate_outdated_interfaces()   F
last analyzed

Complexity

Conditions 17

Size

Total Lines 72
Code Lines 53

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 17
eloc 53
nop 1
dl 0
loc 72
rs 1.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the body into its own well-named method.

Complexity

Complex classes like 000_vlan_pool.aggregate_outdated_interfaces() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import datetime
2
import json
3
import sys
4
import os
5
from collections import defaultdict
6
from kytos.core.db import Mongo
7
8
# Tag range used for any interface that has no entry in custom_tag_range.
DEFAULT_TAG_RANGES = [[1, 4095]]
# Mapping parsed from the JSON text in the CUSTOM_TAG_RANGE environment
# variable; when the variable is unset the mapping is empty.
custom_tag_range = json.loads(os.environ.get("CUSTOM_TAG_RANGE", "{}"))
10
11
12
def get_tag_range(intf_id) -> list[list[int]]:
    """Return the tag ranges to use for an interface.

    Looks ``intf_id`` up in ``custom_tag_range`` and falls back to
    ``DEFAULT_TAG_RANGES`` when the interface has no entry there.
    """
    return custom_tag_range.get(intf_id, DEFAULT_TAG_RANGES)
14
15 View Code Duplication
def get_range(vlans, avoid) -> list[list[int]]:
    """Convert available_vlans to available_tags.

    From list[int] to list[list[int]]: every run of consecutive values in
    ``vlans`` that is not in ``avoid`` becomes one inclusive ``[start, end]``
    pair, in ascending order.

    Args:
        vlans: iterable of VLAN numbers (may be empty or None).
        avoid: container of values to leave out; only ``in`` is used on it.

    Returns:
        A new list of ``[start, end]`` pairs; empty when nothing remains.
        ``vlans`` itself is not modified.
    """
    if not vlans:
        return []

    # A set drops repeated values, which would otherwise open a second,
    # overlapping range; sorted() works on a copy so the caller's list
    # keeps its order.
    tags = sorted({tag for tag in vlans if tag not in avoid})
    if not tags:
        return []

    result = []
    start = end = tags[0]
    for tag in tags[1:]:
        if tag == end + 1:
            # Still contiguous: extend the current range.
            end = tag
        else:
            # Gap found: close the current range and open a new one.
            result.append([start, end])
            start = end = tag
    result.append([start, end])
    return result
42
43
def generate_ranges(avoid, intf_id) -> list[list[int]]:
    """Generate available_tags only from avoid.

    Builds the inclusive ``[start, end]`` ranges that cover 1..4095 except
    for the values listed in ``avoid``.

    Args:
        avoid: iterable of tag values already in use. Values that are not
            integers, or that fall outside 1..4095, cannot be part of the
            generated ranges and are ignored.
        intf_id: interface id, used only to look up the tag range when there
            is nothing to avoid.

    Returns:
        ``get_tag_range(intf_id)`` when no usable value is left in ``avoid``;
        otherwise a new list of ranges. ``avoid`` itself is not modified.
    """
    first_tag, last_tag = 1, 4095

    # Keep only values that can be compared and that lie inside the bounds;
    # sorted() works on a copy so the caller's list is left alone.
    taken = sorted(
        {
            tag for tag in (avoid or [])
            if isinstance(tag, int) and first_tag <= tag <= last_tag
        }
    )
    if not taken:
        return get_tag_range(intf_id)

    # NOTE(review): this path always spans 1..4095 and does not consult
    # get_tag_range(intf_id), unlike the empty case above - confirm that a
    # custom range is not expected to apply here.
    ranges = []
    start = first_tag
    for num in taken:
        if num > start:
            ranges.append([start, num - 1])
        start = num + 1

    if start <= last_tag:
        ranges.append([start, last_tag])
    return ranges
60
61
def update_database(mongo: Mongo):
    """Update database.

    Reads every non-archived EVC to collect the tag values used on each
    interface, then:

    * for each ``interface_details`` document that still has
      ``available_vlans``, sets ``available_tags`` and ``tag_ranges`` and
      unsets ``available_vlans``;
    * for each interface referenced by an EVC but without an
      ``interface_details`` document, inserts a new document.

    Prints the number of modified and inserted documents. Exits with status 1
    when the same integer tag value is found twice on one interface.

    Args:
        mongo: object exposing ``client`` and ``db_name``.
    """
    db = mongo.client[mongo.db_name]
    intf_documents = db.interface_details.find()
    evc_documents = db.evcs.find({"archived": False})

    # (interface id, tag value) -> id of the EVC that uses it. A tuple key
    # cannot collide the way a concatenated string can.
    evc_intf = {}
    # interface id -> set of tag values used by EVCs on that interface.
    evc_tags = defaultdict(set)

    for evc in evc_documents:
        for uni_key in ("uni_a", "uni_z"):
            tag = evc[uni_key].get("tag")
            if not tag:
                continue
            intf_id = evc[uni_key]["interface_id"]
            value = tag["value"]
            # Only integer values are treated as conflicting duplicates.
            if isinstance(value, int) and value in evc_tags[intf_id]:
                print(f"Error: Detected duplicated {value} TAG"
                      f" in EVCs {evc['id']} and {evc_intf[(intf_id, value)]}"
                      f" in interface {intf_id}")
                sys.exit(1)
            evc_tags[intf_id].add(value)
            evc_intf[(intf_id, value)] = evc["id"]

    intf_count = 0
    for document in intf_documents:
        # Popping here leaves evc_tags holding only interfaces that have no
        # interface_details document, which are inserted further down.
        avoid_tags = evc_tags.pop(document["id"], set())
        if document.get("available_vlans") is None:
            continue
        ranges = get_range(document["available_vlans"], avoid_tags)
        result = db.interface_details.update_one(
            {"id": document["id"]},
            {
                "$set":
                {
                    "available_tags": {"vlan": ranges},
                    "tag_ranges": {"vlan": get_tag_range(document["id"])}
                },
                "$unset": {"available_vlans": ""}
            }
        )
        intf_count += result.modified_count

    evc_intf_count = 0
    for intf_id, avoid_tags in evc_tags.items():
        available_tags = generate_ranges(list(avoid_tags), intf_id)
        utc_now = datetime.datetime.utcnow()
        result = db.interface_details.insert_one({
            "_id": intf_id,
            "id": intf_id,
            "inserted_at": utc_now,
            "updated_at": utc_now,
            "available_tags": {"vlan": available_tags},
            "tag_ranges": {"vlan": get_tag_range(intf_id)},
        })
        if result:
            evc_intf_count += 1

    print(f"{intf_count} documents modified. {evc_intf_count} documents inserted")
128
129
def aggregate_outdated_interfaces(mongo: Mongo):
    """Aggregate outdated interface details.

    Read-only report that prints:

    * every ``interface_details`` document that still has
      ``available_vlans``, showing only its id and the minimum and maximum
      of that list;
    * a warning for each integer tag value found twice on one interface
      among the non-archived EVCs;
    * the interfaces referenced by EVCs that have no ``interface_details``
      document, with the tag values used on them.

    Args:
        mongo: object exposing ``client`` and ``db_name``.
    """
    db = mongo.client[mongo.db_name]
    document_ids = set()
    result = db.interface_details.aggregate(
        [
            {"$sort": {"_id": 1}},
            {"$project": {
                "_id": 0,
                "id": 1,
                "max_number": {"$max": "$available_vlans"},  # MAX deleted in 6.0
                "min_number": {"$min": "$available_vlans"},  # MIN deleted in 6.0
                "available_vlans": 1,
            }}
        ]
    )

    outdated = []
    for document in result:
        document_ids.add(document["id"])
        if document.get("available_vlans") is None:
            continue
        # Drop the list itself so only id, max_number and min_number print.
        document.pop("available_vlans")
        outdated.append(str(document) + "\n")
    messages = "".join(outdated)

    if messages != "":
        print("Here are the outdated interfaces. 'available_vlans' have a massive"
              " amount of items, minimum and maximum items will be shown only")
        print(messages)

    evc_documents = db.evcs.find({"archived": False})
    # (interface id, tag value) -> id of the EVC that uses it. A tuple key
    # cannot collide the way a concatenated string can.
    evc_intf = {}
    # interface id -> set of tag values used by EVCs on that interface.
    evc_tags = defaultdict(set)

    for evc in evc_documents:
        for uni_key in ("uni_a", "uni_z"):
            tag = evc[uni_key].get("tag")
            if not tag:
                continue
            intf_id = evc[uni_key]["interface_id"]
            value = tag["value"]
            # Only integer values are reported as duplicates; the scan keeps
            # going so every conflict is listed.
            if isinstance(value, int) and value in evc_tags[intf_id]:
                print(f"WARNING: Detected duplicated {value} TAG"
                      f" in EVCs {evc['id']} and {evc_intf[(intf_id, value)]}"
                      f" in interface {intf_id}")
                print()
            evc_tags[intf_id].add(value)
            evc_intf[(intf_id, value)] = evc["id"]

    # Interfaces that already have a document are not new; what is left in
    # evc_tags are the interfaces with no interface_details document.
    for id_ in document_ids:
        evc_tags.pop(id_, None)

    if evc_tags:
        print("New documents are going to be created. From the next interfaces,"
              " these tags should be avoided")

    for intf, avoid_tags in evc_tags.items():
        aux = {"id": intf, "avoid_tags": avoid_tags}
        print(aux)

    if not evc_tags and messages == "":
        print("There is nothing to update or add")
201
202
203 View Code Duplication
if __name__ == "__main__":
    # Script entry point: run the command named by the CMD env var.
    mongo = Mongo()
    cmds = {
        "aggregate_outdated_interfaces": aggregate_outdated_interfaces,
        "update_database": update_database,
    }
    # Only the lookup is guarded: a KeyError raised while a command runs must
    # surface as itself instead of being reported as a missing CMD.
    try:
        command = cmds[os.environ["CMD"]]
    except KeyError:
        print(
            f"Please set the 'CMD' env var. \nIt has to be one of these: {list(cmds.keys())}"
        )
        sys.exit(1)
    command(mongo)
217