Passed
Pull Request — master (#35)
by Vinicius
02:28
created

build.kytos_api_helper.add_evcs_metadata()   B

Complexity

Conditions 6

Size

Total Lines 29
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 25.7188

Importance

Changes 0
Metric Value
cc 6
eloc 22
nop 3
dl 0
loc 29
ccs 2
cts 11
cp 0.1818
crap 25.7188
rs 8.4186
c 0
b 0
f 0
1
""" This module was created to be the main interface between the telemetry napp and all
2
other kytos napps' APIs """
3
4
5 1
from collections import defaultdict
6
7 1
import httpx
8 1
from napps.kytos.telemetry_int import settings
9 1
from napps.kytos.telemetry_int.exceptions import UnrecoverableError
10 1
from tenacity import (
11
    retry,
12
    retry_if_exception_type,
13
    stop_after_attempt,
14
    wait_combine,
15
    wait_fixed,
16
    wait_random,
17
)
18
19 1
from kytos.core.retry import before_sleep
20
21
22 1
@retry(
23
    stop=stop_after_attempt(5),
24
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
25
    before_sleep=before_sleep,
26
    retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)),
27
)
28 1
async def get_evcs(archived="false") -> dict:
29
    """Get EVCs."""
30 1
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
31 1
        response = await client.get(f"/evc/?archived={archived}", timeout=10)
32 1
        if response.is_server_error:
33
            raise httpx.RequestError(response.text)
34 1
        if not response.is_success:
35
            raise UnrecoverableError(
36
                f"Failed to get_evcs archived {archived}"
37
                f"status code {response.status_code}, response text: {response.text}"
38
            )
39 1
        return response.json()
40
41
42 1
@retry(
43
    stop=stop_after_attempt(5),
44
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
45
    before_sleep=before_sleep,
46
    retry=retry_if_exception_type(httpx.RequestError),
47
)
48 1
async def get_evc(evc_id: str) -> dict:
49
    """Get EVC."""
50 1
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
51 1
        response = await client.get(f"/evc/{evc_id}", timeout=10)
52 1
        if response.status_code == 404:
53
            return {}
54 1
        if response.is_server_error:
55
            raise httpx.RequestError(response.text)
56 1
        if not response.is_success:
57
            raise UnrecoverableError(
58
                f"Failed to get_evc id {evc_id} "
59
                f"status code {response.status_code}, response text: {response.text}"
60
            )
61 1
        data = response.json()
62 1
        return {data["id"]: data}
63
64
65 1
@retry(
66
    stop=stop_after_attempt(5),
67
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
68
    before_sleep=before_sleep,
69
    retry=retry_if_exception_type(httpx.RequestError),
70
)
71 1
async def get_stored_flows(
72
    cookies: list[int] = None,
73
) -> dict[int, list[dict]]:
74
    """Get flow_manager stored_flows grouped by cookies given a list of cookies."""
75 1
    cookies = cookies or []
76
77 1
    cookie_range_args = []
78 1
    for cookie in cookies:
79
        # gte cookie
80 1
        cookie_range_args.append(f"cookie_range={cookie}")
81
        # lte cookie
82 1
        cookie_range_args.append(f"cookie_range={cookie}")
83
84 1
    endpoint = "stored_flows?state=installed&state=pending"
85 1
    if cookie_range_args:
86 1
        endpoint = f"{endpoint}&{'&'.join(cookie_range_args)}"
87
88 1
    async with httpx.AsyncClient(base_url=settings.flow_manager_api) as client:
89 1
        response = await client.get(f"/{endpoint}", timeout=10)
90 1
        if response.is_server_error:
91
            raise httpx.RequestError(response.text)
92 1
        if not response.is_success:
93
            raise UnrecoverableError(
94
                f"Failed to get_stored_flows cookies {cookies} "
95
                f"status code {response.status_code}, response text: {response.text}"
96
            )
97 1
        return _map_stored_flows_by_cookies(response.json())
98
99
100 1
def _map_stored_flows_by_cookies(stored_flows: dict) -> dict[int, list[dict]]:
101
    """Map stored flows by cookies.
102
103
    This is for mapping the data by cookies, just to it can be
104
    reused upfront by bulk operations.
105
    """
106 1
    flows_by_cookies = defaultdict(list)
107 1
    for flows in stored_flows.values():
108 1
        for flow in flows:
109 1
            flows_by_cookies[flow["flow"]["cookie"]].append(flow)
110 1
    return flows_by_cookies
111
112
113 1
@retry(
114
    stop=stop_after_attempt(5),
115
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
116
    before_sleep=before_sleep,
117
    retry=retry_if_exception_type(httpx.RequestError),
118
)
119 1
async def add_evcs_metadata(
120
    evcs: dict[str, dict], new_metadata: dict, force=False
121
) -> dict:
122
    """Add EVC metadata."""
123
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
124
        response = await client.post(
125
            "/evc/metadata",
126
            timeout=10,
127
            json={
128
                **new_metadata,
129
                **{"circuit_ids": [evc_id for evc_id, evc in evcs.items() if evc]},
130
            },
131
        )
132
        if response.is_success:
133
            return response.json()
134
        # Ignore 404 if force just so it's easier to handle this concurrently
135
        if response.status_code == 404 and force:
136
            return {}
137
138
        if response.is_server_error:
139
            raise httpx.RequestError(response.text)
140
        raise UnrecoverableError(
141
            f"Failed to add_evc_metadata for EVC ids {list(evcs.keys())} "
142
            f"status code {response.status_code}, response text: {response.text}"
143
        )
144