| Total Complexity | 48 |
| Total Lines | 246 |
| Duplicated Lines | 15.45 % |
| Coverage | 99.01% |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like build.utils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """Utility functions.""" |
||
| 2 | 1 | from typing import Union |
|
| 3 | |||
| 4 | 1 | import httpx |
|
| 5 | 1 | from tenacity import (retry, retry_if_exception_type, stop_after_attempt, |
|
| 6 | 1 | wait_combine, wait_fixed, wait_random) |
|
| 7 | 1 | ||
| 8 | |||
| 9 | from kytos.core.common import EntityStatus |
||
| 10 | 1 | from kytos.core.events import KytosEvent |
|
| 11 | from kytos.core.interface import UNI, Interface, TAGRange |
||
| 12 | 1 | from kytos.core.retry import before_sleep |
|
| 13 | from napps.kytos.mef_eline import settings |
||
| 14 | from napps.kytos.mef_eline.exceptions import DisabledSwitch, FlowModException |
||
| 15 | |||
| 16 | |||
| 17 | def map_evc_event_content(evc, **kwargs) -> dict: |
||
| 18 | """Returns a set of values from evc to be used for content""" |
||
| 19 | return kwargs | {"evc_id": evc.id, |
||
| 20 | "id": evc.id, |
||
| 21 | "name": evc.name, |
||
| 22 | 1 | "metadata": evc.metadata, |
|
| 23 | "active": evc._active, |
||
| 24 | "enabled": evc._enabled, |
||
| 25 | 1 | "uni_a": evc.uni_a.as_dict(), |
|
| 26 | 1 | "uni_z": evc.uni_z.as_dict()} |
|
| 27 | 1 | ||
| 28 | |||
| 29 | def emit_event(controller, name, context="kytos/mef_eline", content=None, |
||
| 30 | 1 | timeout=None): |
|
| 31 | """Send an event when something happens with an EVC.""" |
||
| 32 | event_name = f"{context}.{name}" |
||
| 33 | event = KytosEvent(name=event_name, content=content) |
||
| 34 | 1 | controller.buffers.app.put(event, timeout=timeout) |
|
| 35 | 1 | ||
| 36 | 1 | ||
| 37 | 1 | def merge_flow_dicts( |
|
| 38 | dst: dict[str, list], *srcs: dict[str, list] |
||
| 39 | 1 | ) -> dict[str, list]: |
|
| 40 | 1 | """Merge srcs dict flows into dst.""" |
|
| 41 | for src in srcs: |
||
| 42 | for k, v in src.items(): |
||
| 43 | 1 | if k not in dst: |
|
| 44 | dst[k] = v |
||
| 45 | 1 | else: |
|
| 46 | 1 | dst[k].extend(v) |
|
| 47 | return dst |
||
| 48 | |||
| 49 | 1 | ||
| 50 | async def aemit_event(controller, name, content): |
||
| 51 | 1 | """Send an asynchronous event""" |
|
| 52 | 1 | event = KytosEvent(name=name, content=content) |
|
| 53 | await controller.buffers.app.aput(event) |
||
| 54 | |||
| 55 | |||
| 56 | def compare_endpoint_trace(endpoint, vlan, trace): |
||
| 57 | 1 | """Compare and endpoint with a trace step.""" |
|
| 58 | if vlan and "vlan" in trace: |
||
| 59 | return ( |
||
| 60 | endpoint.switch.dpid == trace["dpid"] |
||
| 61 | and endpoint.port_number == trace["port"] |
||
| 62 | and vlan == trace["vlan"] |
||
| 63 | 1 | ) |
|
| 64 | return ( |
||
| 65 | endpoint.switch.dpid == trace["dpid"] |
||
| 66 | and endpoint.port_number == trace["port"] |
||
| 67 | ) |
||
| 68 | 1 | ||
| 69 | 1 | ||
| 70 | 1 | def map_dl_vlan(value: Union[str, int]) -> bool: |
|
| 71 | 1 | """Map dl_vlan value with the following criteria: |
|
| 72 | 1 | dl_vlan = untagged or 0 -> None |
|
| 73 | 1 | dl_vlan = any or "4096/4096" -> 1 |
|
| 74 | 1 | dl_vlan = "num1/num2" -> int in [1, 4095]""" |
|
| 75 | 1 | special_untagged = {"untagged", 0} |
|
| 76 | 1 | if value in special_untagged: |
|
| 77 | return None |
||
| 78 | special_any = {"any": 1, "4096/4096": 1} |
||
| 79 | 1 | value = special_any.get(value, value) |
|
| 80 | if isinstance(value, int): |
||
| 81 | return value |
||
| 82 | value, mask = map(int, value.split('/')) |
||
| 83 | return value & (mask & 4095) |
||
| 84 | |||
| 85 | |||
| 86 | 1 | def compare_uni_out_trace( |
|
| 87 | 1 | tag_value: Union[None, int, str], |
|
| 88 | 1 | interface: Interface, |
|
| 89 | 1 | trace: dict |
|
| 90 | 1 | ) -> bool: |
|
| 91 | 1 | """Check if the trace last step (output) matches the UNI attributes.""" |
|
| 92 | # keep compatibility for old versions of sdntrace-cp |
||
| 93 | if "out" not in trace: |
||
| 94 | return True |
||
| 95 | if not isinstance(trace["out"], dict): |
||
| 96 | return False |
||
| 97 | 1 | uni_vlan = map_dl_vlan(tag_value) if tag_value else None |
|
| 98 | return ( |
||
| 99 | 1 | interface.port_number == trace["out"].get("port") |
|
| 100 | 1 | and uni_vlan == trace["out"].get("vlan") |
|
| 101 | 1 | ) |
|
| 102 | |||
| 103 | |||
| 104 | 1 | def max_power2_divisor(number: int, limit: int = 4096) -> int: |
|
| 105 | """Get the max power of 2 that is divisor of number""" |
||
| 106 | 1 | while number % limit > 0: |
|
| 107 | 1 | limit //= 2 |
|
| 108 | 1 | return limit |
|
| 109 | 1 | ||
| 110 | 1 | ||
| 111 | 1 | def get_vlan_tags_and_masks(tag_ranges: list[list[int]]) -> list[int, str]: |
|
| 112 | 1 | """Get a list of vlan/mask pairs for a given list of ranges.""" |
|
| 113 | 1 | masks_list = [] |
|
| 114 | 1 | for start, end in tag_ranges: |
|
| 115 | 1 | limit = end + 1 |
|
| 116 | while start < limit: |
||
| 117 | 1 | divisor = max_power2_divisor(start) |
|
| 118 | 1 | while divisor > limit - start: |
|
| 119 | 1 | divisor //= 2 |
|
| 120 | mask = 4096 - divisor |
||
| 121 | if mask == 4095: |
||
| 122 | 1 | masks_list.append(start) |
|
| 123 | else: |
||
| 124 | 1 | masks_list.append(f"{start}/{mask}") |
|
| 125 | 1 | start += divisor |
|
| 126 | 1 | return masks_list |
|
| 127 | 1 | ||
| 128 | 1 | ||
| 129 | 1 | def check_disabled_component(uni_a: UNI, uni_z: UNI): |
|
| 130 | 1 | """Check if a switch or an interface is disabled""" |
|
| 131 | 1 | if uni_a.interface.switch != uni_z.interface.switch: |
|
| 132 | 1 | return |
|
| 133 | 1 | if uni_a.interface.switch.status == EntityStatus.DISABLED: |
|
| 134 | 1 | dpid = uni_a.interface.switch.dpid |
|
| 135 | raise DisabledSwitch(f"Switch {dpid} is disabled") |
||
| 136 | if uni_a.interface.status == EntityStatus.DISABLED: |
||
| 137 | 1 | id_ = uni_a.interface.id |
|
| 138 | raise DisabledSwitch(f"Interface {id_} is disabled") |
||
| 139 | 1 | if uni_z.interface.status == EntityStatus.DISABLED: |
|
| 140 | 1 | id_ = uni_z.interface.id |
|
| 141 | 1 | raise DisabledSwitch(f"Interface {id_} is disabled") |
|
| 142 | |||
| 143 | 1 | ||
| 144 | def make_uni_list(list_circuits: list) -> list: |
||
| 145 | 1 | """Make uni list to be sent to sdntrace""" |
|
| 146 | 1 | uni_list = [] |
|
| 147 | 1 | for circuit in list_circuits: |
|
| 148 | if isinstance(circuit.uni_a.user_tag, TAGRange): |
||
| 149 | 1 | # TAGRange value from uni_a and uni_z are currently mirrored |
|
| 150 | 1 | mask_list = (circuit.uni_a.user_tag.mask_list or |
|
| 151 | 1 | circuit.uni_z.user_tag.mask_list) |
|
| 152 | 1 | for mask in mask_list: |
|
| 153 | uni_list.append((circuit.uni_a.interface, mask)) |
||
| 154 | uni_list.append((circuit.uni_z.interface, mask)) |
||
| 155 | 1 | else: |
|
| 156 | 1 | tag_a = None |
|
| 157 | 1 | if circuit.uni_a.user_tag: |
|
| 158 | 1 | tag_a = circuit.uni_a.user_tag.value |
|
| 159 | uni_list.append( |
||
| 160 | (circuit.uni_a.interface, tag_a) |
||
| 161 | 1 | ) |
|
| 162 | tag_z = None |
||
| 163 | if circuit.uni_z.user_tag: |
||
| 164 | 1 | tag_z = circuit.uni_z.user_tag.value |
|
| 165 | uni_list.append( |
||
| 166 | (circuit.uni_z.interface, tag_z) |
||
| 167 | ) |
||
| 168 | return uni_list |
||
| 169 | 1 | ||
| 170 | 1 | ||
| 171 | def send_flow_mods_event( |
||
| 172 | controller, flow_dict: dict[str, list], action: str, force=True |
||
| 173 | ): |
||
| 174 | """Send flow mods to be deleted or install to flow_manager |
||
| 175 | through an event""" |
||
| 176 | for dpid, flows in flow_dict.items(): |
||
| 177 | emit_event( |
||
| 178 | controller, |
||
| 179 | context="kytos.flow_manager", |
||
| 180 | name=f"flows.{action}", |
||
| 181 | content={ |
||
| 182 | 1 | "dpid": dpid, |
|
| 183 | "flow_dict": {"flows": flows}, |
||
| 184 | 1 | "force": force, |
|
| 185 | }, |
||
| 186 | 1 | ) |
|
| 187 | |||
| 188 | |||
| 189 | 1 | def prepare_delete_flow(evc_flows: dict[str, list[dict]]): |
|
| 190 | 1 | """Create flow mods suited for flow deletion.""" |
|
| 191 | 1 | dpid_flows: dict[str, list[dict]] = {} |
|
| 192 | 1 | ||
| 193 | if not evc_flows: |
||
| 194 | return dpid_flows |
||
| 195 | |||
| 196 | for dpid, flows in evc_flows.items(): |
||
| 197 | dpid_flows.setdefault(dpid, []) |
||
| 198 | 1 | for flow in flows: |
|
| 199 | dpid_flows[dpid].append({ |
||
| 200 | "cookie": flow["cookie"], |
||
| 201 | "match": flow["match"], |
||
| 202 | "owner": "mef_eline", |
||
| 203 | "cookie_mask": int(0xffffffffffffffff) |
||
| 204 | }) |
||
| 205 | return dpid_flows |
||
| 206 | |||
| 207 | |||
| 208 | View Code Duplication | @retry( |
|
|
|
|||
| 209 | stop=stop_after_attempt(3), |
||
| 210 | wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)), |
||
| 211 | retry=retry_if_exception_type(FlowModException), |
||
| 212 | before_sleep=before_sleep, |
||
| 213 | reraise=True, |
||
| 214 | ) |
||
| 215 | def send_flow_mods_http( |
||
| 216 | data_content: dict, |
||
| 217 | command="install", |
||
| 218 | force=False, |
||
| 219 | by_switch=False |
||
| 220 | ): |
||
| 221 | """Send a flow_mod list to a specific switch. |
||
| 222 | |||
| 223 | Args: |
||
| 224 | dpid(str): The target of flows (i.e. Switch.id). |
||
| 225 | flow_mods(dict): Python dictionary with flow_mods. |
||
| 226 | command(str): By default is 'flows'. To remove a flow is 'remove'. |
||
| 227 | force(bool): True to send via consistency check in case of errors. |
||
| 228 | by_switch(bool): True to send to 'flows_by_switch' request instead. |
||
| 229 | """ |
||
| 230 | if by_switch: |
||
| 231 | endpoint = f"{settings.MANAGER_URL}/flows_by_switch/?force={force}" |
||
| 232 | else: |
||
| 233 | endpoint = f"{settings.MANAGER_URL}/flows" |
||
| 234 | data_content["force"] = force |
||
| 235 | try: |
||
| 236 | if command == "install": |
||
| 237 | res = httpx.post(endpoint, json=data_content, timeout=30) |
||
| 238 | elif command == "delete": |
||
| 239 | res = httpx.request( |
||
| 240 | "DELETE", endpoint, json=data_content, timeout=30 |
||
| 241 | ) |
||
| 242 | except httpx.RequestError as err: |
||
| 243 | raise FlowModException(str(err)) from err |
||
| 244 | if res.is_server_error or res.status_code >= 400: |
||
| 245 | raise FlowModException(res.text) |
||
| 246 |