Test Failed
Pull Request — master (#583)
by unknown
06:03 queued 20s
created

build.utils   B

Complexity

Total Complexity 48

Size/Duplication

Total Lines 246
Duplicated Lines 15.45 %

Test Coverage

Coverage 99.01%

Importance

Changes 0
Metric Value
eloc 167
dl 38
loc 246
ccs 100
cts 101
cp 0.9901
rs 8.5599
c 0
b 0
f 0
wmc 48

14 Functions

Rating   Name   Duplication   Size   Complexity  
A emit_event() 0 6 1
A prepare_delete_flow() 0 17 4
A compare_endpoint_trace() 0 11 3
A merge_flow_dicts() 0 11 4
B send_flow_mods_http() 38 38 7
B make_uni_list() 0 25 6
A send_flow_mods_event() 0 14 2
A max_power2_divisor() 0 5 2
A check_disabled_component() 0 13 5
A compare_uni_out_trace() 0 15 4
A map_dl_vlan() 0 14 3
A map_evc_event_content() 0 10 1
A aemit_event() 0 4 1
A get_vlan_tags_and_masks() 0 16 5

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.utils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
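As a rough illustration of the prefix heuristic, the sketch below groups the 14 function names from the table in this report by the token before their first underscore. The helper `group_by_prefix` is a hypothetical name introduced for this example and is not part of build.utils. Because build.utils is a module of plain functions rather than a class, a matching group would more likely be moved to its own module than extracted into a class.

```python
from collections import defaultdict

# Function names copied from the function table in this report.
FUNCTION_NAMES = [
    "emit_event", "prepare_delete_flow", "compare_endpoint_trace",
    "merge_flow_dicts", "send_flow_mods_http", "make_uni_list",
    "send_flow_mods_event", "max_power2_divisor",
    "check_disabled_component", "compare_uni_out_trace", "map_dl_vlan",
    "map_evc_event_content", "aemit_event", "get_vlan_tags_and_masks",
]


def group_by_prefix(names: list[str]) -> dict[str, list[str]]:
    """Hypothetical helper: group names by their first '_'-separated token."""
    groups: dict[str, list[str]] = defaultdict(list)
    for name in names:
        prefix = name.split("_", 1)[0]
        groups[prefix].append(name)
    # Only prefixes shared by two or more names hint at a component.
    return {p: ns for p, ns in groups.items() if len(ns) > 1}


candidates = group_by_prefix(FUNCTION_NAMES)
for prefix, names in sorted(candidates.items()):
    print(prefix, names)
```

The heuristic only produces candidates. Here the `send` and `compare` groups look cohesive, while the two `map` functions share a prefix but deal with unrelated data, so each group still needs a manual check.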

"""Utility functions."""
from typing import Union

import httpx
from tenacity import (retry, retry_if_exception_type, stop_after_attempt,
                      wait_combine, wait_fixed, wait_random)

from kytos.core.common import EntityStatus
from kytos.core.events import KytosEvent
from kytos.core.interface import UNI, Interface, TAGRange
from kytos.core.retry import before_sleep
from napps.kytos.mef_eline import settings
from napps.kytos.mef_eline.exceptions import DisabledSwitch, FlowModException


def map_evc_event_content(evc, **kwargs) -> dict:
    """Returns a set of values from evc to be used for content"""
    return kwargs | {"evc_id": evc.id,
                     "id": evc.id,
                     "name": evc.name,
                     "metadata": evc.metadata,
                     "active": evc._active,
                     "enabled": evc._enabled,
                     "uni_a": evc.uni_a.as_dict(),
                     "uni_z": evc.uni_z.as_dict()}


def emit_event(controller, name, context="kytos/mef_eline", content=None,
               timeout=None):
    """Send an event when something happens with an EVC."""
    event_name = f"{context}.{name}"
    event = KytosEvent(name=event_name, content=content)
    controller.buffers.app.put(event, timeout=timeout)


def merge_flow_dicts(
    dst: dict[str, list], *srcs: dict[str, list]
) -> dict[str, list]:
    """Merge srcs dict flows into dst."""
    for src in srcs:
        for k, v in src.items():
            if k not in dst:
                dst[k] = v
            else:
                dst[k].extend(v)
    return dst
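One behaviour of merge_flow_dicts is worth keeping in mind: when a key is missing from dst, the list from src is stored by reference, so a later extend for the same key also mutates the caller's source list. The sketch below repeats the function's logic so it runs standalone and contrasts it with a copying variant. `merge_flow_dicts_copy` is a hypothetical name used only for illustration, and whether the aliasing matters depends on how callers reuse their dictionaries.

```python
def merge_flow_dicts(dst, *srcs):
    """Same logic as the function in the listing."""
    for src in srcs:
        for k, v in src.items():
            if k not in dst:
                dst[k] = v  # stores the very same list object as src
            else:
                dst[k].extend(v)
    return dst


def merge_flow_dicts_copy(dst, *srcs):
    """Hypothetical variant that never aliases the source lists."""
    for src in srcs:
        for k, v in src.items():
            dst.setdefault(k, []).extend(v)
    return dst


a = {"sw1": [{"cookie": 1}]}
b = {"sw1": [{"cookie": 2}]}
merged = merge_flow_dicts({}, a, b)
aliased = merged["sw1"] is a["sw1"]  # the merged list is a's own list
len_a_after = len(a["sw1"])          # a's list grew as a side effect

c = {"sw1": [{"cookie": 1}]}
d = {"sw1": [{"cookie": 2}]}
merged_copy = merge_flow_dicts_copy({}, c, d)
len_c_after = len(c["sw1"])          # c's list is left untouched
```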


async def aemit_event(controller, name, content):
    """Send an asynchronous event"""
    event = KytosEvent(name=name, content=content)
    await controller.buffers.app.aput(event)


def compare_endpoint_trace(endpoint, vlan, trace):
    """Compare an endpoint with a trace step."""
    if vlan and "vlan" in trace:
        return (
            endpoint.switch.dpid == trace["dpid"]
            and endpoint.port_number == trace["port"]
            and vlan == trace["vlan"]
        )
    return (
        endpoint.switch.dpid == trace["dpid"]
        and endpoint.port_number == trace["port"]
    )


def map_dl_vlan(value: Union[str, int]) -> Union[int, None]:
    """Map dl_vlan value with the following criteria:
    dl_vlan = untagged or 0 -> None
    dl_vlan = any or "4096/4096" -> 1
    dl_vlan = "num1/num2" -> int in [1, 4095]"""
    special_untagged = {"untagged", 0}
    if value in special_untagged:
        return None
    special_any = {"any": 1, "4096/4096": 1}
    value = special_any.get(value, value)
    if isinstance(value, int):
        return value
    value, mask = map(int, value.split('/'))
    return value & (mask & 4095)
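The mapping rules in the map_dl_vlan docstring can be checked with a few concrete inputs. The sketch below repeats the function's logic so that it runs on its own; the sample values are chosen for illustration and do not come from the project's tests.

```python
from typing import Union


def map_dl_vlan(value: Union[str, int]) -> Union[int, None]:
    """Same logic as the function in the listing."""
    if value in {"untagged", 0}:
        return None
    value = {"any": 1, "4096/4096": 1}.get(value, value)
    if isinstance(value, int):
        return value
    value, mask = map(int, value.split("/"))
    # Only the low 12 bits of the mask are applied to the value.
    return value & (mask & 4095)


examples = {
    "untagged": map_dl_vlan("untagged"),  # special case, no VLAN
    "zero": map_dl_vlan(0),               # treated like untagged
    "any": map_dl_vlan("any"),            # any tagged VLAN maps to 1
    "plain_int": map_dl_vlan(100),        # integers pass through
    "masked": map_dl_vlan("13/4094"),     # 0b1101 & 0b111111111110
}
```

Note that a numeric string without a mask, such as "100", is not covered by these rules: the split yields a single item, so the two-name unpacking raises ValueError.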


def compare_uni_out_trace(
    tag_value: Union[None, int, str],
    interface: Interface,
    trace: dict
) -> bool:
    """Check if the trace last step (output) matches the UNI attributes."""
    # keep compatibility for old versions of sdntrace-cp
    if "out" not in trace:
        return True
    if not isinstance(trace["out"], dict):
        return False
    uni_vlan = map_dl_vlan(tag_value) if tag_value else None
    return (
        interface.port_number == trace["out"].get("port")
        and uni_vlan == trace["out"].get("vlan")
    )


def max_power2_divisor(number: int, limit: int = 4096) -> int:
    """Get the max power of 2 that is divisor of number"""
    while number % limit > 0:
        limit //= 2
    return limit


def get_vlan_tags_and_masks(
    tag_ranges: list[list[int]]
) -> list[Union[int, str]]:
    """Get a list of vlan/mask pairs for a given list of ranges."""
    masks_list = []
    for start, end in tag_ranges:
        limit = end + 1
        while start < limit:
            divisor = max_power2_divisor(start)
            while divisor > limit - start:
                divisor //= 2
            mask = 4096 - divisor
            if mask == 4095:
                masks_list.append(start)
            else:
                masks_list.append(f"{start}/{mask}")
            start += divisor
    return masks_list
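To make the range splitting concrete, the sketch below repeats max_power2_divisor and get_vlan_tags_and_masks so it runs standalone, applies them to the range 1 to 10, and then confirms that the resulting entries match exactly those VLAN ids. The `expand` checker is a hypothetical helper written for this example only; it assumes a "value/mask" entry matches every id whose masked bits equal the value.

```python
from typing import Union


def max_power2_divisor(number: int, limit: int = 4096) -> int:
    """Largest power of 2 (at most limit) that divides number."""
    while number % limit > 0:
        limit //= 2
    return limit


def get_vlan_tags_and_masks(tag_ranges):
    """Same logic as the function in the listing."""
    masks_list = []
    for start, end in tag_ranges:
        limit = end + 1
        while start < limit:
            # Largest aligned block starting at `start` ...
            divisor = max_power2_divisor(start)
            # ... shrunk until it no longer overshoots the range.
            while divisor > limit - start:
                divisor //= 2
            mask = 4096 - divisor
            if mask == 4095:
                masks_list.append(start)  # single VLAN id
            else:
                masks_list.append(f"{start}/{mask}")
            start += divisor
    return masks_list


def expand(entry: Union[int, str]) -> set[int]:
    """Hypothetical checker: VLAN ids matched by one entry."""
    if isinstance(entry, int):
        return {entry}
    value, mask = map(int, entry.split("/"))
    return {vid for vid in range(4096) if vid & mask == value}


pairs = get_vlan_tags_and_masks([[1, 10]])
covered = set().union(*(expand(p) for p in pairs))
print(pairs)  # [1, '2/4094', '4/4092', '8/4094', 10]
```

The ten ids collapse into five entries: ids 1 and 10 stand alone, while 2 to 3, 4 to 7 and 8 to 9 are each covered by one masked entry.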


def check_disabled_component(uni_a: UNI, uni_z: UNI):
    """Check if a switch or an interface is disabled"""
    if uni_a.interface.switch != uni_z.interface.switch:
        return
    if uni_a.interface.switch.status == EntityStatus.DISABLED:
        dpid = uni_a.interface.switch.dpid
        raise DisabledSwitch(f"Switch {dpid} is disabled")
    if uni_a.interface.status == EntityStatus.DISABLED:
        id_ = uni_a.interface.id
        raise DisabledSwitch(f"Interface {id_} is disabled")
    if uni_z.interface.status == EntityStatus.DISABLED:
        id_ = uni_z.interface.id
        raise DisabledSwitch(f"Interface {id_} is disabled")


def make_uni_list(list_circuits: list) -> list:
    """Make uni list to be sent to sdntrace"""
    uni_list = []
    for circuit in list_circuits:
        if isinstance(circuit.uni_a.user_tag, TAGRange):
            # TAGRange value from uni_a and uni_z are currently mirrored
            mask_list = (circuit.uni_a.user_tag.mask_list or
                         circuit.uni_z.user_tag.mask_list)
            for mask in mask_list:
                uni_list.append((circuit.uni_a.interface, mask))
                uni_list.append((circuit.uni_z.interface, mask))
        else:
            tag_a = None
            if circuit.uni_a.user_tag:
                tag_a = circuit.uni_a.user_tag.value
            uni_list.append(
                (circuit.uni_a.interface, tag_a)
            )
            tag_z = None
            if circuit.uni_z.user_tag:
                tag_z = circuit.uni_z.user_tag.value
            uni_list.append(
                (circuit.uni_z.interface, tag_z)
            )
    return uni_list


def send_flow_mods_event(
    controller, flow_dict: dict[str, list], action: str, force=True
):
    """Send flow mods to be deleted or installed to flow_manager
     through an event"""
    for dpid, flows in flow_dict.items():
        emit_event(
            controller,
            context="kytos.flow_manager",
            name=f"flows.{action}",
            content={
                "dpid": dpid,
                "flow_dict": {"flows": flows},
                "force": force,
            },
        )


def prepare_delete_flow(evc_flows: dict[str, list[dict]]):
    """Create flow mods suited for flow deletion."""
    dpid_flows: dict[str, list[dict]] = {}

    if not evc_flows:
        return dpid_flows

    for dpid, flows in evc_flows.items():
        dpid_flows.setdefault(dpid, [])
        for flow in flows:
            dpid_flows[dpid].append({
                "cookie": flow["cookie"],
                "match": flow["match"],
                "owner": "mef_eline",
                "cookie_mask": int(0xffffffffffffffff)
            })
    return dpid_flows


Duplication: This code seems to be duplicated in your project.

@retry(
    stop=stop_after_attempt(3),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    retry=retry_if_exception_type(FlowModException),
    before_sleep=before_sleep,
    reraise=True,
)
def send_flow_mods_http(
    data_content: dict,
    command="install",
    force=False,
    by_switch=False
):
    """Send a flow_mod list to a specific switch.

    Args:
        data_content(dict): Python dictionary with flow_mods.
        command(str): By default is 'install'. To remove a flow is 'delete'.
        force(bool): True to send via consistency check in case of errors.
        by_switch(bool): True to send to 'flows_by_switch' request instead.
    """
    if by_switch:
        endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}"
    else:
        endpoint = f"{settings.MANAGER_URL}/flows"
        data_content["force"] = force
    try:
        if command == "install":
            res = httpx.post(endpoint, json=data_content, timeout=30)
        elif command == "delete":
            res = httpx.request(
                "DELETE", endpoint, json=data_content, timeout=30
            )
    except httpx.RequestError as err:
        raise FlowModException(str(err)) from err
    if res.is_server_error or res.status_code >= 400:
        raise FlowModException(res.text)

Issue (first use of res after the try block): The variable res does not seem to be defined for all execution paths.
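The warning about res comes from the if/elif chain in send_flow_mods_http: a command other than "install" or "delete" skips both branches, so the later access to res would raise UnboundLocalError. One possible way to close that path, sketched below under the assumption that unsupported commands should be rejected up front, is to resolve the HTTP method first. `HTTP_METHODS` and `resolve_http_method` are hypothetical names, not part of the project.

```python
# Supported commands and the HTTP method each one maps to (assumed mapping,
# taken from the two branches in send_flow_mods_http).
HTTP_METHODS = {"install": "POST", "delete": "DELETE"}


def resolve_http_method(command: str) -> str:
    """Hypothetical helper: fail fast on an unsupported command."""
    try:
        return HTTP_METHODS[command]
    except KeyError:
        # ValueError rather than FlowModException, so the retry decorator
        # (which only retries FlowModException) does not retry a bad call.
        raise ValueError(f"Unsupported command: {command!r}") from None


# Inside send_flow_mods_http the two branches could then become:
#     method = resolve_http_method(command)
#     res = httpx.request(method, endpoint, json=data_content, timeout=30)
```

With this shape res is assigned on every path that reaches the status check, which is what the analyzer is asking for.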