Passed
Push — master ( 946b37...897710 )
by Vinicius
02:55 queued 27s
created

build.main   F

Complexity

Total Complexity 202

Size/Duplication

Total Lines 1119
Duplicated Lines 3.57 %

Test Coverage

Coverage 93.37%

Importance

Changes 0
Metric Value
eloc 748
dl 40
loc 1119
ccs 620
cts 664
cp 0.9337
rs 1.852
c 0
b 0
f 0
wmc 202

49 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.list_circuits() 0 15 1
A Main._use_uni_tags() 0 10 2
A Main.on_flow_delete() 0 4 1
A Main.handle_flow_delete() 0 7 2
A Main.get_circuit() 0 13 2
A Main.shutdown() 0 2 1
A Main.setup() 0 28 1
A Main.execute() 0 8 3
A Main.get_evcs_by_svc_level() 0 7 2
A Main.get_eline_controller() 0 4 1
F Main.create_circuit() 0 113 14
B Main.list_schedules() 0 31 5
C Main.should_be_checked() 0 16 9
C Main.execute_consistency() 0 26 9
A Main.add_metadata() 0 19 3
C Main._evc_dict_with_instances() 0 41 9
A Main.on_link_down() 0 4 1
A Main.update_schedule() 0 51 3
A Main.handle_evc_deployed() 0 7 3
A Main._find_evc_by_schedule_id() 0 21 5
A Main._evc_from_dict() 0 4 1
A Main.load_all_evcs() 0 7 3
A Main.handle_interface_link_up() 0 8 2
A Main.get_metadata() 0 13 2
A Main.handle_link_up() 0 7 5
A Main.delete_metadata() 0 16 2
A Main.on_link_up() 0 4 1
A Main.on_evc_affected_by_link_down() 0 4 1
A Main.delete_schedule() 0 36 3
A Main.handle_evc_affected_by_link_down() 0 14 4
A Main.on_evc_deployed() 0 4 1
F Main.handle_link_down() 0 81 17
B Main._link_from_dict() 0 26 6
B Main._uni_from_dict() 0 29 5
A Main.redeploy() 0 23 4
A Main.handle_flow_mod_error() 0 10 4
A Main.handle_interface_link_down() 0 8 2
A Main.bulk_add_metadata() 20 20 4
A Main.on_flow_mod_error() 0 4 1
A Main.on_topology_loaded() 0 4 1
A Main.bulk_delete_metadata() 20 20 4
F Main._check_no_tag_duplication() 0 25 14
A Main._load_evc() 0 15 3
A Main.on_table_enabled() 0 16 4
A Main.create_schedule() 0 61 4
A Main._get_circuits_buffer() 0 13 3
A Main.on_interface_link_change() 0 15 4
A Main.delete_circuit() 0 39 4
F Main.update() 0 66 16

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
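As one illustration from this report: the only duplication flagged here is between Main.bulk_add_metadata() and Main.bulk_delete_metadata(), which repeat the same lookup loop over circuit_ids. A minimal sketch of pulling that loop into a shared helper might look like the following. The helper name apply_to_circuits and the FakeEVC stand-in are hypothetical, introduced only for this example; they are not part of mef_eline.

```python
from typing import Callable, Dict, Iterable, List


def apply_to_circuits(
    circuits: Dict[str, object],
    circuit_ids: Iterable[str],
    action: Callable[[object], None],
) -> List[str]:
    """Run `action` on each buffered circuit; return the ids not found."""
    missing = []
    for circuit_id in circuit_ids:
        try:
            evc = circuits[circuit_id]
        except KeyError:
            missing.append(circuit_id)
            continue
        action(evc)
    return missing


class FakeEVC:
    """Stand-in for an EVC, used only for this illustration."""

    def __init__(self):
        self.metadata = {}

    def extend_metadata(self, data):
        self.metadata.update(data)

    def remove_metadata(self, key):
        self.metadata.pop(key, None)


circuits = {"a1": FakeEVC(), "b2": FakeEVC()}

# Equivalent of the loop in bulk_add_metadata
missing_on_add = apply_to_circuits(
    circuits, ["a1", "zz"], lambda evc: evc.extend_metadata({"owner": "noc"})
)
after_add = dict(circuits["a1"].metadata)

# Equivalent of the loop in bulk_delete_metadata
missing_on_del = apply_to_circuits(
    circuits, ["a1", "b2"], lambda evc: evc.remove_metadata("owner")
)
```

Each endpoint would then only build its action and raise the 404 when the returned list is non-empty. Note that this sketch narrows the try block to the dictionary lookup, so a KeyError raised inside the action itself would no longer be reported as a missing circuit; whether that is desirable depends on the real EVC methods.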

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
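The prefix/suffix heuristic can be tried mechanically on the method names listed in the table above. The sketch below groups names by the words they share; the ignored-verb list, the naive plural handling, and the minimum group size are assumptions chosen for this example, not part of any tool.

```python
from collections import defaultdict

# Method names copied from the report's table (a subset, for brevity).
METHOD_NAMES = [
    "list_schedules", "create_schedule", "update_schedule",
    "delete_schedule", "_find_evc_by_schedule_id",
    "get_metadata", "add_metadata", "delete_metadata",
    "bulk_add_metadata", "bulk_delete_metadata",
    "on_link_up", "handle_link_up", "on_link_down", "handle_link_down",
    "create_circuit", "delete_circuit", "list_circuits",
]

# Assumption: generic verbs and event prefixes say little about cohesion.
IGNORED_TOKENS = {"list", "create", "update", "delete", "get", "add",
                  "bulk", "on", "handle"}


def singular(token):
    """Treat 'schedules' and 'schedule' as the same token (naive plural)."""
    return token[:-1] if token.endswith("s") else token


def group_by_token(names, min_size=3):
    """Map each shared word to the methods whose names contain it."""
    groups = defaultdict(set)
    for name in names:
        for token in name.strip("_").split("_"):
            token = singular(token)
            if token not in IGNORED_TOKENS:
                groups[token].add(name)
    return {token: sorted(members)
            for token, members in groups.items()
            if len(members) >= min_size}


candidates = group_by_token(METHOD_NAMES)
for token, members in sorted(candidates.items()):
    print(f"{token}: {', '.join(members)}")
```

On this subset the groups that survive are circuit, link, metadata and schedule, which gives a first list of candidate components to inspect by hand.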

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
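As a rough sketch of Extract Class applied to the schedule-related methods, the lookup used by update_schedule and delete_schedule could live in its own small class that shares the circuits buffer with the NApp. Everything below is hypothetical: ScheduleRegistry, FakeEVC and FakeSchedule are stand-ins invented for this example, and the real _find_evc_by_schedule_id may behave differently.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class FakeSchedule:
    """Stand-in for CircuitSchedule (illustration only)."""
    id: str


@dataclass
class FakeEVC:
    """Stand-in for EVC (illustration only)."""
    id: str
    circuit_scheduler: List[FakeSchedule] = field(default_factory=list)


class ScheduleRegistry:
    """Hypothetical extracted class that owns schedule lookups."""

    def __init__(self, circuits: Dict[str, FakeEVC]):
        # Same dict object as the owner's buffer, so both stay in sync.
        self._circuits = circuits

    def find(
        self, schedule_id: str
    ) -> Tuple[Optional[FakeEVC], Optional[FakeSchedule]]:
        """Return (evc, schedule) for schedule_id, or (None, None)."""
        for evc in self._circuits.values():
            for schedule in evc.circuit_scheduler or []:
                if schedule.id == schedule_id:
                    return evc, schedule
        return None, None

    def remove(self, schedule_id: str) -> bool:
        """Detach the schedule from its EVC; report whether it existed."""
        evc, schedule = self.find(schedule_id)
        if schedule is None:
            return False
        evc.circuit_scheduler.remove(schedule)
        return True


class Main:
    """Trimmed-down owner that delegates schedule work."""

    def __init__(self):
        self.circuits: Dict[str, FakeEVC] = {}
        self.schedules = ScheduleRegistry(self.circuits)


napp = Main()
napp.circuits["evc1"] = FakeEVC("evc1", [FakeSchedule("s1")])
found_evc, found_schedule = napp.schedules.find("s1")
removed = napp.schedules.remove("s1")
```

The REST handlers would stay on the NApp class and keep the HTTP concerns (status codes, logging), while the extracted class holds the data manipulation, which is usually the part worth unit-testing in isolation.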

# pylint: disable=protected-access, too-many-lines
"""Main module of kytos/mef_eline Kytos Network Application.

NApp to provision circuits from user request.
"""
import pathlib
import time
import traceback
from threading import Lock
from typing import Optional

from pydantic import ValidationError

from kytos.core import KytosNApp, log, rest
from kytos.core.events import KytosEvent
from kytos.core.exceptions import KytosTagError
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
                                validate_openapi)
from kytos.core.interface import TAG, UNI, TAGRange
from kytos.core.link import Link
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
                                 get_json_or_400)
from kytos.core.tag_ranges import get_tag_ranges
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
                                              DuplicatedNoTagUNI, InvalidPath)
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
                                          Path)
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
                                         emit_event, get_vlan_tags_and_masks,
                                         map_evc_event_content)


# pylint: disable=too-many-public-methods
class Main(KytosNApp):
    """Main class of amlight/mef_eline NApp.

    This class is the entry point for this napp.
    """

    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None

    def get_evcs_by_svc_level(self) -> list:
        """Get circuits sorted by desc service level and asc creation_time.

        In the future, as more ops are offloaded, this should come from the DB.
        """
        return sorted(self.circuits.values(),
                      key=lambda x: (-x.service_level, x.creation_time))

    @staticmethod
    def get_eline_controller():
        """Return the ELineController instance."""
        return controllers.ELineController()

    def execute(self):
        """Execute once when the napp is running."""
        if self._lock.locked():
            return
        log.debug("Starting consistency routine")
        with self._lock:
            self.execute_consistency()
        log.debug("Finished consistency routine")

    def should_be_checked(self, circuit):
        "Verify if the circuit meets the necessary conditions to be checked"
        # pylint: disable=too-many-boolean-expressions
        if (
                circuit.is_enabled()
                and not circuit.is_active()
                and not circuit.lock.locked()
                and not circuit.has_recent_removed_flow()
                and not circuit.is_recent_updated()
                and circuit.are_unis_active(self.controller.switches)
                # if an inter-switch EVC does not have current_path, it does
                # not make sense to run sdntrace on it
                and (circuit.is_intra_switch() or circuit.current_path)
                ):
            return True
        return False

    def execute_consistency(self):
        """Execute consistency routine."""
        circuits_to_check = []
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        for circuit in self.get_evcs_by_svc_level():
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            is_checked = circuits_checked.get(circuit.id)
            if is_checked:
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
            else:
                circuit.execution_rounds += 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                    log.info(f"{circuit} enabled but inactive - redeploy")
                    with circuit.lock:
                        circuit.deploy()
        for circuit_id in stored_circuits:
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuits[circuit_id])

    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return circuits stored.

        The archived query arg, if defined (not null), filters the results
        accordingly; by default only non-archived EVCs are listed.
        """
        log.debug("list_circuits /v2/evc")
        args = request.query_params
        archived = args.get("archived", "false").lower()
        args = {k: v for k, v in args.items() if k not in {"archived"}}
        circuits = self.mongo_controller.get_circuits(archived=archived,
                                                      metadata=args)
        circuits = circuits['circuits']
        return JSONResponse(circuits)

    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            result = {}
            status = 200
            return JSONResponse(result, status_code=status)

        result = []
        status = 200
        for circuit in circuits:
            circuit_scheduler = circuit.get("circuit_scheduler")
            if circuit_scheduler:
                for scheduler in circuit_scheduler:
                    value = {
                        "schedule_id": scheduler.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": scheduler,
                    }
                    result.append(value)

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a circuit based on id."""
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if not circuit:
            result = f"circuit_id {circuit_id} not found"
            log.debug("get_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        status = 200
        log.debug("get_circuit result %s %s", circuit, status)
        return JSONResponse(circuit, status_code=status)

    # pylint: disable=too-many-branches, too-many-statements
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify the user of the status of the request.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"primary_path is not valid: {exception}"
                ) from exception
        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"backup_path is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)

    @staticmethod
    def _use_uni_tags(evc):
        uni_a = evc.uni_a
        evc._use_uni_vlan(uni_a)
        try:
            uni_z = evc.uni_z
            evc._use_uni_vlan(uni_z)
        except KytosTagError as err:
            evc.make_uni_vlan_available(uni_a)
            raise err

    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Capture delete messages to keep track when flows got removed."""
        self.handle_flow_delete(event)

    def handle_flow_delete(self, event):
        """Keep track when the EVC got flows removed by deriving its cookie."""
        flow = event.content["flow"]
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            log.debug("Flow removed in EVC %s", evc.id)
            evc.set_flow_removed_at()

    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Get metadata from an EVC."""
        circuit_id = request.path_params["circuit_id"]
        try:
            return (
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
            )
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs."""
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        fail_evcs = []
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.extend_metadata(data)
            except KeyError:
                fail_evcs.append(_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful", status_code=201)

    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC."""
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # f-prefix added so the offending value is actually interpolated
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)

    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from a bulk of EVCs"""
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        fail_evcs = []
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.remove_metadata(key)
            except KeyError:
                fail_evcs.append(_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful")

    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from an EVC."""
        circuit_id = request.path_params["circuit_id"]
        key = request.path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")

    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC."""
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # circuit not found
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        # Cannot modify deleted or archived circuits
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Change all attributes of the given schedule of an EVC circuit.
        The schedule ID is preserved by default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Cannot modify deleted or archived circuits
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the Schedule from EVC.
        Remove the Schedule from cron job.
        Save the EVC to mongodb.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Cannot modify deleted or archived circuits
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check whether untagged UNIs of the given EVC are duplicated.

        Raise DuplicatedNoTagUNI if duplication is found.

        Args:
            evc_id (str): ID of the EVC being analyzed. This circuit is
                skipped when scanning the other circuits.
            uni_a (Optional[UNI]): UNI A of the EVC, if any.
            uni_z (Optional[UNI]): UNI Z of the EVC, if any.
        """

        # No UNIs
        if not (uni_a or uni_z):
            return

        if (not (uni_a and not uni_a.user_tag) and
                not (uni_z and not uni_z.user_tag)):
            return
        for circuit in self.circuits.copy().values():
            if (not circuit.archived and circuit._id != evc_id):
                if uni_a and uni_a.user_tag is None:
                    circuit.check_no_tag_duplicate(uni_a)
                if uni_z and uni_z.user_tag is None:
                    circuit.check_no_tag_duplicate(uni_z)

    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        self.handle_link_up(event)

    def handle_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if evc.is_enabled() and not evc.archived:
                with evc.lock:
                    evc.handle_link_up(event.content["link"])

    # Possibly replace this with interruptions?
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)',
        pool='dynamic_single'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """
        Handler for interface link_up, link_down, created and deleted events
        """
        with self._lock:
            _, _, event_type = event.name.rpartition('.')
            iface = event.content.get("interface")
            if event_type in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)

    def handle_interface_link_up(self, interface):
        """
        Handler for interface link_up events
        """
        for evc in self.get_evcs_by_svc_level():
            log.info("Event handle_interface_link_up %s", interface)
            evc.handle_interface_link_up(
                interface
            )

    def handle_interface_link_down(self, interface):
        """
        Handler for interface link_down events
        """
        for evc in self.get_evcs_by_svc_level():
            log.info("Event handle_interface_link_down %s", interface)
            evc.handle_interface_link_down(
                interface
            )

    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_link_down(event)

    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuit when link is down or under_maintenance."""
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if evc.is_affected_by_link(link):
                # If there is no failover path, handle link down the
                # traditional way
                if (
                    not getattr(evc, 'failover_path', None) or
                    evc.is_failover_path_affected_by_link(link)
                ):
                    evcs_normal.append(evc)
                    continue
                try:
                    dpid_flows = evc.get_failover_flows()
                # pylint: disable=broad-except
                except Exception:
                    err = traceback.format_exc().replace("\n", ", ")
                    log.error(
                        f"Ignore Failover path for {evc} due to error: {err}"
                    )
                    evcs_normal.append(evc)
                    continue
                for dpid, flows in dpid_flows.items():
                    switch_flows.setdefault(dpid, [])
                    switch_flows[dpid].extend(flows)
                evcs_with_failover.append(evc)
            else:
                check_failover.append(evc)

        while switch_flows:
            offset = settings.BATCH_SIZE or None
            switches = list(switch_flows.keys())
            for dpid in switches:
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
                    }
                )
                if offset is None or offset >= len(switch_flows[dpid]):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = switch_flows[dpid][offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                with evc.lock:
                    evc.setup_failover_path()

    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_evc_affected_by_link_down(event)

    def handle_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        evc = self.circuits.get(event.content["evc_id"])
        link_id = event.content['link_id']
        if not evc:
            return
        with evc.lock:
            result = evc.handle_link_down()
        event_name = "error_redeploy_link_down"
        if result:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))

    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_(up|down)."""
        self.handle_evc_deployed(event)

    def handle_evc_deployed(self, event):
        """Setup failover path on evc deployed."""
        evc = self.circuits.get(event.content["evc_id"])
        if not evc:
            return
        with evc.lock:
            evc.setup_failover_path()

    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once the topology is available."""
        self.load_all_evcs()

    def load_all_evcs(self):
        """Try to load all EVCs on startup."""
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
        for circuit_id, circuit in circuits:
            if circuit_id not in self.circuits:
                self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded", content=dict(circuits))

    def _load_evc(self, circuit_dict):
        """Load one EVC from mongodb to memory."""
        try:
            evc = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None
        if evc.archived:
            return None

        self.circuits.setdefault(evc.id, evc)
        self.sched.add(evc)
        return evc

    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        self.handle_flow_mod_error(event)

    def handle_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        flow = event.content["flow"]
        command = event.content.get("error_command")
        if command != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            with evc.lock:
                evc.remove_current_flows()

    def _evc_dict_with_instances(self, evc_dict):
        """Convert some dict values to instances of EVC classes.

        This method will convert: [UNI, CircuitSchedule, Link, Path]
        """
        data = evc_dict.copy()  # Do not modify the original dict
        for attribute, value in data.items():
            # Get multiple attributes.
            # Ex: uni_a, uni_z
            if "uni" in attribute:
                try:
                    data[attribute] = self._uni_from_dict(value)
                except ValueError as exception:
                    result = "Error creating UNI: Invalid value"
                    raise ValueError(result) from exception

            if attribute == "circuit_scheduler":
                data[attribute] = []
                for schedule in value:
                    data[attribute].append(CircuitSchedule.from_dict(schedule))

            # Get multiple attributes.
            # Ex: primary_links,
            #     backup_links,
            #     current_links_cache,
            #     primary_links_cache,
            #     backup_links_cache
            if "links" in attribute:
                data[attribute] = [
                    self._link_from_dict(link) for link in value
                ]

            # Ex: current_path,
            #     primary_path,
            #     backup_path
            if "path" in attribute and attribute != "dynamic_backup_path":
                data[attribute] = Path(
                    [self._link_from_dict(link) for link in value]
                )

        return data

    def _evc_from_dict(self, evc_dict):
        """Return an EVC object from python dict."""
        data = self._evc_dict_with_instances(evc_dict)
        data["table_group"] = self.table_group
        return EVC(self.controller, **data)

    def _uni_from_dict(self, uni_dict):
        """Return a UNI object from python dict, or False if it is None."""
        if uni_dict is None:
            return False

        interface_id = uni_dict.get("interface_id")
        interface = self.controller.get_interface_by_id(interface_id)
        if interface is None:
            result = (
                "Error creating UNI:"
                + f"Could not instantiate interface {interface_id}"
            )
            raise ValueError(result) from ValueError
        tag_convert = {1: "vlan"}
        tag_dict = uni_dict.get("tag", None)
        if tag_dict:
            tag_type = tag_dict.get("tag_type")
            tag_type = tag_convert.get(tag_type, tag_type)
            tag_value = tag_dict.get("value")
            if isinstance(tag_value, list):
                tag_value = get_tag_ranges(tag_value)
                mask_list = get_vlan_tags_and_masks(tag_value)
                tag = TAGRange(tag_type, tag_value, mask_list)
            else:
                tag = TAG(tag_type, tag_value)
        else:
            tag = None
        uni = UNI(interface, tag)
        return uni

    def _link_from_dict(self, link_dict):
        """Return a Link object from python dict."""
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)
        if not endpoint_a:
            error_msg = f"Could not get interface endpoint_a id {id_a}"
            raise ValueError(error_msg)
        if not endpoint_b:
            error_msg = f"Could not get interface endpoint_b id {id_b}"
            raise ValueError(error_msg)

        link = Link(endpoint_a, endpoint_b)
        if "metadata" in link_dict:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if s_vlan:
            tag = TAG.from_dict(s_vlan)
            if tag is False:
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
                raise ValueError(error_msg)
            link.update_metadata("s_vlan", tag)
        return link

    def _find_evc_by_schedule_id(self, schedule_id):
        """
        Find an EVC and CircuitSchedule based on schedule_id.

        :param schedule_id: Schedule ID
        :return: EVC and Schedule
        """
        circuits = self._get_circuits_buffer()
        found_schedule = None
        evc = None

        # pylint: disable=unused-variable
        for c_id, circuit in circuits.items():
            for schedule in circuit.circuit_scheduler:
                if schedule.id == schedule_id:
                    found_schedule = schedule
                    evc = circuit
                    break
            if found_schedule:
                break
        return evc, found_schedule

    def _get_circuits_buffer(self):
        """
        Return the circuit buffer.

        If the buffer is empty, try to load data from mongodb.
        """
        if not self.circuits:
            # Load circuits from mongodb to buffer
            circuits = self.mongo_controller.get_circuits()['circuits']
            for c_id, circuit in circuits.items():
                evc = self._evc_from_dict(circuit)
                self.circuits[c_id] = evc
        return self.circuits

    # pylint: disable=attribute-defined-outside-init
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle a recently enabled table."""
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return
        for group in table_group:
            if group not in settings.TABLE_GROUP_ALLOWED:
                log.error(f'The table group "{group}" is not allowed for '
                          f'mef_eline. Allowed table groups are '
                          f'{settings.TABLE_GROUP_ALLOWED}')
                return
        self.table_group.update(table_group)
        content = {"group_table": self.table_group}
        name = "kytos/mef_eline.enable_table"
        await aemit_event(self.controller, name, content)

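The batched flow installation inside `handle_link_down` above (the `while switch_flows:` loop) can be studied in isolation. The following is only a sketch of the slicing logic: `split_in_batches` is a hypothetical helper that is not part of this module, and it collects the rounds in a list instead of emitting `flows.install` events and sleeping for `settings.BATCH_INTERVAL` between rounds.

```python
from typing import Optional


def split_in_batches(
    switch_flows: dict[str, list[dict]], batch_size: Optional[int]
) -> list[dict[str, list[dict]]]:
    """Split per-switch flow lists into rounds of at most batch_size flows.

    Hypothetical helper for illustration only. A batch_size of 0 or None
    means "no limit", mirroring ``settings.BATCH_SIZE or None``.
    """
    # Work on a copy so the caller's dict is left untouched.
    pending = {dpid: list(flows) for dpid, flows in switch_flows.items()}
    offset = batch_size or None
    rounds = []
    while pending:
        current = {}
        for dpid in list(pending):
            # Each round takes the first `offset` flows of every switch.
            current[dpid] = pending[dpid][:offset]
            if offset is None or offset >= len(pending[dpid]):
                # Nothing left for this switch after this round.
                del pending[dpid]
                continue
            pending[dpid] = pending[dpid][offset:]
        rounds.append(current)
    return rounds


flows = {"sw1": [{"id": n} for n in range(5)], "sw2": [{"id": 0}]}
rounds = split_in_batches(flows, batch_size=2)
```

Each element of `rounds` corresponds to one pass of the `while` loop, so a switch with many failover flows is served over several rounds while switches with few flows drop out early.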