Passed
Push — master ( 7f62ad...2ec8a9 )
by Vinicius
02:07 queued 13s
created

set_telemetry_metadata_true()   A

Complexity

Conditions 1

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.2963

Importance

Changes 0
Metric Value
cc 1
eloc 7
nop 2
dl 0
loc 10
ccs 1
cts 3
cp 0.3333
crap 1.2963
rs 10
c 0
b 0
f 0
1
""" This module was created to be the main interface between the telemetry napp and all
2
other kytos napps' APIs """
3
4
5 1
import datetime
6
7 1
import httpx
8 1
from napps.kytos.telemetry_int import settings
9
10 1
from kytos.core import log
11
12 1
from .settings import flow_manager_api, mef_eline_api
13
14
# pylint: disable=fixme,too-many-arguments,no-else-return
15
16
17 1
def kytos_api(
    get=False,
    put=False,
    post=False,
    mef_eline=False,
    evc_id=None,
    flow_manager=False,
    switch=None,
    data=None,
    metadata=False,
):
    """Main function to handle requests to Kytos API.

    The base URL is ``mef_eline_api`` when ``mef_eline`` is truthy and
    ``flow_manager_api`` otherwise. The verb flags are checked in the order
    ``get``, ``put``, ``post``; the first truthy one selects the request.

    Args:
        get: issue a GET; a truthy ``data`` is appended to the base URL.
        put: issue a PUT to the base URL (the response is discarded).
        post: issue a POST; the target depends on ``mef_eline``/``metadata``
            or ``flow_manager``.
        mef_eline: use the mef_eline base URL instead of flow_manager's.
        evc_id: EVC identifier used to build the metadata URL.
        flow_manager: with ``post``, send ``data`` to ``<base URL>/<switch>``.
        switch: path segment appended to the base URL for flow_manager POSTs.
        data: URL suffix for GET, JSON body for POST.
        metadata: with ``post`` and ``mef_eline``, send ``data`` to
            ``<base URL>/evc/<evc_id>/metadata``.

    Returns:
        The decoded JSON body for GET; for POST, ``True`` when the status
        code is 201 (mef_eline metadata) or 202 (flow_manager) and ``False``
        otherwise. ``False`` is also returned for PUT, when no POST target
        matches, when the base URL is not configured, and when
        ``httpx.RequestError`` is raised (the error is logged).
    """

    # TODO: add support for batch, temporizer, retries
    # Always bind the base URL: previously it stayed unbound (and raised
    # UnboundLocalError on first use) when flow_manager_api was falsy and
    # mef_eline was not requested.
    kytos_api_url = mef_eline_api if mef_eline else flow_manager_api
    if not kytos_api_url:
        log.error("No Kytos API URL configured for this request")
        return False

    headers = {"Content-Type": "application/json"}

    try:
        if get:
            if data:
                kytos_api_url += data
            return httpx.get(kytos_api_url, timeout=10).json()

        elif put:
            # The response is not inspected; execution falls through to the
            # final ``return False``.
            httpx.put(kytos_api_url, timeout=10, headers=headers)

        elif post:
            if mef_eline and metadata:
                url = f"{kytos_api_url}/evc/{evc_id}/metadata"
                response = httpx.post(url, headers=headers, json=data, timeout=10)
                return response.status_code == 201

            if flow_manager:
                url = f"{kytos_api_url}/{switch}"
                response = httpx.post(url, headers=headers, json=data, timeout=10)
                return response.status_code == 202

    except httpx.RequestError as http_err:
        log.error(f"HTTP error occurred: {http_err}")

    return False
62
63
64 1
def get_evcs(archived="false") -> dict:
    """Fetch EVCs from mef_eline, passing ``archived`` as a query argument.

    Returns the decoded JSON body, or an empty dict (after logging the
    response text) when ``raise_for_status()`` raises ``HTTPStatusError``.
    """
    url = f"{settings.mef_eline_api}/evc/?archived={archived}"
    response = httpx.get(url)
    try:
        response.raise_for_status()
    except httpx.HTTPStatusError as err:
        log.error(f"response: {response.text} for {str(err)}")
        return {}
    return response.json()
73
74
75 1
def get_evc(evc_id: str) -> dict:
    """Fetch a single EVC from mef_eline, keyed by its ``id`` field.

    Returns ``{evc["id"]: evc}`` on success. Returns an empty dict when the
    response status is 404, or (after logging the response text) when
    ``raise_for_status()`` raises ``HTTPStatusError``.
    """
    response = httpx.get(f"{settings.mef_eline_api}/evc/{evc_id}")

    # A missing EVC is not treated as an error and is not logged.
    if response.status_code == 404:
        return {}

    try:
        response.raise_for_status()
    except httpx.HTTPStatusError as err:
        log.error(f"response: {response.text} for {str(err)}")
        return {}

    evc = response.json()
    return {evc["id"]: evc}
88
89
90 1
def get_evc_flows(cookie: int, *dpid: str) -> dict:
    """Fetch stored flows for ``cookie`` from flow_manager.

    The same cookie is sent as both ``cookie_range`` values, and only flows
    in state ``installed`` or ``pending`` are requested. Each value in
    ``dpid`` is added as a separate ``dpid`` query argument.

    Returns the decoded JSON body, or an empty dict (after logging the
    response text) when ``raise_for_status()`` raises ``HTTPStatusError``.
    """
    query = (
        f"stored_flows?cookie_range={cookie}&cookie_range={cookie}"
        "&state=installed&state=pending"
    )
    # With no dpid given the join yields "" and the query is left untouched.
    query += "".join(f"&dpid={val}" for val in dpid)

    response = httpx.get(f"{settings.flow_manager_api}/{query}")
    try:
        response.raise_for_status()
    except httpx.HTTPStatusError as err:
        log.error(f"response: {response.text} for {str(err)}")
        return {}
    return response.json()
106
107
108 1
def set_telemetry_metadata_true(evc_id, direction):
    """Post telemetry metadata marking the EVC as enabled.

    The payload carries ``enabled: True``, the given ``direction`` and the
    current local time formatted as ``%Y-%m-%dT%H:%M:%S``. Returns whatever
    ``kytos_api`` returns for the metadata POST.
    """
    now = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    telemetry = {
        "enabled": True,
        "direction": direction,
        "timestamp": now,
    }
    return kytos_api(
        post=True,
        mef_eline=True,
        evc_id=evc_id,
        metadata=True,
        data={"telemetry": telemetry},
    )
118
119
120 1
def set_telemetry_metadata_false(evc_id):
    """Post telemetry metadata marking the EVC as disabled.

    The payload carries ``enabled: False`` and the current local time
    formatted as ``%Y-%m-%dT%H:%M:%S``. Returns whatever ``kytos_api``
    returns for the metadata POST.
    """
    now = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S")
    telemetry = {
        "enabled": False,
        "timestamp": now,
    }
    return kytos_api(
        post=True,
        mef_eline=True,
        evc_id=evc_id,
        metadata=True,
        data={"telemetry": telemetry},
    )
130
131
132 1
def kytos_push_flows(switch, data):
    """Send ``data`` to Flow Manager for ``switch`` via a ``kytos_api`` POST.

    Returns whatever ``kytos_api`` returns for the flow_manager POST.
    """
    request = {
        "post": True,
        "flow_manager": True,
        "switch": switch,
        "data": data,
    }
    return kytos_api(**request)
135