Test Failed
Pull Request — master (#396)
by
unknown
06:28
created

build.main   F

Complexity

Total Complexity 193

Size/Duplication

Total Lines 1090
Duplicated Lines 3.67 %

Test Coverage

Coverage 93.27%

Importance

Changes 0
Metric Value
eloc 729
dl 40
loc 1090
ccs 596
cts 639
cp 0.9327
rs 1.871
c 0
b 0
f 0
wmc 193

48 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.list_circuits() 0 15 1
A Main.get_circuit() 0 13 2
A Main.shutdown() 0 2 1
A Main.setup() 0 28 1
A Main.execute() 0 8 3
A Main.get_evcs_by_svc_level() 0 7 2
A Main.get_eline_controller() 0 4 1
C Main.should_be_checked() 0 16 9
B Main.list_schedules() 0 31 5
C Main.execute_consistency() 0 26 9
A Main.on_flow_delete() 0 4 1
A Main.handle_flow_delete() 0 7 2
A Main._use_uni_tags() 0 10 2
A Main.add_metadata() 0 19 3
C Main._evc_dict_with_instances() 0 41 9
A Main.on_link_down() 0 4 1
A Main.update_schedule() 0 51 3
A Main.handle_evc_deployed() 0 7 3
A Main._find_evc_by_schedule_id() 0 21 5
A Main._evc_from_dict() 0 4 1
A Main.load_all_evcs() 0 6 3
A Main.get_metadata() 0 13 2
A Main.handle_link_up() 0 7 5
A Main.delete_metadata() 0 16 2
A Main.on_link_up() 0 4 1
A Main.on_evc_affected_by_link_down() 0 4 1
A Main.delete_schedule() 0 36 3
A Main.handle_evc_affected_by_link_down() 0 14 4
B Main.handle_topology_update() 0 14 8
A Main.on_evc_deployed() 0 4 1
F Main.handle_link_down() 0 81 17
F Main.create_circuit() 0 115 15
A Main.on_topology_update() 0 4 1
B Main._link_from_dict() 0 26 6
B Main._uni_from_dict() 0 29 5
A Main.redeploy() 0 23 4
A Main.handle_flow_mod_error() 0 10 4
A Main._is_duplicated_evc() 0 14 4
F Main.update() 0 60 14
A Main.bulk_add_metadata() 20 20 4
A Main.on_flow_mod_error() 0 4 1
A Main.on_topology_loaded() 0 4 1
A Main.bulk_delete_metadata() 20 20 4
A Main._load_evc() 0 21 4
A Main.on_table_enabled() 0 16 4
A Main.create_schedule() 0 61 4
A Main._get_circuits_buffer() 0 13 3
A Main.delete_circuit() 0 39 4

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.exceptions import KytosTagError
15
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
16 1
                                validate_openapi)
17 1
from kytos.core.interface import TAG, UNI, TAGRange
18 1
from kytos.core.link import Link
19
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
20 1
                                 get_json_or_400)
21 1
from kytos.core.tag_ranges import get_tag_ranges
22 1
from napps.kytos.mef_eline import controllers, settings
23
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
24 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
25 1
                                          Path)
26
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
27
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
28
                                         emit_event, get_vlan_tags_and_masks,
29
                                         map_evc_event_content)
30 1
31
32
# pylint: disable=too-many-public-methods
33
class Main(KytosNApp):
34
    """Main class of amlight/mef_eline NApp.
35
36 1
    This class is the entry point for this napp.
37
    """
38 1
39
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
40
41
    def setup(self):
42
        """Replace the '__init__' method for the KytosNApp subclass.
43
44
        The setup method is automatically called by the controller when your
45
        application is loaded.
46
47 1
        So, if you have any setup routine, insert it here.
48
        """
49
        # object used to scheduler circuit events
50 1
        self.sched = Scheduler()
51 1
52
        # object to save and load circuits
53
        self.mongo_controller = self.get_eline_controller()
54 1
        self.mongo_controller.bootstrap_indexes()
55
56
        # set the controller that will manager the dynamic paths
57
        DynamicPathManager.set_controller(self.controller)
58 1
59
        # dictionary of EVCs created. It acts as a circuit buffer.
60 1
        # Every create/update/delete must be synced to mongodb.
61 1
        self.circuits = {}
62 1
63
        self.table_group = {"epl": 0, "evpl": 0}
64 1
        self._lock = Lock()
65 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
66
67 1
        self.load_all_evcs()
68
        self._topology_updated_at = None
69
70
    def get_evcs_by_svc_level(self) -> list:
71
        """Get circuits sorted by desc service level and asc creation_time.
72 1
73
        In the future, as more ops are offloaded it should be get from the DB.
74
        """
75 1
        return sorted(self.circuits.values(),
76 1
                      key=lambda x: (-x.service_level, x.creation_time))
77
78
    @staticmethod
79
    def get_eline_controller():
80 1
        """Return the ELineController instance."""
81
        return controllers.ELineController()
82 1
83 1
    def execute(self):
84 1
        """Execute once when the napp is running."""
85 1
        if self._lock.locked():
86 1
            return
87 1
        log.debug("Starting consistency routine")
88
        with self._lock:
89 1
            self.execute_consistency()
90
        log.debug("Finished consistency routine")
91
92 1
    def should_be_checked(self, circuit):
