build.kytos_api_helper.get_evc()   B
last analyzed

Complexity

Conditions 7

Size

Total Lines 23
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 8.1426

Importance

Changes 0
Metric Value
cc 7
eloc 19
nop 2
dl 0
loc 23
ccs 10
cts 14
cp 0.7143
crap 8.1426
rs 8
c 0
b 0
f 0
1
""" This module was created to be the main interface between the telemetry napp and all
2
other kytos napps' APIs """
3
4 1
from collections import defaultdict
5 1
from typing import Union
6
7 1
import httpx
8 1
from napps.kytos.telemetry_int import settings
9 1
from napps.kytos.telemetry_int.exceptions import UnrecoverableError
10 1
from tenacity import (
11
    retry,
12
    retry_if_exception_type,
13
    stop_after_attempt,
14
    wait_combine,
15
    wait_fixed,
16
    wait_random,
17
)
18
19 1
from kytos.core.retry import before_sleep
20
21
22 1
@retry(
23
    stop=stop_after_attempt(5),
24
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
25
    before_sleep=before_sleep,
26
    retry=retry_if_exception_type((httpx.RequestError, httpx.HTTPStatusError)),
27
)
28 1
async def get_evcs(**kwargs) -> dict:
29
    """Get EVCs."""
30 1
    archived = "false"
31 1
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
32 1
        endpoint = f"/evc/?archived={archived}"
33 1
        if kwargs:
34
            query_args = [f"{k}={v}" for k, v in kwargs.items()]
35
            endpoint = f"{endpoint}&{'&'.join(query_args)}"
36 1
        response = await client.get(endpoint, timeout=10)
37 1
        if response.is_server_error:
38
            raise httpx.RequestError(response.text)
39 1
        if not response.is_success:
40
            raise UnrecoverableError(
41
                f"Failed to get_evcs archived {archived}"
42
                f"status code {response.status_code}, response text: {response.text}"
43
            )
44 1
        return response.json()
45
46
47 1
@retry(
48
    stop=stop_after_attempt(5),
49
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
50
    before_sleep=before_sleep,
51
    retry=retry_if_exception_type(httpx.RequestError),
52
)
53 1
async def get_evc(evc_id: str, exclude_archived=True) -> dict:
54
    """Get EVC."""
55 1
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
56 1
        response = await client.get(f"/evc/{evc_id}", timeout=10)
57 1
        if response.status_code == 404:
58
            return {}
59 1
        if response.is_server_error:
60
            raise httpx.RequestError(response.text)
61 1
        if not response.is_success:
62
            raise UnrecoverableError(
63
                f"Failed to get_evc id {evc_id} "
64
                f"status code {response.status_code}, response text: {response.text}"
65
            )
66 1
        data = response.json()
67 1
        if data["archived"] and exclude_archived:
68
            return {}
69 1
        return {data["id"]: data}
70
71
72 1
@retry(
73
    stop=stop_after_attempt(5),
74
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
75
    before_sleep=before_sleep,
76
    retry=retry_if_exception_type(httpx.RequestError),
77
)
78 1
async def get_stored_flows(
79
    cookies: list[Union[int, tuple[int, int]]] = None,
80
) -> dict[int, list[dict]]:
81
    """Get flow_manager stored_flows grouped by cookies given a list of cookies."""
82 1
    cookies = cookies or []
83
84 1
    cookie_range_args = []
85 1
    for cookie in cookies:
86 1
        if isinstance(cookie, int):
87
            # gte cookie
88 1
            cookie_range_args.append(cookie)
89
            # lte cookie
90 1
            cookie_range_args.append(cookie)
91 1
        elif isinstance(cookie, tuple) and len(cookie) == 2:
92
            # gte cookie
93 1
            cookie_range_args.append(cookie[0])
94
            # lte cookie
95 1
            cookie_range_args.append(cookie[1])
96
97 1
    endpoint = "stored_flows?state=installed&state=pending"
98 1
    async with httpx.AsyncClient(base_url=settings.flow_manager_api) as client:
99 1
        if cookie_range_args:
100 1
            response = await client.request(
101
                "GET",
102
                f"/{endpoint}",
103
                json={"cookie_range": cookie_range_args},
104
                timeout=10,
105
            )
106
        else:
107 1
            response = await client.get(f"/{endpoint}", timeout=10)
108
109 1
        if response.is_server_error:
110
            raise httpx.RequestError(response.text)
