Passed
Pull Request — master (#626)
by Aldo
08:48 queued 04:35
created

build.utils.merge_flow_dicts()   A

Complexity

Conditions 4

Size

Total Lines 11
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 8
nop 2
dl 0
loc 11
ccs 7
cts 7
cp 1
crap 4
rs 10
c 0
b 0
f 0
1
"""Utility functions."""
2 1
from typing import Union
3
4 1
from kytos.core.common import EntityStatus
5 1
from kytos.core.events import KytosEvent
6 1
from kytos.core.interface import UNI, Interface, TAGRange
7 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch
8
9
10 1
def map_evc_event_content(evc, **kwargs) -> dict:
11
    """Returns a set of values from evc to be used for content"""
12 1
    return kwargs | {"evc_id": evc.id,
13
                     "id": evc.id,
14
                     "name": evc.name,
15
                     "metadata": evc.metadata,
16
                     "active": evc._active,
17
                     "enabled": evc._enabled,
18
                     "uni_a": evc.uni_a.as_dict(),
19
                     "uni_z": evc.uni_z.as_dict()}
20
21
22 1
def emit_event(controller, name, context="kytos/mef_eline", content=None,
23
               timeout=None):
24
    """Send an event when something happens with an EVC."""
25 1
    event_name = f"{context}.{name}"
26 1
    event = KytosEvent(name=event_name, content=content)
27 1
    controller.buffers.app.put(event, timeout=timeout)
28
29
30 1
def merge_flow_dicts(
31
    dst: dict[str, list], *srcs: list[dict[str, list]]
32
) -> dict[str, list]:
33
    """Merge srcs dict flows into dst."""
34 1
    for src in srcs:
35 1
        for k, v in src.items():
36 1
            if k not in dst:
37 1
                dst[k] = v
38
            else:
39 1
                dst[k].extend(v)
40 1
    return dst
41
42
43 1
async def aemit_event(controller, name, content):
44
    """Send an asynchronous event"""
45 1
    event = KytosEvent(name=name, content=content)
46 1
    await controller.buffers.app.aput(event)
47
48
49 1
def compare_endpoint_trace(endpoint, vlan, trace):
50
    """Compare and endpoint with a trace step."""
51 1
    if vlan and "vlan" in trace:
52 1
        return (
53
            endpoint.switch.dpid == trace["dpid"]
54
            and endpoint.port_number == trace["port"]
55
            and vlan == trace["vlan"]
56
        )
57 1
    return (
58
        endpoint.switch.dpid == trace["dpid"]
59
        and endpoint.port_number == trace["port"]
60
    )
61
62
63 1
def map_dl_vlan(value: Union[str, int]) -> bool:
64
    """Map dl_vlan value with the following criteria:
65
    dl_vlan = untagged or 0 -> None
66
    dl_vlan = any or "4096/4096" -> 1
67
    dl_vlan = "num1/num2" -> int in [1, 4095]"""
68 1
    special_untagged = {"untagged", 0}
69 1
    if value in special_untagged:
70 1
        return None
71 1
    special_any = {"any": 1, "4096/4096": 1}
72 1
    value = special_any.get(value, value)
73 1
    if isinstance(value, int):
74 1
        return value
75 1
    value, mask = map(int, value.split('/'))
76 1
    return value & (mask & 4095)
77
78
79 1
def compare_uni_out_trace(
80
    tag_value: Union[None, int, str],
81
    interface: Interface,
82
    trace: dict
83
) -> bool:
84
    """Check if the trace last step (output) matches the UNI attributes."""
85
    # keep compatibility for old versions of sdntrace-cp
86 1
    if "out" not in trace:
87 1
        return True
88 1
    if not isinstance(trace["out"], dict):
89 1
        return False
90 1
    uni_vlan = map_dl_vlan(tag_value) if tag_value else None
91 1
    return (
92
        interface.port_number == trace["out"].get("port")
93
        and uni_vlan == trace["out"].get("vlan")
94
    )
95
96
97 1
def max_power2_divisor(number: int, limit: int = 4096) -> int:
98
    """Get the max power of 2 that is divisor of number"""
99 1
    while number % limit > 0:
100 1
        limit //= 2
101 1
    return limit
102
103
104 1
def get_vlan_tags_and_masks(tag_ranges: list[list[int]]) -> list[int, str]:
105
    """Get a list of vlan/mask pairs for a given list of ranges."""
106 1
    masks_list = []
107 1
    for start, end in tag_ranges:
108 1
        limit = end + 1
109 1
        while start < limit:
110 1
            divisor = max_power2_divisor(start)
111 1
            while divisor > limit - start:
112 1
                divisor //= 2
113 1
            mask = 4096 - divisor
114 1
            if mask == 4095:
115 1
                masks_list.append(start)
116
            else:
117 1
                masks_list.append(f"{start}/{mask}")
118 1
            start += divisor
119 1
    return masks_list
120
121
122 1
def check_disabled_component(uni_a: UNI, uni_z: UNI):
123
    """Check if a switch or an interface is disabled"""
124 1
    if uni_a.interface.switch != uni_z.interface.switch:
125 1
        return
126 1
    if uni_a.interface.switch.status == EntityStatus.DISABLED:
127 1
        dpid = uni_a.interface.switch.dpid
128 1
        raise DisabledSwitch(f"Switch {dpid} is disabled")
129 1
    if uni_a.interface.status == EntityStatus.DISABLED:
130 1
        id_ = uni_a.interface.id
131 1
        raise DisabledSwitch(f"Interface {id_} is disabled")
132 1
    if uni_z.interface.status == EntityStatus.DISABLED:
133 1
        id_ = uni_z.interface.id
134 1
        raise DisabledSwitch(f"Interface {id_} is disabled")
135
136
137 1
def make_uni_list(list_circuits: list) -> list:
138
    """Make uni list to be sent to sdntrace"""
139 1
    uni_list = []
140 1
    for circuit in list_circuits:
141 1
        if isinstance(circuit.uni_a.user_tag, TAGRange):
142
            # TAGRange value from uni_a and uni_z are currently mirrored
143 1
            mask_list = (circuit.uni_a.user_tag.mask_list or
144
                         circuit.uni_z.user_tag.mask_list)
145 1
            for mask in mask_list:
146 1
                uni_list.append((circuit.uni_a.interface, mask))
147 1
                uni_list.append((circuit.uni_z.interface, mask))
148
        else:
149 1
            tag_a = None
150 1
            if circuit.uni_a.user_tag:
151 1
                tag_a = circuit.uni_a.user_tag.value
152 1
            uni_list.append(
153
                (circuit.uni_a.interface, tag_a)
154
            )
155 1
            tag_z = None
156 1
            if circuit.uni_z.user_tag:
157 1
                tag_z = circuit.uni_z.user_tag.value
158 1
            uni_list.append(
159
                (circuit.uni_z.interface, tag_z)
160
            )
161 1
    return uni_list
162
163
164 1
def send_flow_mods_event(
165
    controller, flow_dict: dict[str, list], action: str, force=True
166
):
167
    """Send flow mods to be deleted or install to flow_manager
168
     through an event"""
169 1
    for dpid, flows in flow_dict.items():
170 1
        emit_event(
171
            controller,
172
            context="kytos.flow_manager",
173
            name=f"flows.{action}",
174
            content={
175
                "dpid": dpid,
176
                "flow_dict": {"flows": flows},
177
                "force": force,
178
            },
179
        )
180
181
182 1
def prepare_delete_flow(evc_flows: dict[str, list[dict]]):
183
    """Create flow mods suited for flow deletion."""
184 1
    dpid_flows: dict[str, list[dict]] = {}
185
186 1
    if not evc_flows:
187
        return dpid_flows
188
189 1
    for dpid, flows in evc_flows.items():
190 1
        dpid_flows.setdefault(dpid, [])
191 1
        for flow in flows:
192 1
            dpid_flows[dpid].append({
193
                "cookie": flow["cookie"],
194
                "match": flow["match"],
195
                "owner": "mef_eline",
196
                "cookie_mask": int(0xffffffffffffffff)
197
            })
198 1
    return dpid_flows
199
200
201 1
def _does_uni_affect_evc(evc, interface: Interface, link_event: str) -> bool:
202
    """Check if an interface flap is affecting an EVC UNI."""
203 1
    interface_a = evc.uni_a.interface
204 1
    interface_z = evc.uni_z.interface
205 1
    interface_affected = interface in (interface_a, interface_z)
206 1
    interface_down = (EntityStatus.DOWN in
207
                      (interface_a.status, interface_z.status))
208 1
    if link_event == "up":
209 1
        return (not evc.is_active() and interface_affected
210
                and not interface_down)
211 1
    if link_event == "down":
212 1
        return evc.is_active() and interface_affected and interface_down
213
    return False
214