93
        "Verify if the circuit meets the necessary conditions to be checked"
94
        # pylint: disable=too-many-boolean-expressions
95
        if (
96
                circuit.is_enabled()
97
                and not circuit.is_active()
98
                and not circuit.lock.locked()
99
                and not circuit.has_recent_removed_flow()
100
                and not circuit.is_recent_updated()
101
                and circuit.are_unis_active(self.controller.switches)
102
                # if a inter-switch EVC does not have current_path, it does not
103 1
                # make sense to run sdntrace on it
104
                and (circuit.is_intra_switch() or circuit.current_path)
105
                ):
106 1
            return True
107
        return False
108 1
109 1
    def execute_consistency(self):
110 1
        """Execute consistency routine."""
111 1
        circuits_to_check = []
112 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
113 1
        for circuit in self.get_evcs_by_svc_level():
114 1
            stored_circuits.pop(circuit.id, None)
115 1
            if self.should_be_checked(circuit):
116 1
                circuits_to_check.append(circuit)
117 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
118 1
        for circuit in circuits_to_check:
119 1
            is_checked = circuits_checked.get(circuit.id)
120 1
            if is_checked:
121 1
                circuit.execution_rounds = 0
122 1
                log.info(f"{circuit} enabled but inactive - activating")
123
                with circuit.lock:
124 1
                    circuit.activate()
125 1
                    circuit.sync()
126 1
            else:
127 1
                circuit.execution_rounds += 1
128 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
129 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
130 1
                    with circuit.lock:
131 1
                        circuit.deploy()
132
        for circuit_id in stored_circuits:
133 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
134
            self._load_evc(stored_circuits[circuit_id])
135
136
    def shutdown(self):
137
        """Execute when your napp is unloaded.
138
139 1
        If you have some cleanup procedure, insert it here.
140 1
        """
141
142
    @rest("/v2/evc/", methods=["GET"])
143
    def list_circuits(self, request: Request) -> JSONResponse:
144
        """Endpoint to return circuits stored.
145
146 1
        archive query arg if defined (not null) will be filtered
147 1
        accordingly, by default only non archived evcs will be listed
148 1
        """
149 1
        log.debug("list_circuits /v2/evc")
150 1
        args = request.query_params
151
        archived = args.get("archived", "false").lower()
152 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
153 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
154
                                                      metadata=args)
155 1
        circuits = circuits['circuits']
156 1
        return JSONResponse(circuits)
157
158
    @rest("/v2/evc/schedule", methods=["GET"])
159
    def list_schedules(self, _request: Request) -> JSONResponse:
160
        """Endpoint to return all schedules stored for all circuits.
161
162
        Return a JSON with the following template:
163
        [{"schedule_id": <schedule_id>,
164 1
         "circuit_id": <circuit_id>,
165 1
         "schedule": <schedule object>}]
166 1
        """
167 1
        log.debug("list_schedules /v2/evc/schedule")
168 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
169 1
        if not circuits:
170
            result = {}
171 1
            status = 200
172 1
            return JSONResponse(result, status_code=status)
173 1
174 1
        result = []
175 1
        status = 200
176 1
        for circuit in circuits:
177 1
            circuit_scheduler = circuit.get("circuit_scheduler")
178
            if circuit_scheduler:
179
                for scheduler in circuit_scheduler:
180
                    value = {
181
                        "schedule_id": scheduler.get("id"),
182 1
                        "circuit_id": circuit.get("id"),
183
                        "schedule": scheduler,
184 1
                    }
185 1
                    result.append(value)
186
187 1
        log.debug("list_schedules result %s %s", result, status)
188 1
        return JSONResponse(result, status_code=status)
189
190 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
191 1
    def get_circuit(self, request: Request) -> JSONResponse:
192 1
        """Endpoint to return a circuit based on id."""
193 1
        circuit_id = request.path_params["circuit_id"]
194 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
195 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
196 1
        if not circuit:
197 1
            result = f"circuit_id {circuit_id} not found"
198 1
            log.debug("get_circuit result %s %s", result, 404)
199 1
            raise HTTPException(404, detail=result)
200
        status = 200
201 1
        log.debug("get_circuit result %s %s", circuit, status)
202 1
        return JSONResponse(circuit, status_code=status)
203 1
204
    # pylint: disable=too-many-branches, too-many-statements
205
    @rest("/v2/evc/", methods=["POST"])
206
    @validate_openapi(spec)
207
    def create_circuit(self, request: Request) -> JSONResponse:
208
        """Try to create a new circuit.
209
210
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
211
        UNI_Z's requested C-VID are available from the interfaces' pools. This
212
        is checked when creating the UNI object.
213
214
        Then, E-Line NApp requests a primary and a backup path to the
215
        Pathfinder NApp using the attributes primary_links and backup_links
216
        submitted via REST
217
218
        # For each link composing paths in #3:
219
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
220
        #  - Using the S-VID obtained, generate abstract flow entries to be
221
        #    sent to FlowManager
222
223
        Push abstract flow entries to FlowManager and FlowManager pushes
224
        OpenFlow entries to datapaths
225
226
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
227
        creation
228 1
229 1
        Finnaly, notify user of the status of its request.
230
        """
231 1
        # Try to create the circuit object
232 1
        log.debug("create_circuit /v2/evc/")
233 1
        data = get_json_or_400(request, self.controller.loop)
