Test Failed
Pull Request — master (#76)
by Vinicius
06:10
created

build.kytos_api_helper.get_stored_flows()   C

Complexity

Conditions 9

Size

Total Lines 45
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 9.1317

Importance

Changes 0
Metric Value
cc 9
eloc 31
nop 1
dl 0
loc 45
ccs 15
cts 17
cp 0.8824
crap 9.1317
rs 6.6666
c 0
b 0
f 0
"""This module was created to be the main interface between the telemetry napp
and all other kytos napps' APIs."""
3
4
5 1
from collections import defaultdict
6
from typing import Union
7 1
8 1
import httpx
9 1
from napps.kytos.telemetry_int import settings
10 1
from napps.kytos.telemetry_int.exceptions import UnrecoverableError
11
from tenacity import (
12
    retry,
13
    retry_if_exception_type,
14
    stop_after_attempt,
15
    wait_combine,
16
    wait_fixed,
17
    wait_random,
18
)
19 1
20
from kytos.core.retry import before_sleep
21
22 1
23
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)),
)
async def get_evcs(**kwargs) -> dict:
    """Get the non-archived EVCs from the mef_eline API.

    The request always carries ``archived=false``. The call is retried by the
    ``retry`` decorator when ``httpx.RequestError`` or
    ``httpx.HTTPStatusError`` is raised.

    Args:
        **kwargs: Extra query-string filters. Each pair is appended to the
            endpoint verbatim as ``key=value`` (no URL encoding is applied).

    Returns:
        The decoded JSON body of the response.

    Raises:
        httpx.RequestError: On a 5xx response, so the call gets retried.
        UnrecoverableError: On any other non-success response.
    """
    archived = "false"
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        endpoint = f"/evc/?archived={archived}"
        if kwargs:
            query_args = [f"{k}={v}" for k, v in kwargs.items()]
            endpoint = f"{endpoint}&{'&'.join(query_args)}"
        response = await client.get(endpoint, timeout=10)
        if response.is_server_error:
            # Server errors are re-raised as a retryable exception type.
            raise httpx.RequestError(response.text)
        if not response.is_success:
            # The trailing space keeps "archived false" apart from "status code".
            raise UnrecoverableError(
                f"Failed to get_evcs archived {archived} "
                f"status code {response.status_code}, response text: {response.text}"
            )
        return response.json()
46
47 1
48
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def get_evc(evc_id: str, exclude_archived: bool = True) -> dict:
    """Get a single EVC from the mef_eline API.

    The call is retried by the ``retry`` decorator when
    ``httpx.RequestError`` is raised.

    Args:
        evc_id: Identifier of the EVC to fetch.
        exclude_archived: When true, an EVC whose ``archived`` field is
            truthy is treated as not found.

    Returns:
        ``{evc["id"]: evc}`` for the fetched EVC, or an empty dict when the
        API answers 404 or the EVC is archived and ``exclude_archived`` is
        set.

    Raises:
        httpx.RequestError: On a 5xx response, so the call gets retried.
        UnrecoverableError: On any other non-success response.
    """
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        response = await client.get(f"/evc/{evc_id}", timeout=10)
        # A missing EVC is not an error for callers; report it as empty.
        if response.status_code == 404:
            return {}
        if response.is_server_error:
            # Server errors are re-raised as a retryable exception type.
            raise httpx.RequestError(response.text)
        if not response.is_success:
            raise UnrecoverableError(
                f"Failed to get_evc id {evc_id} "
                f"status code {response.status_code}, response text: {response.text}"
            )
        data = response.json()
        if data["archived"] and exclude_archived:
            return {}
        return {data["id"]: data}