111 1
        if not response.is_success:
112
            raise UnrecoverableError(
113
                f"Failed to get_stored_flows cookies {cookies} "
114
                f"status code {response.status_code}, response text: {response.text}"
115
            )
116 1
        return _map_stored_flows_by_cookies(response.json())
117
118
119 1
def _map_stored_flows_by_cookies(stored_flows: dict) -> dict[int, list[dict]]:
120
    """Map stored flows by cookies.
121
122
    This is for mapping the data by cookies, just to it can be
123
    reused upfront by bulk operations.
124
    """
125 1
    flows_by_cookies = defaultdict(list)
126 1
    for flows in stored_flows.values():
127 1
        for flow in flows:
128 1
            flows_by_cookies[flow["flow"]["cookie"]].append(flow)
129 1
    return flows_by_cookies
130
131
132 1
@retry(
133
    stop=stop_after_attempt(5),
134
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
135
    before_sleep=before_sleep,
136
    retry=retry_if_exception_type(httpx.RequestError),
137
)
138 1
async def add_evcs_metadata(
139
    evcs: dict[str, dict], new_metadata: dict, force=False
140
) -> dict:
141
    """Add EVC metadata."""
142
143 1
    circuit_ids = [evc_id for evc_id, evc in evcs.items() if evc]
144
    # return early if there's no circuits to update their metadata
145 1
    if not circuit_ids:
146 1
        return {}
147
148 1
    async with httpx.AsyncClient(base_url=settings.mef_eline_api) as client:
149 1
        response = await client.post(
150
            "/evc/metadata",
151
            timeout=10,
152
            json={
153
                **new_metadata,
154
                **{"circuit_ids": circuit_ids},
155
            },
156
        )
157 1
        if response.is_success:
158 1
            return response.json()
159
        # Ignore 404 if force just so it's easier to handle this concurrently
160
        if response.status_code == 404 and force:
161
            return {}
162
163
        if response.is_server_error:
164
            raise httpx.RequestError(response.text)
165
        raise UnrecoverableError(
166
            f"Failed to add_evc_metadata for EVC ids {list(evcs.keys())} "
167
            f"status code {response.status_code}, response text: {response.text}"
168
        )
169
170
171 1 View Code Duplication
@retry(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
172
    stop=stop_after_attempt(5),
173
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
174
    before_sleep=before_sleep,
175
    retry=retry_if_exception_type(httpx.RequestError),
176
)
177 1
async def add_proxy_port_metadata(intf_id: str, port_no: int) -> dict:
178
    """Add proxy_port metadata."""
179 1
    async with httpx.AsyncClient(base_url=settings.topology_url) as client:
180 1
        response = await client.post(
181
            f"/interfaces/{intf_id}/metadata",
182
            timeout=10,
183
            json={"proxy_port": port_no},
184
        )
185 1
        if response.is_success:
186 1
            return response.json()
187
        if response.status_code == 404:
188
            raise ValueError(f"interface_id {intf_id} not found")
189
        if response.is_server_error:
190
            raise httpx.RequestError(response.text)
191
        raise UnrecoverableError(
192
            f"Failed to add_proxy_port {port_no} metadata for intf_id {intf_id} "
193
            f"status code {response.status_code}, response text: {response.text}"
194
        )
195
196
197 1 View Code Duplication
@retry(
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
198
    stop=stop_after_attempt(5),
199
    wait=wait_combine(wait_fixed(3), wait_random(min=2, max=7)),
200
    before_sleep=before_sleep,
201
    retry=retry_if_exception_type(httpx.RequestError),
202
)
203 1
async def delete_proxy_port_metadata(intf_id: str) -> dict:
204
    """Delete proxy_port metadata."""
205 1
    async with httpx.AsyncClient(base_url=settings.topology_url) as client:
206 1
        response = await client.delete(
207
            f"/interfaces/{intf_id}/metadata/proxy_port",
208
            timeout=10,
209
        )
210 1
        if response.is_success:
211 1
            return response.json()
212
        if response.status_code == 404:
213
            raise ValueError(f"interface_id {intf_id} or metadata proxy_port not found")
214
        if response.is_server_error:
215
            raise httpx.RequestError(response.text)
216
        raise UnrecoverableError(
217
            f"Failed to delete_proxy_port metadata for intf_id {intf_id} "
218
            f"status code {response.status_code}, response text: {response.text}"
219
        )
220