234 1
235 1
        try:
236
            evc = self._evc_from_dict(data)
237 1
        except ValueError as exception:
238 1
            log.debug("create_circuit result %s %s", exception, 400)
239 1
            raise HTTPException(400, detail=str(exception)) from exception
240 1
        except KytosTagError as exception:
241 1
            log.debug("create_circuit result %s %s", exception, 400)
242
            raise HTTPException(400, detail=str(exception)) from exception
243
        try:
244
            check_disabled_component(evc.uni_a, evc.uni_z)
245
        except DisabledSwitch as exception:
246 1
            log.debug("create_circuit result %s %s", exception, 409)
247
            raise HTTPException(
248
                    409,
249
                    detail=f"Path is not valid: {exception}"
250
                ) from exception
251
252
        if evc.primary_path:
253
            try:
254
                evc.primary_path.is_valid(
255
                    evc.uni_a.interface.switch,
256
                    evc.uni_z.interface.switch,
257
                    bool(evc.circuit_scheduler),
258 1
                )
259
            except InvalidPath as exception:
260
                raise HTTPException(
261
                    400,
262
                    detail=f"primary_path is not valid: {exception}"
263
                ) from exception
264
        if evc.backup_path:
265
            try:
266
                evc.backup_path.is_valid(
267
                    evc.uni_a.interface.switch,
268
                    evc.uni_z.interface.switch,
269
                    bool(evc.circuit_scheduler),
270
                )
271
            except InvalidPath as exception:
272 1
                raise HTTPException(
273 1
                    400,
274 1
                    detail=f"backup_path is not valid: {exception}"
275 1
                ) from exception
276
277 1
        if self._is_duplicated_evc(evc):
278 1
            result = "The EVC already exists."
279 1
            log.debug("create_circuit result %s %s", result, 409)
280 1
            raise HTTPException(409, detail=result)
281
282 1
        if not evc._tag_lists_equal():
283 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
284
            raise HTTPException(400, detail=detail)
285
286
        try:
287
            evc._validate_has_primary_or_dynamic()
288 1
        except ValueError as exception:
289 1
            raise HTTPException(400, detail=str(exception)) from exception
290
291
        try:
292
            self._use_uni_tags(evc)
293
        except KytosTagError as exception:
294 1
            raise HTTPException(400, detail=str(exception)) from exception
295
296
        # save circuit
297 1
        try:
298
            evc.sync()
299
        except ValidationError as exception:
300 1
            raise HTTPException(400, detail=str(exception)) from exception
301 1
302 1
        # store circuit in dictionary
303
        self.circuits[evc.id] = evc
304
305 1
        # Schedule the circuit deploy
306 1
        self.sched.add(evc)
307 1
308 1
        # Circuit has no schedule, deploy now
309
        if not evc.circuit_scheduler:
310 1
            with evc.lock:
311
                evc.deploy()
312 1
313 1
        # Notify users
314 1
        result = {"circuit_id": evc.id}
315 1
        status = 201
316 1
        log.debug("create_circuit result %s %s", result, status)
317 1
        emit_event(self.controller, name="created",
318 1
                   content=map_evc_event_content(evc))
319 1
        return JSONResponse(result, status_code=status)
320 1
321 1
    @staticmethod
322 1
    def _use_uni_tags(evc):
323 1
        uni_a = evc.uni_a
324 1
        evc._use_uni_vlan(uni_a)
325
        try:
326 1
            uni_z = evc.uni_z
327 1
            evc._use_uni_vlan(uni_z)
328
        except KytosTagError as err:
329
            evc.make_uni_vlan_available(uni_a)
330
            raise err
331 1
332
    @listen_to('kytos/flow_manager.flow.removed')
333 1
    def on_flow_delete(self, event):
334 1
        """Capture delete messages to keep track when flows got removed."""
335 1
        self.handle_flow_delete(event)
336 1
337 1
    def handle_flow_delete(self, event):
338
        """Keep track when the EVC got flows removed by deriving its cookie."""
339 1
        flow = event.content["flow"]
340 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
341 1
        if evc:
342
            log.debug("Flow removed in EVC %s", evc.id)
343
            evc.set_flow_removed_at()
344
345
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
346
    @validate_openapi(spec)
347 1
    def update(self, request: Request) -> JSONResponse:
348 1
        """Update a circuit based on payload.
349 1
350 1
        The EVC attributes (creation_time, active, current_path,
351 1
        failover_path, _id, archived) can't be updated.
352 1
        """
353 1
        data = get_json_or_400(request, self.controller.loop)
354 1
        circuit_id = request.path_params["circuit_id"]
355 1
        log.debug("update /v2/evc/%s", circuit_id)
356
        try:
357 1
            evc = self.circuits[circuit_id]
358 1
        except KeyError:
359 1
            result = f"circuit_id {circuit_id} not found"
360 1
            log.debug("update result %s %s", result, 404)
361
            raise HTTPException(404, detail=result) from KeyError
362 1
363 1
        if evc.archived:
364
            result = "Can't update archived EVC"
365
            log.debug("update result %s %s", result, 409)
366 1
            raise HTTPException(409, detail=result)
367
368 1
        try:
369 1
            enable, redeploy = evc.update(
370 1
                **self._evc_dict_with_instances(data)
371 1
            )
