Passed
Pull Request — master (#199)
by Aldo
10:20 queued 06:48
created

000_vlan_pool.aggregate_outdated_interfaces()   F

Complexity

Conditions 17

Size

Total Lines 72
Code Lines 53

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 17
eloc 53
nop 1
dl 0
loc 72
rs 1.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

Complexity

Complex classes like 000_vlan_pool.aggregate_outdated_interfaces() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import datetime
2
import sys
3
import os
4
from collections import defaultdict
5
from kytos.core.db import Mongo
6
7
DEFAULT_TAG_RANGES = [[1, 4095]]
8
9
def get_range(vlans, avoid) -> list[list[int]]:
    """Convert available_vlans to available_tags.

    Collapse a flat list of VLAN ids into sorted, inclusive ``[start, end]``
    ranges of consecutive ids, leaving out every id found in ``avoid``.

    Args:
        vlans: List of VLAN ids (``list[int]``). It is not modified; a falsy
            value (``None`` or an empty list) yields ``[]``.
        avoid: Container of ids that must not appear in the result. Only
            membership tests (``in``) are performed on it.

    Returns:
        The ranges as ``list[list[int]]``, e.g. ``[1, 2, 3, 7]`` becomes
        ``[[1, 3], [7, 7]]``. An avoided id splits the range around it.
    """
    if not vlans:
        return []

    # sorted(set(...)) works on a copy, so the caller's list is left
    # untouched, and dropping duplicates prevents overlapping ranges such as
    # [[1, 1], [1, 2]] for an input of [1, 1, 2].
    usable = [tag for tag in sorted(set(vlans)) if tag not in avoid]

    result = []
    for tag in usable:
        if result and tag == result[-1][1] + 1:
            # Consecutive id: extend the range currently being built.
            result[-1][1] = tag
        else:
            result.append([tag, tag])
    return result
36
37
def generate_ranges(avoid) -> list[list[int]]:
    """Generate available_tags only from avoid.

    Build the inclusive ``[start, end]`` ranges that cover VLAN ids 1..4095
    except for the ids listed in ``avoid``.

    Args:
        avoid: Iterable of tag values already in use. It is not modified.
            Values that are not integers, or that fall outside 1..4095,
            cannot split the range and are ignored. The callers'
            ``isinstance(..., int)`` checks suggest non-integer tag values
            can occur, and sorting or comparing them against integers would
            raise ``TypeError``.

    Returns:
        The free ranges as ``list[list[int]]``. When nothing usable is in
        ``avoid`` a copy of ``DEFAULT_TAG_RANGES`` is returned, so callers
        can never mutate the module-level constant through the result.
    """
    # A sorted set gives a copy (no mutation of the argument) with duplicates
    # removed and only the ids that can actually carve the 1..4095 space.
    taken = sorted({
        tag for tag in (avoid or [])
        if isinstance(tag, int) and 1 <= tag <= 4095
    })
    if not taken:
        return [list(tag_range) for tag_range in DEFAULT_TAG_RANGES]

    ranges = []
    start = 1
    for num in taken:
        if num > start:
            ranges.append([start, num - 1])
        start = num + 1

    # Tail after the highest avoided id, if any ids are left.
    if start <= 4095:
        ranges.append([start, 4095])
    return ranges
54
55
def _register_tag_or_exit(evc, uni_name, evc_tags, evc_owner):
    """Record the tag of one UNI of an EVC, exiting on a duplicated int tag.

    Args:
        evc: EVC document (dict) holding ``uni_name`` and ``"id"``.
        uni_name: ``"uni_a"`` or ``"uni_z"``.
        evc_tags: Mapping ``interface_id -> set`` of tag values seen so far.
        evc_owner: Mapping ``(interface_id, tag value) -> EVC id`` used to
            name the conflicting EVC in the error message.
    """
    uni = evc[uni_name]
    tag = uni.get("tag")
    if not tag:
        return
    intf_id = uni["interface_id"]
    value = tag["value"]
    # Only integer tag values are treated as conflicts.
    if value in evc_tags[intf_id] and isinstance(value, int):
        print(f"Error: Detected duplicated {value} TAG"
              f" in EVCs {evc['id']} and {evc_owner[(intf_id, value)]}"
              f" in interface {intf_id}")
        sys.exit(1)
    evc_tags[intf_id].add(value)
    # A tuple key cannot collide the way "intf_id + str(value)" could
    # (e.g. "...:1" + "23" and "...:12" + "3" are the same string).
    evc_owner[(intf_id, value)] = evc["id"]


def update_database(mongo: Mongo):
    """Update database.

    Steps performed:

    1. Collect, per interface, the tag values used by non-archived EVCs.
       The process exits with status 1 when the same integer tag is found
       twice on one interface.
    2. For every ``interface_details`` document that still has
       ``available_vlans``, set ``available_tags`` (computed by
       ``get_range``) and ``tag_ranges``, and unset ``available_vlans``.
    3. For every interface used by an EVC that has no ``interface_details``
       document, insert a new one whose ``available_tags`` come from
       ``generate_ranges``.

    A summary with the number of modified and inserted documents is printed.

    Args:
        mongo: Object exposing ``client`` and ``db_name``.
    """
    db = mongo.client[mongo.db_name]
    intf_documents = db.interface_details.find()
    evc_documents = db.evcs.find({"archived": False})

    evc_owner = {}
    evc_tags = defaultdict(set)
    for evc in evc_documents:
        for uni_name in ("uni_a", "uni_z"):
            _register_tag_or_exit(evc, uni_name, evc_tags, evc_owner)

    intf_count = 0
    for document in intf_documents:
        # Pop even when the document is skipped below, so interfaces that
        # already have a document are not inserted again in the last step.
        avoid_tags = evc_tags.pop(document["id"], set())
        if document.get("available_vlans") is None:
            continue
        ranges = get_range(document["available_vlans"], avoid_tags)
        result = db.interface_details.update_one(
            {"id": document["id"]},
            {
                "$set": {
                    "available_tags": {"vlan": ranges},
                    "tag_ranges": {"vlan": DEFAULT_TAG_RANGES},
                },
                "$unset": {"available_vlans": ""},
            },
        )
        intf_count += result.modified_count

    # Whatever is left in evc_tags belongs to interfaces without a document.
    evc_intf_count = 0
    for intf_id, avoid_tags in evc_tags.items():
        available_tags = generate_ranges(list(avoid_tags))
        utc_now = datetime.datetime.utcnow()
        result = db.interface_details.insert_one({
            "_id": intf_id,
            "id": intf_id,
            "inserted_at": utc_now,
            "updated_at": utc_now,
            "available_tags": {"vlan": available_tags},
            "tag_ranges": {"vlan": DEFAULT_TAG_RANGES},
        })
        if result:
            evc_intf_count += 1

    print(f"{intf_count} documents modified. {evc_intf_count} documents inserted")
122
123
def _register_tag_and_warn(evc, uni_name, evc_tags, evc_owner):
    """Record the tag of one UNI of an EVC, warning on a duplicated int tag.

    Args:
        evc: EVC document (dict) holding ``uni_name`` and ``"id"``.
        uni_name: ``"uni_a"`` or ``"uni_z"``.
        evc_tags: Mapping ``interface_id -> set`` of tag values seen so far.
        evc_owner: Mapping ``(interface_id, tag value) -> EVC id`` used to
            name the conflicting EVC in the warning.
    """
    uni = evc[uni_name]
    tag = uni.get("tag")
    if not tag:
        return
    intf_id = uni["interface_id"]
    value = tag["value"]
    # Only integer tag values are treated as conflicts.
    if value in evc_tags[intf_id] and isinstance(value, int):
        print(f"WARNING: Detected duplicated {value} TAG"
              f" in EVCs {evc['id']} and {evc_owner[(intf_id, value)]}"
              f" in interface {intf_id}")
        print()
    evc_tags[intf_id].add(value)
    # A tuple key cannot collide the way "intf_id + str(value)" could
    # (e.g. "...:1" + "23" and "...:12" + "3" are the same string).
    evc_owner[(intf_id, value)] = evc["id"]


def aggregate_outdated_interfaces(mongo: Mongo):
    """Aggregate outdated interface details.

    Read-only report that prints:

    * every ``interface_details`` document that still has
      ``available_vlans``, showing only its id and the minimum and maximum
      item of that list;
    * a warning for each integer tag used twice on the same interface by
      non-archived EVCs;
    * the interfaces used by EVCs that have no ``interface_details``
      document, with the tags those EVCs use on them.

    Args:
        mongo: Object exposing ``client`` and ``db_name``.
    """
    db = mongo.client[mongo.db_name]
    result = db.interface_details.aggregate(
        [
            {"$sort": {"_id": 1}},
            {"$project": {
                "_id": 0,
                "id": 1,
                "max_number": {"$max": "$available_vlans"},  # MAX deleted in 6.0
                "min_number": {"$min": "$available_vlans"},  # MIN deleted in 6.0
                "available_vlans": 1,
            }}
        ]
    )

    document_ids = set()
    outdated = []
    for document in result:
        document_ids.add(document["id"])
        if document.get("available_vlans") is None:
            continue
        # The full list is dropped from the report; min/max stay.
        document.pop("available_vlans")
        outdated.append(str(document))

    if outdated:
        print("Here are the outdated interfaces. 'available_vlans' have a massive"
              " amount of items, minimum and maximum items will be shown only")
        # One document per line, each terminated by a newline.
        print("\n".join(outdated) + "\n")

    evc_documents = db.evcs.find({"archived": False})
    evc_owner = {}
    evc_tags = defaultdict(set)
    for evc in evc_documents:
        for uni_name in ("uni_a", "uni_z"):
            _register_tag_and_warn(evc, uni_name, evc_tags, evc_owner)

    # Keep only interfaces that have no interface_details document yet.
    for id_ in document_ids:
        evc_tags.pop(id_, None)

    if evc_tags:
        print("New documents are going to be created. From the next interfaces,"
              " these tags should be avoided")

    for intf, avoid_tags in evc_tags.items():
        print({"id": intf, "avoid_tags": avoid_tags})

    if not evc_tags and not outdated:
        print("There is nothing to update or add")
195
196
197 View Code Duplication
if __name__ == "__main__":
    # Script entry point: the command to run is chosen by the CMD env var.
    mongo = Mongo()
    cmds = {
        "aggregate_outdated_interfaces": aggregate_outdated_interfaces,
        "update_database": update_database,
    }
    try:
        # Only the lookups are guarded: CMD missing or not a known command.
        command = cmds[os.environ["CMD"]]
    except KeyError:
        print(
            f"Please set the 'CMD' env var. \nIt has to be one of these: {list(cmds.keys())}"
        )
        sys.exit(1)
    # Run outside the try so a KeyError raised inside the command (e.g. a
    # document missing a field) surfaces with its traceback instead of being
    # misreported as a missing CMD variable.
    command(mongo)
211