Passed
Push — master ( a932fa...68b174 )
by Vinicius
01:56 queued 20s
created

build.kytos_api_helper.get_stored_flows()   C

Complexity

Conditions 9

Size

Total Lines 45
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 16
CRAP Score 10.0933

Importance

Changes 0
Metric Value
cc 9
eloc 31
nop 1
dl 0
loc 45
ccs 16
cts 21
cp 0.7619
crap 10.0933
rs 6.6666
c 0
b 0
f 0
1
""" This module was created to be the main interface between the telemetry napp and all
2
other kytos napps' APIs """
3
4
5 1
from collections import defaultdict
6 1
from typing import Union
7
8 1
import httpx
9 1
from napps.kytos.telemetry_int import settings
10 1
from napps.kytos.telemetry_int.exceptions import UnrecoverableError
11 1
from tenacity import (
12
    retry,
13
    retry_if_exception_type,
14
    stop_after_attempt,
15
    wait_combine,
16
    wait_fixed,
17
    wait_random,
18
)
19
20 1
from kytos.core.retry import before_sleep
21
22
23 1
@retry(
24
    stop=stop_after_attempt(5),
25
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
26
    before_sleep=before_sleep,
27
    retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)),
28
)
29 1
async def get_evcs(**kwargs) -> dict:
30
    """Get EVCs."""
31 1
    archived = "false"
32 1
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
33 1
        endpoint = f"/evc/?archived={archived}"
34 1
        if kwargs:
35
            query_args = [f"{k}={v}" for k, v in kwargs.items()]
36
            endpoint = f"{endpoint}&{'&'.join(query_args)}"
37 1
        response = await client.get(endpoint, timeout=10)
38 1
        if response.is_server_error:
39
            raise httpx.RequestError(response.text)
40 1
        if not response.is_success:
41
            raise UnrecoverableError(
42
                f"Failed to get_evcs archived {archived}"
43
                f"status code {response.status_code}, response text: {response.text}"
44
            )
45 1
        return response.json()
46
47
48 1
@retry(
49
    stop=stop_after_attempt(5),
50
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
51
    before_sleep=before_sleep,
52
    retry=retry_if_exception_type(httpx.RequestError),
53
)
54 1
async def get_evc(evc_id: str, exclude_archived=True) -> dict:
55
    """Get EVC."""
56 1
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
57 1
        response = await client.get(f"/evc/{evc_id}", timeout=10)
58 1
        if response.status_code == 404:
59
            return {}
60 1
        if response.is_server_error:
61
            raise httpx.RequestError(response.text)
62 1
        if not response.is_success:
63
            raise UnrecoverableError(
64
                f"Failed to get_evc id {evc_id} "
65
                f"status code {response.status_code}, response text: {response.text}"
66
            )
67 1
        data = response.json()
68 1
        if data["archived"] and exclude_archived:
69
            return {}
70 1
        return {data["id"]: data}
71
72
73 1
@retry(
74
    stop=stop_after_attempt(5),
75
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
76
    before_sleep=before_sleep,
77
    retry=retry_if_exception_type(httpx.RequestError),
78
)
79 1
async def get_stored_flows(
80
    cookies: list[Union[int, tuple[int, int]]] = None,
81
) -> dict[int, list[dict]]:
82
    """Get flow_manager stored_flows grouped by cookies given a list of cookies."""
83 1
    cookies = cookies or []
84
85 1
    cookie_range_args = []
86 1
    for cookie in cookies:
87 1
        if isinstance(cookie, int):
88
            # gte cookie
89 1
            cookie_range_args.append(cookie)
90
            # lte cookie
91 1
            cookie_range_args.append(cookie)
92
        elif isinstance(cookie, tuple) and len(cookie) == 2:
93
            # gte cookie
94
            cookie_range_args.append(cookie[0])
95
            # lte cookie
96
            cookie_range_args.append(cookie[1])
97
98 1
    endpoint = "stored_flows?state=installed&state=pending"
99 1
    async with httpx.AsyncClient(base_url=settings.flow_manager_api) as client:
100 1
        if cookie_range_args:
101 1
            response = await client.request(
102
                "GET",
103
                f"/{endpoint}",
104
                json={"cookie_range": cookie_range_args},
105
                timeout=10,
106
            )
107
        else:
108 1
            response = await client.get(f"/{endpoint}", timeout=10)
109
110 1
        if response.is_server_error:
111
            raise httpx.RequestError(response.text)
112 1
        if not response.is_success:
113
            raise UnrecoverableError(
114
                f"Failed to get_stored_flows cookies {cookies} "
115
                f"status code {response.status_code}, response text: {response.text}"
116
            )
117 1
        return _map_stored_flows_by_cookies(response.json())
118
119
120 1
def _map_stored_flows_by_cookies(stored_flows: dict) -> dict[int, list[dict]]:
121
    """Map stored flows by cookies.
122
123
    This is for mapping the data by cookies, just to it can be
124
    reused upfront by bulk operations.
125
    """
126 1
    flows_by_cookies = defaultdict(list)
127 1
    for flows in stored_flows.values():
128 1
        for flow in flows:
129 1
            flows_by_cookies[flow["flow"]["cookie"]].append(flow)
130 1
    return flows_by_cookies
131
132
133 1
@retry(
134
    stop=stop_after_attempt(5),
135
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
136
    before_sleep=before_sleep,
137
    retry=retry_if_exception_type(httpx.RequestError),
138
)
139 1
async def add_evcs_metadata(
140
    evcs: dict[str, dict], new_metadata: dict, force=False
141
) -> dict:
142
    """Add EVC metadata."""
143
144
    circuit_ids = [evc_id for evc_id, evc in evcs.items() if evc]
145
    # return early if there's no circuits to update their metadata
146
    if not circuit_ids:
147
        return {}
148
149
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
150
        response = await client.post(
151
            "/evc/metadata",
152
            timeout=10,
153
            json={
154
                **new_metadata,
155
                **{"circuit_ids": circuit_ids},
156
            },
157
        )
158
        if response.is_success:
159
            return response.json()
160
        # Ignore 404 if force just so it's easier to handle this concurrently
161
        if response.status_code == 404 and force:
162
            return {}
163
164
        if response.is_server_error:
165
            raise httpx.RequestError(response.text)
166
        raise UnrecoverableError(
167
            f"Failed to add_evc_metadata for EVC ids {list(evcs.keys())} "
168
            f"status code {response.status_code}, response text: {response.text}"
169
        )
170