372 1
        except KytosTagError as exception:
373 1
            raise HTTPException(400, detail=str(exception)) from exception
374 1
        except ValidationError as exception:
375
            raise HTTPException(400, detail=str(exception)) from exception
376
        except ValueError as exception:
377
            log.debug("update result %s %s", exception, 400)
378
            raise HTTPException(400, detail=str(exception)) from exception
379 1
        except DisabledSwitch as exception:
380
            log.debug("update result %s %s", exception, 409)
381
            raise HTTPException(
382
                    409,
383
                    detail=f"Path is not valid: {exception}"
384
                ) from exception
385
386
        if evc.is_active():
387
            if enable is False:  # disable if active
388 1
                with evc.lock:
389 1
                    evc.remove()
390 1
            elif redeploy is not None:  # redeploy if active
391 1
                with evc.lock:
392 1
                    evc.remove()
393
                    evc.deploy()
394 1
        else:
395 1
            if enable is True:  # enable if inactive
396
                with evc.lock:
397 1
                    evc.deploy()
398
        result = {evc.id: evc.as_dict()}
399 1
        status = 200
400 1
401
        log.debug("update result %s %s", result, status)
402
        emit_event(self.controller, "updated",
403
                   content=map_evc_event_content(evc, **data))
404
        return JSONResponse(result, status_code=status)
405
406 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
407 1
    def delete_circuit(self, request: Request) -> JSONResponse:
408 1
        """Remove a circuit.
409 1
410 1
        First, the flows are removed from the switches, and then the EVC is
411 1
        disabled.
412 1
        """
413 1
        circuit_id = request.path_params["circuit_id"]
414
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
415 1
        try:
416 1
            evc = self.circuits[circuit_id]
417 1
        except KeyError:
418 1
            result = f"circuit_id {circuit_id} not found"
419
            log.debug("delete_circuit result %s %s", result, 404)
420 1
            raise HTTPException(404, detail=result) from KeyError
421 1
422 1
        if evc.archived:
423 1
            result = f"Circuit {circuit_id} already removed"
424 1
            log.debug("delete_circuit result %s %s", result, 404)
425 1
            raise HTTPException(404, detail=result)
426 1
427 1
        log.info("Removing %s", evc)
428 1
        with evc.lock:
429 1
            evc.remove_current_flows()
430 1
            evc.remove_failover_flows(sync=False)
431 1
            evc.deactivate()
432 1
            evc.disable()
433
            self.sched.remove(evc)
434 1
            evc.archive()
435 1
            evc.remove_uni_tags()
436
            evc.sync()
437 1
        log.info("EVC removed. %s", evc)
438
        result = {"response": f"Circuit {circuit_id} removed"}
439 1
        status = 200
440 1
441
        log.debug("delete_circuit result %s %s", result, status)
442 1
        emit_event(self.controller, "deleted",
443 1
                   content=map_evc_event_content(evc))
444 1
        return JSONResponse(result, status_code=status)
445
446
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
447
    def get_metadata(self, request: Request) -> JSONResponse:
448
        """Get metadata from an EVC."""
449
        circuit_id = request.path_params["circuit_id"]
450
        try:
451
            return (
452
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
453 1
            )
454 1
        except KeyError as error:
455 1
            raise HTTPException(
456
                404,
457 1
                detail=f"circuit_id {circuit_id} not found."
458 1
            ) from error
459
460 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
461
    @validate_openapi(spec)
462 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
463 1
        """Add metadata to a bulk of EVCs."""
464 1
        data = get_json_or_400(request, self.controller.loop)
465 1
        circuit_ids = data.pop("circuit_ids")
466 1
467 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
468 1
469
        fail_evcs = []
470 1
        for _id in circuit_ids:
471 1
            try:
472 1
                evc = self.circuits[_id]
473
                evc.extend_metadata(data)
474 1
            except KeyError:
475 1
                fail_evcs.append(_id)
476 1
477
        if fail_evcs:
478 1
            raise HTTPException(404, detail=fail_evcs)
479 1
        return JSONResponse("Operation successful", status_code=201)
480 1
481
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
482 1
    @validate_openapi(spec)
483 1
    def add_metadata(self, request: Request) -> JSONResponse:
484 1
        """Add metadata to an EVC."""
485 1
        circuit_id = request.path_params["circuit_id"]
486
        metadata = get_json_or_400(request, self.controller.loop)
487
        if not isinstance(metadata, dict):
488
            raise HTTPException(400, "Invalid metadata value: {metadata}")
489
        try:
490 1
            evc = self.circuits[circuit_id]
491 1
        except KeyError as error:
492 1
            raise HTTPException(
493
                404,
494 1
                detail=f"circuit_id {circuit_id} not found."
495 1
            ) from error
496 1
497
        evc.extend_metadata(metadata)
498 1
        evc.sync()
499 1
        return JSONResponse("Operation successful", status_code=201)
500 1
501 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
502
    @validate_openapi(spec)
503 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
504 1
        """Delete metada from a bulk of EVCs"""
505 1
        data = get_json_or_400(request, self.controller.loop)
506 1
        key = request.path_params["key"]
507 1
        circuit_ids = data.pop("circuit_ids")
508
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
509
510
        fail_evcs = []
511 1
        for _id in circuit_ids:
512
            try:
513 1
                evc = self.circuits[_id]
