Test Failed
Pull Request — master (#35)
by Vinicius
06:08
created

build.main.Main.enable_int_sink()   C

Complexity

Conditions 9

Size

Total Lines 84
Code Lines 50

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 90

Importance

Changes 0
Metric Value
cc 9
eloc 50
nop 3
dl 0
loc 84
ccs 0
cts 43
cp 0
crap 90
rs 6.303
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A build.main.Main.listen_for_evc_change() 0 4 1
A build.main.Main.listen_for_path_changes() 0 4 1
A build.main.Main.listen_for_evcs_removed() 0 3 1
A build.main.Main.listen_for_topology_changes() 0 6 1

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

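Commonly applied refactorings include:

- Extract Method
- If many parameters/temporary variables are present: Replace Temp with Query, Introduce Parameter Object, Preserve Whole Object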

1
"""Main module of kytos/telemetry Network Application.
2
3
Napp to deploy In-band Network Telemetry over Ethernet Virtual Circuits
4
5
"""
6
import itertools
7
from collections import defaultdict
8
9
from tenacity import RetryError
10
11
from kytos.core import KytosNApp, rest, log
12
from kytos.core.rest_api import (
13
    HTTPException,
14
    JSONResponse,
15
    Request,
16
    aget_json_or_400,
17
)
18
19
from .managers.int import INTManager
20
from .exceptions import (
21
    EVCHasNoINT,
22
    EVCHasINT,
23
    EVCNotFound,
24
    FlowsNotFound,
25
    ProxyPortNotFound,
26
    ProxyPortStatusNotUP,
27
    UnrecoverableError,
28
)
29
from .kytos_api_helper import get_evc, get_evcs
30
from .proxy_port import ProxyPort
31
from napps.kytos.telemetry_int import utils
32
33
# pylint: disable=fixme
34
35
36
class Main(KytosNApp):
37
    """Main class of kytos/telemetry NApp.
38
39
    This class is the entry point for this NApp.
40
    """
41
42
    def setup(self):
        """Initialise the NApp; stands in for ``__init__`` on a KytosNApp.

        The controller calls this automatically when the application is
        loaded, so any start-up routine belongs here. It creates the
        INTManager, handing it this NApp's controller.
        """
        controller = self.controller
        self.int_manager = INTManager(controller)
52
53
    def execute(self):
        """Run once the setup method has finished; currently does nothing.

        To have this method run periodically, add a line such as the
        following to the setup method above:

            self.execute_as_loop(30)  # 30-second interval.
        """
61
62
    def shutdown(self):
        """Run when the NApp is unloaded; currently does nothing.

        Any cleanup procedure should be added here.
        """
67
68
    async def provision_int_unidirectional(
        self, evc: dict, source_uni: dict, destination_uni: dict, proxy_port: ProxyPort
    ) -> dict[str, list]:
        """Create INT flows from source to destination.

        Builds the flows for the first switch (INT Source), the INT hops and
        the last switch (INT Sink), groups them by their ``"switch"`` key and
        returns whatever ``install_int_flows`` gives back for that grouping.
        """
        # Arguments are evaluated left to right, so the builders still run in
        # source -> hops -> sink order.
        flows_in_path_order = itertools.chain(
            self.enable_int_source(source_uni, evc, proxy_port),
            self.enable_int_hop(evc, source_uni, destination_uni),
            self.enable_int_sink(destination_uni, evc, proxy_port),
        )

        flows_by_switch = defaultdict(list)
        for int_flow in flows_in_path_order:
            flows_by_switch[int_flow["switch"]].append(int_flow)

        return await self.install_int_flows(flows_by_switch)
87
88 View Code Duplication
    @rest("v1/evc/enable", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
89
    async def enable_telemetry(self, request: Request) -> JSONResponse:
        """REST to enable INT flows on EVCs.

        The JSON payload must be an object with an ``evc_ids`` list and may
        carry a ``force`` flag (coerced with ``bool``, default ``False``).
        If the list of evc_ids is empty, it'll enable on non-INT EVCs.

        Returns:
            An empty JSON object: status 201 once ``enable_int`` succeeds, or
            the default status when there is no non-INT EVC to enable.

        Raises:
            HTTPException: 400 for an invalid payload, EVCHasINT or
                ProxyPortStatusNotUP; 404 for EVCNotFound, FlowsNotFound or
                ProxyPortNotFound; 503 when a RetryError is raised while
                fetching EVCs or enabling INT; 500 for UnrecoverableError.
        """
        # Read the body outside the try so `content` is always bound when the
        # error message below is formatted.
        content = await aget_json_or_400(request)
        try:
            evc_ids = content["evc_ids"]
            force = bool(content.get("force", False))
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        # len() and indexing below require a list; reject anything else as a
        # client error instead of failing later with an unhandled TypeError.
        if not isinstance(evc_ids, list):
            raise HTTPException(400, detail=f"Invalid payload: {content}")

        try:
            # Exactly one id: fetch just that EVC; otherwise fetch them all.
            if len(evc_ids) == 1:
                evcs = await get_evc(evc_ids[0])
            else:
                evcs = await get_evcs()
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc

        if evc_ids:
            # Ids missing from the fetched EVCs map to an empty dict;
            # presumably the manager reports those as not found - confirm.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if not utils.has_int_enabled(v)}
            if not evcs:
                # There's no non-INT EVCs to get enabled.
                return JSONResponse({})

        try:
            await self.int_manager.enable_int(evcs, force)
        except (EVCNotFound, FlowsNotFound, ProxyPortNotFound) as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except (EVCHasINT, ProxyPortStatusNotUP) as exc:
            raise HTTPException(400, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse({}, status_code=201)
133
134 View Code Duplication
    @rest("v1/evc/disable", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
135
    async def disable_telemetry(self, request: Request) -> JSONResponse:
        """REST to disable/remove INT flows for an EVC_ID

        The JSON payload must be an object with an ``evc_ids`` list and may
        carry a ``force`` flag (coerced with ``bool``, default ``False``).
        If the list of evc_ids is empty, it'll disable on all INT EVCs.

        Returns:
            An empty JSON object, both after ``disable_int`` succeeds and
            when there is no INT EVC to disable.

        Raises:
            HTTPException: 400 for an invalid payload or EVCHasNoINT; 404 for
                EVCNotFound; 503 when a RetryError is raised while fetching
                EVCs or disabling INT; 500 for UnrecoverableError.
        """
        # Read the body outside the try so `content` is always bound when the
        # error message below is formatted.
        content = await aget_json_or_400(request)
        try:
            evc_ids = content["evc_ids"]
            force = bool(content.get("force", False))
        except (TypeError, KeyError) as exc:
            raise HTTPException(400, detail=f"Invalid payload: {content}") from exc

        # len() and indexing below require a list; reject anything else as a
        # client error instead of failing later with an unhandled TypeError.
        if not isinstance(evc_ids, list):
            raise HTTPException(400, detail=f"Invalid payload: {content}")

        try:
            # Exactly one id: fetch just that EVC; otherwise fetch them all.
            if len(evc_ids) == 1:
                evcs = await get_evc(evc_ids[0])
            else:
                evcs = await get_evcs()
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc

        if evc_ids:
            # Ids missing from the fetched EVCs map to an empty dict;
            # presumably the manager reports those as not found - confirm.
            evcs = {evc_id: evcs.get(evc_id, {}) for evc_id in evc_ids}
        else:
            evcs = {k: v for k, v in evcs.items() if utils.has_int_enabled(v)}
            if not evcs:
                # There's no INT EVCs to get disabled.
                return JSONResponse({})

        try:
            await self.int_manager.disable_int(evcs, force)
        except EVCNotFound as exc:
            raise HTTPException(404, detail=str(exc)) from exc
        except EVCHasNoINT as exc:
            raise HTTPException(400, detail=str(exc)) from exc
        except RetryError as exc:
            exc_error = str(exc.last_attempt.exception())
            log.error(exc_error)
            raise HTTPException(503, detail=exc_error) from exc
        except UnrecoverableError as exc:
            exc_error = str(exc)
            log.error(exc_error)
            raise HTTPException(500, detail=exc_error) from exc

        return JSONResponse({})
178
179
    @rest("v1/evc")
180
    def get_evcs(self, _request: Request) -> JSONResponse:
        """REST endpoint answering with the EVCs that have INT enabled.

        The payload is whatever ``utils.get_evc_with_telemetry()`` returns;
        the incoming request is not inspected.
        """
        telemetry_evcs = utils.get_evc_with_telemetry()
        return JSONResponse(telemetry_evcs)
183
184
    @rest("v1/sync")
185
    def sync_flows(self, _request: Request) -> JSONResponse:
        """Endpoint meant to force the telemetry napp to search for INT flows
        and delete them according to the EVC metadata.

        Not implemented yet: the request is ignored and the response body is
        always the string "TBD".
        """
        # TODO
        # for evc_id in get_evcs_ids():
        placeholder = "TBD"
        return JSONResponse(placeholder)
192
193
    @rest("v1/evc/update")
194
    def update_evc(self, _request: Request) -> JSONResponse:
        """Endpoint meant to apply the change when an EVC goes from
        unidirectional to bidirectional telemetry.

        Currently a stub: the request is ignored and an empty JSON object is
        returned.
        """
        empty_body = {}
        return JSONResponse(empty_body)
198
199
    # Event-driven methods: future
200
    def listen_for_new_evcs(self):
        """Turn a newly created EVC into an INT-enabled EVC based on the
        metadata field (future).

        Placeholder only: nothing is done yet.
        """
204
205
    def listen_for_evc_change(self):
        """React to a change in an existing EVC (future).

        Placeholder only: nothing is done yet.
        NOTE(review): presumably this should update the EVC's INT state
        based on the metadata field once the EVC is modified - confirm the
        intended behaviour before implementing.
        """
209
210
    def listen_for_path_changes(self):
        """Make an EVC's new path INT-enabled, based on the metadata field,
        whenever its path changes (future).

        Placeholder only: nothing is done yet.
        """
214
215
    def listen_for_evcs_removed(self):
        """Remove every INT flow belonging to an EVC that was just removed
        (future).

        Placeholder only: nothing is done yet.
        """
218
219
    def listen_for_topology_changes(self):
        """On a topology change, check whether the loop ports are affected
        and, if so, update the proxy ports.

        Placeholder only: nothing is done yet.
        """
        # TODO:
        # self.proxy_ports = create_proxy_ports(self.proxy_ports)
225