Passed
Pull Request — master (#60)
by Vinicius
03:22
created

build.kytos_api_helper.get_evcs()   B

Complexity

Conditions 5

Size

Total Lines 23
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 5.583

Importance

Changes 0
Metric Value
cc 5
eloc 19
nop 1
dl 0
loc 23
ccs 10
cts 14
cp 0.7143
crap 5.583
rs 8.9833
c 0
b 0
f 0
1
""" This module was created to be the main interface between the telemetry napp and all
2
other kytos napps' APIs """
3
4
5 1
from collections import defaultdict
6
7 1
import httpx
8 1
from napps.kytos.telemetry_int import settings
9 1
from napps.kytos.telemetry_int.exceptions import UnrecoverableError
10 1
from tenacity import (
11
    retry,
12
    retry_if_exception_type,
13
    stop_after_attempt,
14
    wait_combine,
15
    wait_fixed,
16
    wait_random,
17
)
18
19 1
from kytos.core.retry import before_sleep
20
21
22 1
@retry(
23
    stop=stop_after_attempt(5),
24
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
25
    before_sleep=before_sleep,
26
    retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)),
27
)
28 1
async def get_evcs(**kwargs) -> dict:
29
    """Get EVCs."""
30 1
    archived = "false"
31 1
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
32 1
        endpoint = f"/evc/?archived={archived}"
33 1
        if kwargs:
34
            query_args = [f"{k}={v}" for k, v in kwargs.items()]
35
            endpoint = f"{endpoint}&{'&'.join(query_args)}"
36 1
        response = await client.get(endpoint, timeout=10)
37 1
        if response.is_server_error:
38
            raise httpx.RequestError(response.text)
39 1
        if not response.is_success:
40
            raise UnrecoverableError(
41
                f"Failed to get_evcs archived {archived}"
42
                f"status code {response.status_code}, response text: {response.text}"
43
            )
44 1
        return response.json()
45
46
47 1
@retry(
48
    stop=stop_after_attempt(5),
49
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
50
    before_sleep=before_sleep,
51
    retry=retry_if_exception_type(httpx.RequestError),
52
)
53 1
async def get_evc(evc_id: str, exclude_archived=True) -> dict:
54
    """Get EVC."""
55 1
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
56 1
        response = await client.get(f"/evc/{evc_id}", timeout=10)
57 1
        if response.status_code == 404:
58
            return {}
59 1
        if response.is_server_error:
60
            raise httpx.RequestError(response.text)
61 1
        if not response.is_success:
62
            raise UnrecoverableError(
63
                f"Failed to get_evc id {evc_id} "
64
                f"status code {response.status_code}, response text: {response.text}"
65
            )
66 1
        data = response.json()
67 1
        if data["archived"] and exclude_archived:
68
            return {}
69 1
        return {data["id"]: data}
70
71
72 1
@retry(
73
    stop=stop_after_attempt(5),
74
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
75
    before_sleep=before_sleep,
76
    retry=retry_if_exception_type(httpx.RequestError),
77
)
78 1
async def get_stored_flows(
79
    cookies: list[int] = None,
80
) -> dict[int, list[dict]]:
81
    """Get flow_manager stored_flows grouped by cookies given a list of cookies."""
82 1
    cookies = cookies or []
83
84 1
    cookie_range_args = []
85 1
    for cookie in cookies:
86
        # gte cookie
87 1
        cookie_range_args.append(f"cookie_range={cookie}")
88
        # lte cookie
89 1
        cookie_range_args.append(f"cookie_range={cookie}")
90
91 1
    endpoint = "stored_flows?state=installed&state=pending"
92 1
    if cookie_range_args:
93 1
        endpoint = f"{endpoint}&{'&'.join(cookie_range_args)}"
94
95 1
    async with httpx.AsyncClient(base_url=settings.flow_manager_api) as client:
96 1
        response = await client.get(f"/{endpoint}", timeout=10)
97 1
        if response.is_server_error:
98
            raise httpx.RequestError(response.text)
99 1
        if not response.is_success:
100
            raise UnrecoverableError(
101
                f"Failed to get_stored_flows cookies {cookies} "
102
                f"status code {response.status_code}, response text: {response.text}"
103
            )
104 1
        return _map_stored_flows_by_cookies(response.json())
105
106
107 1
def _map_stored_flows_by_cookies(stored_flows: dict) -> dict[int, list[dict]]:
108
    """Map stored flows by cookies.
109
110
    This is for mapping the data by cookies, just to it can be
111
    reused upfront by bulk operations.
112
    """
113 1
    flows_by_cookies = defaultdict(list)
114 1
    for flows in stored_flows.values():
115 1
        for flow in flows:
116 1
            flows_by_cookies[flow["flow"]["cookie"]].append(flow)
117 1
    return flows_by_cookies
118
119
120 1
@retry(
121
    stop=stop_after_attempt(5),
122
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
123
    before_sleep=before_sleep,
124
    retry=retry_if_exception_type(httpx.RequestError),
125
)
126 1
async def add_evcs_metadata(
127
    evcs: dict[str, dict], new_metadata: dict, force=False
128
) -> dict:
129
    """Add EVC metadata."""
130
131
    circuit_ids = [evc_id for evc_id, evc in evcs.items() if evc]
132
    # return early if there's no circuits to update their metadata
133
    if not circuit_ids:
134
        return {}
135
136
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
137
        response = await client.post(
138
            "/evc/metadata",
139
            timeout=10,
140
            json={
141
                **new_metadata,
142
                **{"circuit_ids": circuit_ids},
143
            },
144
        )
145
        if response.is_success:
146
            return response.json()
147
        # Ignore 404 if force just so it's easier to handle this concurrently
148
        if response.status_code == 404 and force:
149
            return {}
150
151
        if response.is_server_error:
152
            raise httpx.RequestError(response.text)
153
        raise UnrecoverableError(
154
            f"Failed to add_evc_metadata for EVC ids {list(evcs.keys())} "
155
            f"status code {response.status_code}, response text: {response.text}"
156
        )
157