514
                evc.remove_metadata(key)
515 1
            except KeyError:
516 1
                fail_evcs.append(_id)
517
518 1
        if fail_evcs:
519 1
            raise HTTPException(404, detail=fail_evcs)
520 1
        return JSONResponse("Operation successful")
521 1
522 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
523 1
    def delete_metadata(self, request: Request) -> JSONResponse:
524
        """Delete metadata from an EVC."""
525
        circuit_id = request.path_params["circuit_id"]
526
        key = request.path_params["key"]
527
        try:
528 1
            evc = self.circuits[circuit_id]
529 1
        except KeyError as error:
530 1
            raise HTTPException(
531
                404,
532 1
                detail=f"circuit_id {circuit_id} not found."
533 1
            ) from error
534
535 1
        evc.remove_metadata(key)
536 1
        evc.sync()
537 1
        return JSONResponse("Operation successful")
538 1
539 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
540 1
    def redeploy(self, request: Request) -> JSONResponse:
541
        """Endpoint to force the redeployment of an EVC."""
542
        circuit_id = request.path_params["circuit_id"]
543
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
544 1
        try:
545 1
            evc = self.circuits[circuit_id]
546 1
        except KeyError:
547 1
            raise HTTPException(
548 1
                404,
549 1
                detail=f"circuit_id {circuit_id} not found"
550
            ) from KeyError
551 1
        if evc.is_enabled():
552 1
            with evc.lock:
553
                evc.remove_current_flows()
554 1
                evc.deploy()
555
            result = {"response": f"Circuit {circuit_id} redeploy received."}
556 1
            status = 202
557 1
        else:
558 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
559
            status = 409
560
561
        return JSONResponse(result, status_code=status)
562
563
    @rest("/v2/evc/schedule/", methods=["POST"])
564
    @validate_openapi(spec)
565
    def create_schedule(self, request: Request) -> JSONResponse:
566
        """
567
        Create a new schedule for a given circuit.
568
569
        This service do no check if there are conflicts with another schedule.
570
        Payload example:
571
            {
572
              "circuit_id":"aa:bb:cc",
573
              "schedule": {
574 1
                "date": "2019-08-07T14:52:10.967Z",
575 1
                "interval": "string",
576 1
                "frequency": "1 * * * *",
577 1
                "action": "create"
578
              }
579
            }
580 1
        """
581
        log.debug("create_schedule /v2/evc/schedule/")
582
        data = get_json_or_400(request, self.controller.loop)
583 1
        circuit_id = data["circuit_id"]
584
        schedule_data = data["schedule"]
585
586 1
        # Get EVC from circuits buffer
587 1
        circuits = self._get_circuits_buffer()
588 1
589 1
        # get the circuit
590
        evc = circuits.get(circuit_id)
591 1
592 1
        # get the circuit
593 1
        if not evc:
594 1
            result = f"circuit_id {circuit_id} not found"
595
            log.debug("create_schedule result %s %s", result, 404)
596
            raise HTTPException(404, detail=result)
597 1
        # Can not modify circuits deleted and archived
598
        if evc.archived:
599
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
600 1
            log.debug("create_schedule result %s %s", result, 409)
601 1
            raise HTTPException(409, detail=result)
602
603
        # new schedule from dict
604 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
605
606
        # If there is no schedule, create the list
607 1
        if not evc.circuit_scheduler:
608
            evc.circuit_scheduler = []
609
610 1
        # Add the new schedule
611
        evc.circuit_scheduler.append(new_schedule)
612 1
613 1
        # Add schedule job
614
        self.sched.add_circuit_job(evc, new_schedule)
615 1
616 1
        # save circuit to mongodb
617
        evc.sync()
618 1
619 1
        result = new_schedule.as_dict()
620 1
        status = 201
621
622
        log.debug("create_schedule result %s %s", result, status)
623
        return JSONResponse(result, status_code=status)
624
625
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
626
    @validate_openapi(spec)
627
    def update_schedule(self, request: Request) -> JSONResponse:
628
        """Update a schedule.
629
630
        Change all attributes from the given schedule from a EVC circuit.
631
        The schedule ID is preserved as default.
632
        Payload example:
633 1
            {
634 1
              "date": "2019-08-07T14:52:10.967Z",
635 1
              "interval": "string",
636
              "frequency": "1 * * *",
637
              "action": "create"
638 1
            }
639
        """
640
        data = get_json_or_400(request, self.controller.loop)
641 1
        schedule_id = request.path_params["schedule_id"]
642 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
643 1
644 1
        # Try to find a circuit schedule
645 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
646 1
647 1
        # Can not modify circuits deleted and archived
648 1
        if not found_schedule:
649
            result = f"schedule_id {schedule_id} not found"
650 1
            log.debug("update_schedule result %s %s", result, 404)
651 1
            raise HTTPException(404, detail=result)
652
        if evc.archived:
653 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
654
            log.debug("update_schedule result %s %s", result, 409)
655 1
            raise HTTPException(409, detail=result)
656
657
        new_schedule = CircuitSchedule.from_dict(data)
658 1
        new_schedule.id = found_schedule.id
659
        # Remove the old schedule
660 1
        evc.circuit_scheduler.remove(found_schedule)
661
        # Append the modified schedule
662 1
        evc.circuit_scheduler.append(new_schedule)
663
664 1
        # Cancel all schedule jobs