71
72 1
73
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def get_stored_flows(
    cookies: Union[list[Union[int, tuple[int, int]]], None] = None,
) -> dict[int, list[dict]]:
    """Get flow_manager stored_flows grouped by cookies given a list of cookies.

    Only flows in state ``installed`` or ``pending`` are requested. The call
    is retried by the ``retry`` decorator when ``httpx.RequestError`` is
    raised.

    Args:
        cookies: Cookie filters. An ``int`` selects exactly that cookie (it
            is sent as both bounds of a range); a 2-tuple is sent as a
            ``(gte, lte)`` range. Entries of any other shape are skipped.
            ``None`` or an empty list requests the flows without a cookie
            filter.

    Returns:
        The stored flows keyed by their cookie value.

    Raises:
        httpx.RequestError: On a 5xx response, so the call gets retried.
        UnrecoverableError: On any other non-success response.
    """
    cookies = cookies or []

    # Flat list of alternating lower/upper bounds: [gte, lte, gte, lte, ...].
    cookie_range_args = []
    for cookie in cookies:
        if isinstance(cookie, int):
            # A single cookie is an inclusive range of itself.
            cookie_range_args.extend((cookie, cookie))
        elif isinstance(cookie, tuple) and len(cookie) == 2:
            cookie_range_args.extend(cookie)

    endpoint = "stored_flows?state=installed&state=pending"
    async with httpx.AsyncClient(base_url=settings.flow_manager_api) as client:
        if cookie_range_args:
            # The range filter travels in the JSON body of the GET request.
            response = await client.request(
                "GET",
                f"/{endpoint}",
                json={"cookie_range": cookie_range_args},
                timeout=10,
            )
        else:
            response = await client.get(f"/{endpoint}", timeout=10)

        if response.is_server_error:
            # Server errors are re-raised as a retryable exception type.
            raise httpx.RequestError(response.text)
        if not response.is_success:
            raise UnrecoverableError(
                f"Failed to get_stored_flows cookies {cookies} "
                f"status code {response.status_code}, response text: {response.text}"
            )
        return _map_stored_flows_by_cookies(response.json())
118
119 1
120 1
def _map_stored_flows_by_cookies(stored_flows: dict) -> dict[int, list[dict]]:
    """Map stored flows by cookies.

    This groups the data by cookie so that it can be reused later by bulk
    operations.

    Args:
        stored_flows: Mapping whose values are iterables of flow entries;
            each entry must expose its cookie at ``entry["flow"]["cookie"]``.
            The mapping's keys are not used.

    Returns:
        A ``defaultdict(list)`` from cookie to the list of flow entries that
        carry it, in the order they were encountered.
    """
    flows_by_cookies = defaultdict(list)
    for flows in stored_flows.values():
        for flow in flows:
            flows_by_cookies[flow["flow"]["cookie"]].append(flow)
    return flows_by_cookies
131
132 1
133
@retry(
    stop=stop_after_attempt(5),
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
    before_sleep=before_sleep,
    retry=retry_if_exception_type(httpx.RequestError),
)
async def add_evcs_metadata(
    evcs: dict[str, dict], new_metadata: dict, force=False
) -> dict:
    """Add EVC metadata.

    Posts ``new_metadata`` to the mef_eline ``/evc/metadata`` endpoint for
    every EVC id in ``evcs`` whose value is truthy. The call is retried by
    the ``retry`` decorator when ``httpx.RequestError`` is raised.

    Args:
        evcs: Mapping of EVC id to EVC data; ids mapped to a falsy value are
            left out of the request.
        new_metadata: Metadata entries merged into the request payload. A
            ``circuit_ids`` key in it is overridden by the computed id list.
        force: When true, a 404 response is ignored and an empty dict is
            returned instead of raising.

    Returns:
        The decoded JSON body on success, or an empty dict when there is
        nothing to update or a 404 was ignored because of ``force``.

    Raises:
        httpx.RequestError: On a 5xx response, so the call gets retried.
        UnrecoverableError: On any other non-success response.
    """

    circuit_ids = [evc_id for evc_id, evc in evcs.items() if evc]
    # return early if there's no circuits to update their metadata
    if not circuit_ids:
        return {}

    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
        response = await client.post(
            "/evc/metadata",
            timeout=10,
            json={**new_metadata, "circuit_ids": circuit_ids},
        )
        if response.is_success:
            return response.json()
        # Ignore 404 if force just so it's easier to handle this concurrently
        if response.status_code == 404 and force:
            return {}

        if response.is_server_error:
            # Server errors are re-raised as a retryable exception type.
            raise httpx.RequestError(response.text)
        raise UnrecoverableError(
            f"Failed to add_evc_metadata for EVC ids {list(evcs.keys())} "
            f"status code {response.status_code}, response text: {response.text}"
        )
170