Passed
Push — master ( 2ec8a9...67da38 )
by Vinicius
07:53 queued 06:30
created

build.main.Main.execute()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 1
nop 1
dl 0
loc 2
ccs 0
cts 1
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
"""Main module of kytos/telemetry Network Application.
2
3
Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits
4
5
"""
6
7
import napps.kytos.telemetry_int.kytos_api_helper as api
8
from napps.kytos.telemetry_int import settings, utils
9
from tenacity import RetryError
10
11
from kytos.core import KytosEvent, KytosNApp, log, rest
12
from kytos.core.helpers import alisten_to
13
from kytos.core.rest_api import HTTPException, JSONResponse, Request, aget_json_or_400
14
15
from .exceptions import (
16
    EVCHasINT,
17
    EVCHasNoINT,
18
    EVCNotFound,
19
    FlowsNotFound,
20
    ProxyPortNotFound,
21
    ProxyPortStatusNotUP,
22
    UnrecoverableError,
23
)
24
from .managers.int import INTManager
25
26
# pylint: disable=fixme
27
28
29
class Main(KytosNApp):
    """Main class of kytos/telemetry NApp.

    This class is the entry point for this NApp. It exposes the REST
    endpoints used to enable/disable INT on EVCs and reacts to
    ``kytos/mef_eline.deleted`` events.
    """

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        self.int_manager = INTManager(self.controller)

    def execute(self):
        """Run after the setup method execution.

        You can also use this method in loop mode if you add to the above setup
        method a line like the following example:

            self.execute_as_loop(30)  # 30-second interval.
        """

    def shutdown(self):
        """Run when your NApp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @staticmethod
    async def _parse_evc_request(request: Request) -> tuple:
        """Extract and validate ``evc_ids`` and ``force`` from a request body.

        Shared by the enable and disable endpoints so both validate the
        payload the same way.

        Returns:
            A ``(evc_ids, force)`` tuple where ``evc_ids`` is a list and
            ``force`` is a bool (``False`` when omitted).

        Raises:
            HTTPException: 400 if ``evc_ids`` is missing or is not a list,
                if ``force`` is not a bool, or if the body cannot be indexed
                by key.
        """
        # Parsed outside the try block so ``content`` is always bound when
        # the error detail below is formatted.
        content = await aget_json_or_400(request)
        try:
            evc_ids = content["evc_ids"]
            # Without this check a non-list value (null, int, ...) would only
            # fail later on len()/iteration and surface as a 500.
            if not isinstance(evc_ids, list):
                raise TypeError(
                    f"'evc_ids' wrong type: {type(evc_ids)} expected list"
                )
            force = content.get("force", False)
            if not isinstance(force, bool):
                raise TypeError(f"'force' wrong type: {type(force)} expected bool")
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc
        return evc_ids, force

    @staticmethod
    async def _fetch_evcs(evc_ids: list) -> dict:
        """Fetch the EVCs needed to serve a request.

        A single ``api.get_evc`` lookup is used when exactly one id was
        given; otherwise all EVCs are fetched with ``api.get_evcs``.

        Raises:
            HTTPException: 503 when the API helper exhausted its retries.
        """
        try:
            if len(evc_ids) == 1:
                return await api.get_evc(evc_ids[0])
            return await api.get_evcs()
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc

    @rest("v1/evc/enable", methods=["POST"])
    async def enable_telemetry(self, request: Request) -> JSONResponse:
        """REST to enable INT flows on EVCs.

        If a list of evc_ids is empty, it'll enable on non-INT EVCs.

        Responds 201 on success, or 200 with an empty body when no ids were
        given and there is no non-INT EVC left to enable.

        Raises:
            HTTPException: 400 on an invalid payload, when an EVC already has
                INT or a proxy port is not UP; 404 when an EVC, its flows or
                its proxy port is not found; 503 when retries are exhausted;
                500 on an unrecoverable error.
        """
        evc_ids, force = await self._parse_evc_request(request)
        evcs = await self._fetch_evcs(evc_ids)

        if evc_ids:
            # Unknown ids map to {} so the manager can report them.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
            if not evcs:
                # There's no non-INT EVCs to get enabled.
                return JSONResponse({})

        try:
            await self.int_manager.enable_int(evcs, force)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except (EVCHasINT, ProxyPortStatusNotUP) as exc:
            raise HTTPException(400, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse({}, status_code=201)

    @rest("v1/evc/disable", methods=["POST"])
    async def disable_telemetry(self, request: Request) -> JSONResponse:
        """REST to disable/remove INT flows for an EVC_ID

        If a list of evc_ids is empty, it'll disable on all INT EVCs.

        Responds 200 with an empty body on success, including when no ids
        were given and there is no INT EVC to disable.

        Raises:
            HTTPException: 400 on an invalid payload or when an EVC has no
                INT; 404 when an EVC is not found; 503 when retries are
                exhausted; 500 on an unrecoverable error.
        """
        evc_ids, force = await self._parse_evc_request(request)
        evcs = await self._fetch_evcs(evc_ids)

        if evc_ids:
            # Unknown ids map to {} so the manager can report them.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
            if not evcs:
                # There's no INT EVCs to get disabled.
                return JSONResponse({})

        try:
            await self.int_manager.disable_int(evcs, force)
        except EVCNotFound as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except EVCHasNoINT as exc:
            raise HTTPException(400, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse({})

    @rest("v1/evc")
    def get_evcs(self, _request: Request) -> JSONResponse:
        """REST to return the list of EVCs with INT enabled"""
        return JSONResponse(utils.get_evc_with_telemetry())

    @rest("v1/sync")
    def sync_flows(self, _request: Request) -> JSONResponse:
        """Endpoint to force the telemetry napp to search for INT flows and delete them
        accordingly to the evc metadata.

        Not implemented yet: it currently always answers ``"TBD"``.
        """
        # TODO
        # for evc_id in get_evcs_ids():
        return JSONResponse("TBD")

    @rest("v1/evc/update")
    def update_evc(self, _request: Request) -> JSONResponse:
        """If an EVC changed from unidirectional to bidirectional telemetry,
        make the change.

        Not implemented yet: it currently always answers an empty object.
        """
        return JSONResponse({})

    @alisten_to("kytos/mef_eline.deleted")
    async def on_evc_deleted(self, event: KytosEvent) -> None:
        """On EVC deleted.

        When the deleted EVC had telemetry enabled in its metadata, look up
        the stored flows matching its INT cookie and remove them.
        """
        content = event.content
        telemetry = content.get("metadata", {}).get("telemetry", {})
        # .get() so a telemetry entry lacking "enabled" is treated as
        # disabled instead of raising KeyError inside the event handler.
        if not telemetry.get("enabled"):
            return

        evc_id = content["evc_id"]
        log.info(
            f"EVC({evc_id}, {content.get('name', '')}) got deleted, "
            "INT flows will be removed too"
        )
        stored_flows = await api.get_stored_flows(
            [utils.get_cookie(evc_id, settings.INT_COOKIE_PREFIX)]
        )
        await self.int_manager.remove_int_flows(stored_flows)

    # Event-driven methods: future
    def listen_for_new_evcs(self):
        """Change newly created EVC to INT-enabled EVC based on the metadata field
        (future)"""

    def listen_for_evc_change(self):
        """Update the INT state of a changed EVC based on the metadata field
        (future)"""

    def listen_for_path_changes(self):
        """Change EVC's new path to INT-enabled EVC based on the metadata field
        when there is a path change. (future)"""

    def listen_for_evcs_removed(self):
        """Remove all INT flows belonging the just removed EVC (future)"""

    def listen_for_topology_changes(self):
        """If the topology changes, make sure it is not the loop ports.
        If so, update proxy ports"""
        # TODO:
        # self.proxy_ports = create_proxy_ports(self.proxy_ports)
229