Passed
Push — master ( 2ec8a9...67da38 )
by Vinicius
07:53 queued 06:30
created

build.kytos_api_helper   A

Complexity

Total Complexity 26

Size/Duplication

Total Lines 151
Duplicated Lines 0 %

Test Coverage

Coverage 70.15%

Importance

Changes 0
Metric Value
eloc 102
dl 0
loc 151
ccs 47
cts 67
cp 0.7015
rs 10
c 0
b 0
f 0
wmc 26

5 Functions

Rating   Name   Duplication   Size   Complexity  
B get_evc() 0 23 6
A _map_stored_flows_by_cookies() 0 11 3
B add_evcs_metadata() 0 35 7
B get_stored_flows() 0 33 6
A get_evcs() 0 19 4
1
""" This module was created to be the main interface between the telemetry napp and all
2
other kytos napps' APIs """
3
4
5 1
from collections import defaultdict
6
7 1
import httpx
8 1
from napps.kytos.telemetry_int import settings
9 1
from napps.kytos.telemetry_int.exceptions import UnrecoverableError
10 1
from tenacity import (
11
    retry,
12
    retry_if_exception_type,
13
    stop_after_attempt,
14
    wait_combine,
15
    wait_fixed,
16
    wait_random,
17
)
18
19 1
from kytos.core.retry import before_sleep
20
21
22 1
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)),
)
async def get_evcs() -> dict:
    """Get EVCs from the mef_eline API, queried with ``archived=false``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.RequestError: when the API answers with a server error; this
            exception type is retried by the ``@retry`` decorator.
        UnrecoverableError: for any other non-success response.
    """
    archived = "false"
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        response = await client.get(f"/evc/?archived={archived}", timeout=10)
        # Server errors are re-raised as RequestError so the retry policy
        # above gets a chance to try again.
        if response.is_server_error:
            raise httpx.RequestError(response.text)
        if not response.is_success:
            # The trailing space keeps the two adjacent f-strings from
            # running together ("...archived falsestatus code...").
            raise UnrecoverableError(
                f"Failed to get_evcs archived {archived} "
                f"status code {response.status_code}, response text: {response.text}"
            )
        return response.json()
41
42
43 1
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def get_evc(evc_id: str) -> dict:
    """Fetch a single EVC from the mef_eline API.

    Returns:
        ``{evc_id: evc}`` keyed by the "id" field of the response body, or
        an empty dict when the API answers 404 or the EVC is archived.

    Raises:
        httpx.RequestError: when the API answers with a server error; this
            exception type is retried by the ``@retry`` decorator.
        UnrecoverableError: for any other non-success response.
    """
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        resp = await client.get(f"/evc/{evc_id}", timeout=10)

        # An unknown EVC is not an error for callers: report "nothing found".
        if resp.status_code == 404:
            return {}
        if resp.is_server_error:
            raise httpx.RequestError(resp.text)
        if not resp.is_success:
            raise UnrecoverableError(
                f"Failed to get_evc id {evc_id} "
                f"status code {resp.status_code}, response text: {resp.text}"
            )

        evc = resp.json()
        # Archived EVCs are treated the same as missing ones.
        return {} if evc["archived"] else {evc["id"]: evc}
66
67
68 1
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def get_stored_flows(
    cookies: list[int] = None,
) -> dict[int, list[dict]]:
    """Fetch installed and pending stored flows from flow_manager by cookie.

    Args:
        cookies: cookies to filter on; ``None`` or empty sends no cookie
            filter.

    Returns:
        The stored flows grouped by cookie value.

    Raises:
        httpx.RequestError: when the API answers with a server error; this
            exception type is retried by the ``@retry`` decorator.
        UnrecoverableError: for any other non-success response.
    """
    cookies = cookies or []

    # Each cookie is sent twice: once as the lower (gte) bound and once as
    # the upper (lte) bound of its cookie_range.
    range_params = [
        f"cookie_range={cookie}" for cookie in cookies for _ in range(2)
    ]
    endpoint = "&".join(
        ["stored_flows?state=installed&state=pending", *range_params]
    )

    async with httpx.AsyncClient(base_url=settings.flow_manager_api) as client:
        resp = await client.get(f"/{endpoint}", timeout=10)
        if resp.is_server_error:
            raise httpx.RequestError(resp.text)
        if not resp.is_success:
            raise UnrecoverableError(
                f"Failed to get_stored_flows cookies {cookies} "
                f"status code {resp.status_code}, response text: {resp.text}"
            )
        return _map_stored_flows_by_cookies(resp.json())
101
102
103 1
def _map_stored_flows_by_cookies(stored_flows: dict) -> dict[int, list[dict]]:
104
    """Map stored flows by cookies.
105
106
    This is for mapping the data by cookies, just to it can be
107
    reused upfront by bulk operations.
108
    """
109 1
    flows_by_cookies = defaultdict(list)
110 1
    for flows in stored_flows.values():
111 1
        for flow in flows:
112 1
            flows_by_cookies[flow["flow"]["cookie"]].append(flow)
113 1
    return flows_by_cookies
114
115
116 1
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def add_evcs_metadata(
    evcs: dict[str, dict], new_metadata: dict, force=False
) -> dict:
    """Post metadata for a set of EVCs to the mef_eline API.

    Args:
        evcs: EVCs keyed by id; entries with a falsy value are skipped.
        new_metadata: metadata key/values sent in the request body.
        force: when true, a 404 response is ignored and ``{}`` is returned.

    Returns:
        The decoded JSON body on success, or ``{}`` when there is nothing to
        update or a 404 was ignored because of ``force``.

    Raises:
        httpx.RequestError: when the API answers with a server error; this
            exception type is retried by the ``@retry`` decorator.
        UnrecoverableError: for any other non-success response.
    """
    circuit_ids = [evc_id for evc_id, evc in evcs.items() if evc]
    # Nothing to update: skip the request entirely.
    if not circuit_ids:
        return {}

    payload = {**new_metadata, "circuit_ids": circuit_ids}
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        resp = await client.post("/evc/metadata", timeout=10, json=payload)

        if resp.is_success:
            return resp.json()
        # With force, a 404 is tolerated so that concurrent callers are
        # easier to handle.
        if force and resp.status_code == 404:
            return {}
        if resp.is_server_error:
            raise httpx.RequestError(resp.text)

        raise UnrecoverableError(
            f"Failed to add_evc_metadata for EVC ids {list(evcs.keys())} "
            f"status code {resp.status_code}, response text: {resp.text}"
        )
153