Passed
Push — master ( 2ec8a9...67da38 )
by Vinicius
07:53 queued 06:30
created

build.kytos_api_helper.kytos_api()   C

Complexity

Conditions 11

Size

Total Lines 45
Code Lines 34

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 118.053

Importance

Changes 0
Metric Value
cc 11
eloc 34
nop 9
dl 0
loc 45
ccs 1
cts 25
cp 0.04
crap 118.053
rs 5.4
c 0
b 0
f 0

How to fix

Complexity

Complex classes like build.kytos_api_helper.kytos_api() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields or methods that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
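As a rough illustration of Extract Class, the sketch below moves fields that share a retry_ prefix into their own class. Every name here (ApiClientBefore, ApiClient, RetryPolicy and their fields) is hypothetical and is not taken from the analyzed module.

```python
from dataclasses import dataclass


# Hypothetical "before": the shared "retry_" prefix hints at a hidden component.
class ApiClientBefore:
    def __init__(self, base_url: str, retry_attempts: int, retry_wait_seconds: float):
        self.base_url = base_url
        self.retry_attempts = retry_attempts
        self.retry_wait_seconds = retry_wait_seconds

    def retry_max_total_wait(self) -> float:
        # Waiting only happens between attempts, hence attempts - 1.
        return (self.retry_attempts - 1) * self.retry_wait_seconds


# Hypothetical "after": the prefixed fields and their method live in one class.
@dataclass(frozen=True)
class RetryPolicy:
    attempts: int
    wait_seconds: float

    def max_total_wait(self) -> float:
        """Longest time spent waiting between attempts."""
        return (self.attempts - 1) * self.wait_seconds


@dataclass
class ApiClient:
    base_url: str
    retry: RetryPolicy


policy = RetryPolicy(attempts=5, wait_seconds=3.0)
client = ApiClient(base_url="http://localhost", retry=policy)
print(client.retry.max_total_wait())
```

The extracted class can then be tested and reused on its own, and the original class keeps a single field instead of a group of related ones.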

Many Parameters

Methods with many parameters are not only hard to understand; their parameters also often become inconsistent when you need more or different data.

There are several approaches to avoid long parameter lists:
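One well-known approach is to introduce a parameter object that groups related arguments. The sketch below is only an illustration: RequestOptions, its fields, and describe_request are hypothetical names, since the signature of the flagged function is not shown on this page.

```python
from dataclasses import dataclass, field, replace
from typing import Optional


@dataclass(frozen=True)
class RequestOptions:
    """Hypothetical parameter object; the field names are illustrative only."""

    method: str = "GET"
    timeout: float = 10.0
    headers: dict = field(default_factory=dict)
    payload: Optional[dict] = None


def describe_request(url: str, options: Optional[RequestOptions] = None) -> str:
    """Take one options object instead of a long list of positional arguments."""
    options = options or RequestOptions()
    return f"{options.method} {url} (timeout={options.timeout}s)"


defaults = RequestOptions()
# Derive a variation without repeating the fields that stay the same.
post_options = replace(defaults, method="POST", payload={"circuit_ids": []})

print(describe_request("/evc/"))
print(describe_request("/evc/metadata", post_options))
```

Adding a new option later means adding one field with a default, so existing call sites do not need to change.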

""" This module was created to be the main interface between the telemetry napp and all
other kytos napps' APIs """


from collections import defaultdict

import httpx
from napps.kytos.telemetry_int import settings
from napps.kytos.telemetry_int.exceptions import UnrecoverableError
from tenacity import (
    retry,
    retry_if_exception_type,
    stop_after_attempt,
    wait_combine,
    wait_fixed,
    wait_random,
)

from kytos.core.retry import before_sleep


@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)),
)
async def get_evcs() -> dict:
    """Get EVCs."""
    archived = "false"
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        response = await client.get(f"/evc/?archived={archived}", timeout=10)
        # Server errors are raised as RequestError so the retry policy applies
        if response.is_server_error:
            raise httpx.RequestError(response.text)
        if not response.is_success:
            raise UnrecoverableError(
                f"Failed to get_evcs archived {archived} "
                f"status code {response.status_code}, response text: {response.text}"
            )
        return response.json()


@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def get_evc(evc_id: str) -> dict:
    """Get EVC."""
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        response = await client.get(f"/evc/{evc_id}", timeout=10)
        if response.status_code == 404:
            return {}
        if response.is_server_error:
            raise httpx.RequestError(response.text)
        if not response.is_success:
            raise UnrecoverableError(
                f"Failed to get_evc id {evc_id} "
                f"status code {response.status_code}, response text: {response.text}"
            )
        data = response.json()
        # Archived EVCs are treated the same as missing ones
        if data["archived"]:
            return {}
        return {data["id"]: data}


@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def get_stored_flows(
    cookies: list[int] = None,
) -> dict[int, list[dict]]:
    """Get flow_manager stored_flows grouped by cookies given a list of cookies."""
    cookies = cookies or []

    cookie_range_args = []
    for cookie in cookies:
        # gte cookie
        cookie_range_args.append(f"cookie_range={cookie}")
        # lte cookie
        cookie_range_args.append(f"cookie_range={cookie}")

    endpoint = "stored_flows?state=installed&state=pending"
    if cookie_range_args:
        endpoint = f"{endpoint}&{'&'.join(cookie_range_args)}"

    async with httpx.AsyncClient(base_url=settings.flow_manager_api) as client:
        response = await client.get(f"/{endpoint}", timeout=10)
        if response.is_server_error:
            raise httpx.RequestError(response.text)
        if not response.is_success:
            raise UnrecoverableError(
                f"Failed to get_stored_flows cookies {cookies} "
                f"status code {response.status_code}, response text: {response.text}"
            )
        return _map_stored_flows_by_cookies(response.json())


def _map_stored_flows_by_cookies(stored_flows: dict) -> dict[int, list[dict]]:
    """Map stored flows by cookies.

    This is for mapping the data by cookies, just so it can be
    reused upfront by bulk operations.
    """
    flows_by_cookies = defaultdict(list)
    for flows in stored_flows.values():
        for flow in flows:
            flows_by_cookies[flow["flow"]["cookie"]].append(flow)
    return flows_by_cookies


@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def add_evcs_metadata(
    evcs: dict[str, dict], new_metadata: dict, force=False
) -> dict:
    """Add EVC metadata."""

    circuit_ids = [evc_id for evc_id, evc in evcs.items() if evc]
    # Return early if there are no circuits whose metadata needs updating
    if not circuit_ids:
        return {}

    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        response = await client.post(
            "/evc/metadata",
            timeout=10,
            json={
                **new_metadata,
                **{"circuit_ids": circuit_ids},
            },
        )
        if response.is_success:
            return response.json()
        # Ignore 404 if force just so it's easier to handle this concurrently
        if response.status_code == 404 and force:
            return {}

        if response.is_server_error:
            raise httpx.RequestError(response.text)
        raise UnrecoverableError(
            f"Failed to add_evc_metadata for EVC ids {list(evcs.keys())} "
            f"status code {response.status_code}, response text: {response.text}"
        )
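To make the stored-flows logic in the listing easier to follow, here is a small standalone sketch of its two pure steps: building the query string and grouping flows by cookie. The helper names build_stored_flows_endpoint and map_flows_by_cookie are hypothetical, and the sample payload (including the assumption that top-level keys are switch IDs) is made up for illustration. It uses only the standard library and makes no network calls.

```python
from collections import defaultdict


def build_stored_flows_endpoint(cookies: list[int]) -> str:
    """Mirror the endpoint construction used by get_stored_flows."""
    endpoint = "stored_flows?state=installed&state=pending"
    # Each cookie is sent twice: once as the lower and once as the upper bound.
    range_args = [f"cookie_range={cookie}" for cookie in cookies for _ in range(2)]
    if range_args:
        endpoint = f"{endpoint}&{'&'.join(range_args)}"
    return endpoint


def map_flows_by_cookie(stored_flows: dict) -> dict[int, list[dict]]:
    """Group flows from every top-level key by their cookie value."""
    flows_by_cookie = defaultdict(list)
    for flows in stored_flows.values():
        for flow in flows:
            flows_by_cookie[flow["flow"]["cookie"]].append(flow)
    return dict(flows_by_cookie)


# Made-up payload; the keys are assumed to be switch IDs.
sample = {
    "00:00:00:00:00:00:00:01": [{"flow": {"cookie": 1}}, {"flow": {"cookie": 2}}],
    "00:00:00:00:00:00:00:02": [{"flow": {"cookie": 1}}],
}

print(build_stored_flows_endpoint([7]))
grouped = map_flows_by_cookie(sample)
print({cookie: len(flows) for cookie, flows in grouped.items()})
```

Keeping these steps as pure functions means they can be covered by unit tests without mocking httpx, which may help with the low coverage reported above.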