665 1
        self.sched.cancel_job(found_schedule.id)
666
        # Add the new circuit schedule
667 1
        self.sched.add_circuit_job(evc, new_schedule)
668 1
        # Save EVC to mongodb
669
        evc.sync()
670 1
671 1
        result = new_schedule.as_dict()
672
        status = 200
673
674
        log.debug("update_schedule result %s %s", result, status)
675
        return JSONResponse(result, status_code=status)
676
677
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
678 1
    def delete_schedule(self, request: Request) -> JSONResponse:
679 1
        """Remove a circuit schedule.
680 1
681
        Remove the Schedule from EVC.
682
        Remove the Schedule from cron job.
683 1
        Save the EVC to the Storehouse.
684 1
        """
685 1
        schedule_id = request.path_params["schedule_id"]
686 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
687
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
688 1
689 1
        # Can not modify circuits deleted and archived
690 1
        if not found_schedule:
691 1
            result = f"schedule_id {schedule_id} not found"
692
            log.debug("delete_schedule result %s %s", result, 404)
693
            raise HTTPException(404, detail=result)
694 1
695
        if evc.archived:
696
            result = f"Circuit {evc.id} is archived. Update is forbidden."
697 1
            log.debug("delete_schedule result %s %s", result, 409)
698
            raise HTTPException(409, detail=result)
699 1
700
        # Remove the old schedule
701 1
        evc.circuit_scheduler.remove(found_schedule)
702 1
703
        # Cancel all schedule jobs
704 1
        self.sched.cancel_job(found_schedule.id)
705 1
        # Save EVC to mongodb
706
        evc.sync()
707 1
708
        result = "Schedule removed"
709
        status = 200
710
711
        log.debug("delete_schedule result %s %s", result, status)
712
        return JSONResponse(result, status_code=status)
713
714
    def _is_duplicated_evc(self, evc):
715
        """Verify if the circuit given is duplicated with the stored evcs.
716
717 1
        Args:
718 1
            evc (EVC): circuit to be analysed.
719 1
720 1
        Returns:
721
            boolean: True if the circuit is duplicated, otherwise False.
722 1
723 1
        """
724
        for circuit in tuple(self.circuits.values()):
725
            if not circuit.archived and circuit.shares_uni(evc):
726
                return True
727 1
        return False
728
729 1
    @listen_to("kytos/topology.link_up")
730 1
    def on_link_up(self, event):
731 1
        """Change circuit when link is up or end_maintenance."""
732 1
        self.handle_link_up(event)
733 1
734
    def handle_link_up(self, event):
735 1
        """Change circuit when link is up or end_maintenance."""
736 1
        log.info("Event handle_link_up %s", event.content["link"])
737
        for evc in self.get_evcs_by_svc_level():
738
            if evc.is_enabled() and not evc.archived:
739
                with evc.lock:
740 1
                    evc.handle_link_up(event.content["link"])
741
742 1
    @listen_to("kytos/topology.updated")
743 1
    def on_topology_update(self, event):
744
        """Capture topology update event"""
745
        self.handle_topology_update(event)
746
747
    def handle_topology_update(self, event):
748 1
        """Handle topology update"""
749 1
        with self._lock:
750 1
            if (
751 1
                self._topology_updated_at
752 1
                and self._topology_updated_at > event.timestamp
753
            ):
754
                return
755
            self._topology_updated_at = event.timestamp
756 1
            for evc in self.get_evcs_by_svc_level():
757 1
                if evc.is_enabled() and not evc.archived:
758
                    with evc.lock:
759
                        evc.handle_topology_update(
760
                            event.content["topology"].switches
761 1
                        )
762
763 1
    @listen_to("kytos/topology.link_down")
764 1
    def on_link_down(self, event):
765 1
        """Change circuit when link is down or under_mantenance."""
766 1
        self.handle_link_down(event)
767 1
768 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
769 1
        """Change circuit when link is down or under_mantenance."""
770 1
        link = event.content["link"]
771
        log.info("Event handle_link_down %s", link)
772
        switch_flows = {}
773 1
        evcs_with_failover = []
774
        evcs_normal = []
775
        check_failover = []
776
        for evc in self.get_evcs_by_svc_level():
777 1
            if evc.is_affected_by_link(link):
778 1
                # if there is no failover path, handles link down the
779 1
                # tradditional way
780 1
                if (
781
                    not getattr(evc, 'failover_path', None) or
782 1
                    evc.is_failover_path_affected_by_link(link)
783 1
                ):
784 1
                    evcs_normal.append(evc)
785
                    continue
786
                try:
787 1
                    dpid_flows = evc.get_failover_flows()
788 1
                # pylint: disable=broad-except
789 1
                except Exception:
790 1
                    err = traceback.format_exc().replace("\n", ", ")
791 1
                    log.error(
792 1
                        f"Ignore Failover path for {evc} due to error: {err}"
793
                    )
794 1
                    evcs_normal.append(evc)
795
                    continue
796 1
                for dpid, flows in dpid_flows.items():
797 1
                    switch_flows.setdefault(dpid, [])
798 1
                    switch_flows[dpid].extend(flows)
799 1
                evcs_with_failover.append(evc)
800 1
            else:
801
                check_failover.append(evc)
802
803
        while switch_flows:
804
            offset = settings.BATCH_SIZE or None
805
            switches = list(switch_flows.keys())
806
            for dpid in switches:
807
                emit_event(
808
                    self.controller,
809 1
                    context="kytos.flow_manager",
810 1
                    name="flows.install",
811 1
                    content={
812 1
                        "dpid": dpid,
813 1
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
814
                    }
815 1
                )
816 1
                if offset is None or offset >= len(switch_flows[dpid]):
817 1
                    del switch_flows[dpid]
818 1
                    continue
819 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
820 1
            time.sleep(settings.BATCH_INTERVAL)
821 1
822
        for evc in evcs_with_failover:
823 1
            with evc.lock:
824
                old_path = evc.current_path
825
                evc.current_path = evc.failover_path
826
                evc.failover_path = old_path
827 1
                evc.sync()
828 1
            emit_event(self.controller, "redeployed_link_down",
829
                       content=map_evc_event_content(evc))
830
            log.info(
831
                f"{evc} redeployed with failover due to link down {link.id}"
832
            )
833
834
        for evc in evcs_normal:
835
            emit_event(
836
                self.controller,
837
                "evc_affected_by_link_down",
838 1
                content={"link_id": link.id} | map_evc_event_content(evc)
839 1
            )
840 1
841 1
        # After handling the hot path, check if new failover paths are needed.
842
        # Note that EVCs affected by link down will generate a KytosEvent for
843 1
        # deployed|redeployed, which will trigger the failover path setup.
844 1
        # Thus, we just need to further check the check_failover list
845
        for evc in check_failover:
846
            if evc.is_failover_path_affected_by_link(link):
847
                with evc.lock:
848 1
                    evc.setup_failover_path()
849
850 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
851 1
    def on_evc_affected_by_link_down(self, event):
852 1
        """Change circuit when link is down or under_mantenance."""
853 1
        self.handle_evc_affected_by_link_down(event)
854 1
855 1
    def handle_evc_affected_by_link_down(self, event):
856 1
        """Change circuit when link is down or under_mantenance."""
857 1
        evc = self.circuits.get(event.content["evc_id"])
858 1
        link_id = event.content['link_id']
859 1
        if not evc:
860 1
            return
861
        with evc.lock:
862
            result = evc.handle_link_down()
863 1
        event_name = "error_redeploy_link_down"
864 1
        if result:
865
            log.info(f"{evc} redeployed due to link down {link_id}")
866
            event_name = "redeployed_link_down"
867
        emit_event(self.controller, event_name,
868 1
                   content=map_evc_event_content(evc))
869
870 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
871 1
    def on_evc_deployed(self, event):
872 1
        """Handle EVC deployed|redeployed_link_down."""
873 1
        self.handle_evc_deployed(event)
874 1
875
    def handle_evc_deployed(self, event):
876 1
        """Setup failover path on evc deployed."""
877 1
        evc = self.circuits.get(event.content["evc_id"])
878
        if not evc:
879
            return
880
        with evc.lock:
881 1
            evc.setup_failover_path()
882
883 1
    @listen_to("kytos/topology.topology_loaded")
884 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
885 1
        """Load EVCs once the topology is available."""
886 1
        self.load_all_evcs()
887
888 1
    def load_all_evcs(self):
889
        """Try to load all EVCs on startup."""
890 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
891 1
        for circuit_id, circuit in circuits:
892 1
            if circuit_id not in self.circuits:
893 1
                self._load_evc(circuit)
894
895
    def _load_evc(self, circuit_dict):
896 1
        """Load one EVC from mongodb to memory."""
897
        try:
898 1
            evc = self._evc_from_dict(circuit_dict)
899 1
        except ValueError as exception:
900 1
            log.error(
901 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
902 1
            )
903 1
            return None
904 1
        except KytosTagError as exception:
905
            log.error(
906 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
907 1
            )
908
            return None
909
        if evc.archived:
910
            return None
911 1
        evc.deactivate()
912
        evc.sync()
913 1
        self.circuits.setdefault(evc.id, evc)
914 1
        self.sched.add(evc)
915 1
        return evc
916
917 1
    @listen_to("kytos/flow_manager.flow.error")
918 1
    def on_flow_mod_error(self, event):
919 1
        """Handle flow mod errors related to an EVC."""
920 1
        self.handle_flow_mod_error(event)
921
922 1
    def handle_flow_mod_error(self, event):
923
        """Handle flow mod errors related to an EVC."""
924
        flow = event.content["flow"]
925
        command = event.content.get("error_command")
926
        if command != "add":
927 1
            return
928 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
929
        if evc:
930
            with evc.lock:
931 1
                evc.remove_current_flows()
932 1
933 1
    def _evc_dict_with_instances(self, evc_dict):
934 1
        """Convert some dict values to instance of EVC classes.
935 1
936 1
        This method will convert: [UNI, Link]
937
        """
938 1
        data = evc_dict.copy()  # Do not modify the original dict
939 1
        for attribute, value in data.items():
940 1
            # Get multiple attributes.
941 1
            # Ex: uni_a, uni_z
942
            if "uni" in attribute:
943
                try:
944
                    data[attribute] = self._uni_from_dict(value)
945
                except ValueError as exception:
946
                    result = "Error creating UNI: Invalid value"
947
                    raise ValueError(result) from exception
948
949 1
            if attribute == "circuit_scheduler":
950 1
                data[attribute] = []
951
                for schedule in value:
952
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
953
954
            # Get multiple attributes.
955
            # Ex: primary_links,
956
            #     backup_links,
957 1
            #     current_links_cache,
958 1
            #     primary_links_cache,
959
            #     backup_links_cache
960
            if "links" in attribute:
961
                data[attribute] = [
962 1
                    self._link_from_dict(link) for link in value
963
                ]
964 1
965 1
            # Ex: current_path,
966 1
            #     primary_path,
967 1
            #     backup_path
968
            if "path" in attribute and attribute != "dynamic_backup_path":
969 1
                data[attribute] = Path(
970
                    [self._link_from_dict(link) for link in value]
971 1
                )
972 1
973
        return data
974 1
975 1
    def _evc_from_dict(self, evc_dict):
976 1
        data = self._evc_dict_with_instances(evc_dict)
977 1
        data["table_group"] = self.table_group
978
        return EVC(self.controller, **data)
979
980
    def _uni_from_dict(self, uni_dict):
981 1
        """Return a UNI object from python dict."""
982 1
        if uni_dict is None:
983 1
            return False
984 1
985 1
        interface_id = uni_dict.get("interface_id")
986 1
        interface = self.controller.get_interface_by_id(interface_id)
987 1
        if interface is None:
988 1
            result = (
989
                "Error creating UNI:"
990 1
                + f"Could not instantiate interface {interface_id}"
991 1
            )
992
            raise ValueError(result) from ValueError
993 1
        tag_convert = {1: "vlan"}
994
        tag_dict = uni_dict.get("tag", None)
995 1
        if tag_dict:
996
            tag_type = tag_dict.get("tag_type")
997 1
            tag_type = tag_convert.get(tag_type, tag_type)
998 1
            tag_value = tag_dict.get("value")
999
            if isinstance(tag_value, list):
1000 1
                tag_value = get_tag_ranges(tag_value)
1001 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1002 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1003 1
            else:
1004 1
                tag = TAG(tag_type, tag_value)
1005 1
        else:
1006
            tag = None
1007
        uni = UNI(interface, tag)
1008
        return uni
1009 1
1010 1
    def _link_from_dict(self, link_dict):
1011 1
        """Return a Link object from python dict."""
1012
        id_a = link_dict.get("endpoint_a").get("id")
1013 1
        id_b = link_dict.get("endpoint_b").get("id")
1014 1
1015 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1016 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1017
        if not endpoint_a:
1018
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1019 1
            raise ValueError(error_msg)
1020 1
        if not endpoint_b:
1021
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1022 1
            raise ValueError(error_msg)
1023
1024
        link = Link(endpoint_a, endpoint_b)
1025
        if "metadata" in link_dict:
1026
            link.extend_metadata(link_dict.get("metadata"))
1027
1028
        s_vlan = link.get_metadata("s_vlan")
1029 1
        if s_vlan:
1030 1
            tag = TAG.from_dict(s_vlan)
1031 1
            if tag is False:
1032
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1033
                raise ValueError(error_msg)
1034 1
            link.update_metadata("s_vlan", tag)
1035 1
        return link
1036 1
1037 1
    def _find_evc_by_schedule_id(self, schedule_id):
1038 1
        """
1039 1
        Find an EVC and CircuitSchedule based on schedule_id.
1040 1
1041 1
        :param schedule_id: Schedule ID
1042 1
        :return: EVC and Schedule
1043
        """
1044 1
        circuits = self._get_circuits_buffer()
1045
        found_schedule = None
1046
        evc = None
1047
1048
        # pylint: disable=unused-variable
1049
        for c_id, circuit in circuits.items():
1050 1
            for schedule in circuit.circuit_scheduler:
1051
                if schedule.id == schedule_id:
1052 1
                    found_schedule = schedule
1053 1
                    evc = circuit
1054 1
                    break
1055 1
            if found_schedule:
1056 1
                break
1057
        return evc, found_schedule
1058
1059 1
    def _get_circuits_buffer(self):
1060 1
        """
1061
        Return the circuit buffer.
1062 1
1063 1
        If the buffer is empty, try to load data from mongodb.
1064
        """
1065 1
        if not self.circuits:
1066 1
            # Load circuits from mongodb to buffer
1067 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1068
            for c_id, circuit in circuits.items():
1069
                evc = self._evc_from_dict(circuit)
1070 1
                self.circuits[c_id] = evc
1071 1
        return self.circuits
1072 1
1073 1
    # pylint: disable=attribute-defined-outside-init
1074 1
    @alisten_to("kytos/of_multi_table.enable_table")
1075
    async def on_table_enabled(self, event):
1076
        """Handle a recently table enabled."""
1077
        table_group = event.content.get("mef_eline", None)
1078
        if not table_group:
1079
            return
1080
        for group in table_group:
1081
            if group not in settings.TABLE_GROUP_ALLOWED:
1082
                log.error(f'The table group "{group}" is not allowed for '
1083
                          f'mef_eline. Allowed table groups are '
1084
                          f'{settings.TABLE_GROUP_ALLOWED}')
1085
                return
1086
        self.table_group.update(table_group)
1087
        content = {"group_table": self.table_group}
1088
        name = "kytos/mef_eline.enable_table"
1089
        await aemit_event(self.controller, name, content)
1090