Passed
Push — master ( 06a873...24a68d )
by Aldo
05:04
created

build.main.Main.update()   D

Complexity

Conditions 12

Size

Total Lines 58
Code Lines 49

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 32
CRAP Score 14.4102

Importance

Changes 0
Metric Value
cc 12
eloc 49
nop 2
dl 0
loc 58
ccs 32
cts 43
cp 0.7442
crap 14.4102
rs 4.8
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.main.Main.update() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13 1
from typing import Optional
14
15 1
from pydantic import ValidationError
16
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29
                                              DuplicatedNoTagUNI,
30
                                              FlowModException, InvalidPath)
31 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
32
                                          Path)
33 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
34 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc, aemit_event,
35
                                         emit_event, get_vlan_tags_and_masks,
36
                                         map_evc_event_content,
37
                                         merge_flow_dicts, prepare_delete_flow,
38
                                         send_flow_mods_http)
39
40
41
# pylint: disable=too-many-public-methods
42 1
class Main(KytosNApp):
43
    """Main class of amlight/mef_eline NApp.
44
45
    This class is the entry point for this napp.
46
    """
47
48 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
49
50 1
    def setup(self):
51
        """Replace the '__init__' method for the KytosNApp subclass.
52
53
        The setup method is automatically called by the controller when your
54
        application is loaded.
55
56
        So, if you have any setup routine, insert it here.
57
        """
58
        # object used to scheduler circuit events
59 1
        self.sched = Scheduler()
60
61
        # object to save and load circuits
62 1
        self.mongo_controller = self.get_eline_controller()
63 1
        self.mongo_controller.bootstrap_indexes()
64
65
        # set the controller that will manager the dynamic paths
66 1
        DynamicPathManager.set_controller(self.controller)
67
68
        # dictionary of EVCs created. It acts as a circuit buffer.
69
        # Every create/update/delete must be synced to mongodb.
70 1
        self.circuits = dict[str, EVC]()
71
72 1
        self._intf_events = defaultdict(dict)
73 1
        self._lock_interfaces = defaultdict(Lock)
74 1
        self.table_group = {"epl": 0, "evpl": 0}
75 1
        self._lock = Lock()
76 1
        self.multi_evc_lock = Lock()
77 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
78
79 1
        self.load_all_evcs()
80 1
        self._topology_updated_at = None
81
82 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
83
        """Get circuits sorted by desc service level and asc creation_time.
84
85
        In the future, as more ops are offloaded it should be get from the DB.
86
        """
87 1
        if enable_filter:
88 1
            return sorted(
89
                          [circuit for circuit in self.circuits.values()
90
                           if circuit.is_enabled()],
91
                          key=lambda x: (-x.service_level, x.creation_time),
92
            )
93 1
        return sorted(self.circuits.values(),
94
                      key=lambda x: (-x.service_level, x.creation_time))
95
96 1
    @staticmethod
97 1
    def get_eline_controller():
98
        """Return the ELineController instance."""
99
        return controllers.ELineController()
100
101 1
    def execute(self):
102
        """Execute once when the napp is running."""
103 1
        if self._lock.locked():
104 1
            return
105 1
        log.debug("Starting consistency routine")
106 1
        with self._lock:
107 1
            self.execute_consistency()
108 1
        log.debug("Finished consistency routine")
109
110 1
    def should_be_checked(self, circuit):
111
        "Verify if the circuit meets the necessary conditions to be checked"
112
        # pylint: disable=too-many-boolean-expressions
113 1
        if (
114
                circuit.is_enabled()
115
                and not circuit.is_active()
116
                and not circuit.lock.locked()
117
                and not circuit.has_recent_removed_flow()
118
                and not circuit.is_recent_updated()
119
                and circuit.are_unis_active()
120
                # if a inter-switch EVC does not have current_path, it does not
121
                # make sense to run sdntrace on it
122
                and (circuit.is_intra_switch() or circuit.current_path)
123
                ):
124 1
            return True
125 1
        return False
126
127 1
    def execute_consistency(self):
128
        """Execute consistency routine."""
129 1
        circuits_to_check = []
130 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
131 1
            if self.should_be_checked(circuit):
132 1
                circuits_to_check.append(circuit)
133 1
            if (
134
                circuit.is_active()
135
                and not circuit.failover_path
136
                and circuit.is_eligible_for_failover_path()
137
            ):
138 1
                emit_event(
139
                    self.controller,
140
                    "need_failover",
141
                    content=map_evc_event_content(circuit)
142
                )
143 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
144 1
        for circuit in circuits_to_check:
145 1
            is_checked = circuits_checked.get(circuit.id)
146 1
            if is_checked:
147 1
                circuit.execution_rounds = 0
148 1
                log.info(f"{circuit} enabled but inactive - activating")
149 1
                with circuit.lock:
150 1
                    circuit.activate()
151 1
                    circuit.sync()
152
            else:
153 1
                circuit.execution_rounds += 1
154 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
155 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
156 1
                    with circuit.lock:
157 1
                        circuit.deploy()
158
159 1
    def shutdown(self):
160
        """Execute when your napp is unloaded.
161
162
        If you have some cleanup procedure, insert it here.
163
        """
164
165 1
    @rest("/v2/evc/", methods=["GET"])
166 1
    def list_circuits(self, request: Request) -> JSONResponse:
167
        """Endpoint to return circuits stored.
168
169
        archive query arg if defined (not null) will be filtered
170
        accordingly, by default only non archived evcs will be listed
171
        """
172 1
        log.debug("list_circuits /v2/evc")
173 1
        args = request.query_params
174 1
        archived = args.get("archived", "false").lower()
175 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
176 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
177
                                                      metadata=args)
178 1
        circuits = circuits['circuits']
179 1
        return JSONResponse(circuits)
180
181 1
    @rest("/v2/evc/schedule", methods=["GET"])
182 1
    def list_schedules(self, _request: Request) -> JSONResponse:
183
        """Endpoint to return all schedules stored for all circuits.
184
185
        Return a JSON with the following template:
186
        [{"schedule_id": <schedule_id>,
187
         "circuit_id": <circuit_id>,
188
         "schedule": <schedule object>}]
189
        """
190 1
        log.debug("list_schedules /v2/evc/schedule")
191 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
192 1
        if not circuits:
193 1
            result = {}
194 1
            status = 200
195 1
            return JSONResponse(result, status_code=status)
196
197 1
        result = []
198 1
        status = 200
199 1
        for circuit in circuits:
200 1
            circuit_scheduler = circuit.get("circuit_scheduler")
201 1
            if circuit_scheduler:
202 1
                for scheduler in circuit_scheduler:
203 1
                    value = {
204
                        "schedule_id": scheduler.get("id"),
205
                        "circuit_id": circuit.get("id"),
206
                        "schedule": scheduler,
207
                    }
208 1
                    result.append(value)
209
210 1
        log.debug("list_schedules result %s %s", result, status)
211 1
        return JSONResponse(result, status_code=status)
212
213 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
214 1
    def get_circuit(self, request: Request) -> JSONResponse:
215
        """Endpoint to return a circuit based on id."""
216 1
        circuit_id = request.path_params["circuit_id"]
217 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
218 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
219 1
        if not circuit:
220 1
            result = f"circuit_id {circuit_id} not found"
221 1
            log.debug("get_circuit result %s %s", result, 404)
222 1
            raise HTTPException(404, detail=result)
223 1
        status = 200
224 1
        log.debug("get_circuit result %s %s", circuit, status)
225 1
        return JSONResponse(circuit, status_code=status)
226
227
    # pylint: disable=too-many-branches, too-many-statements
228 1
    @rest("/v2/evc/", methods=["POST"])
229 1
    @validate_openapi(spec)
230 1
    def create_circuit(self, request: Request) -> JSONResponse:
231
        """Try to create a new circuit.
232
233
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
234
        UNI_Z's requested C-VID are available from the interfaces' pools. This
235
        is checked when creating the UNI object.
236
237
        Then, E-Line NApp requests a primary and a backup path to the
238
        Pathfinder NApp using the attributes primary_links and backup_links
239
        submitted via REST
240
241
        # For each link composing paths in #3:
242
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
243
        #  - Using the S-VID obtained, generate abstract flow entries to be
244
        #    sent to FlowManager
245
246
        Push abstract flow entries to FlowManager and FlowManager pushes
247
        OpenFlow entries to datapaths
248
249
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
250
        creation
251
252
        Finnaly, notify user of the status of its request.
253
        """
254
        # Try to create the circuit object
255 1
        log.debug("create_circuit /v2/evc/")
256 1
        data = get_json_or_400(request, self.controller.loop)
257
258 1
        try:
259 1
            evc = self._evc_from_dict(data)
260 1
        except (ValueError, KytosTagError) as exception:
261 1
            log.debug("create_circuit result %s %s", exception, 400)
262 1
            raise HTTPException(400, detail=str(exception)) from exception
263 1
        if evc.primary_path:
264
            try:
265
                evc.primary_path.is_valid(
266
                    evc.uni_a.interface.switch,
267
                    evc.uni_z.interface.switch,
268
                    bool(evc.circuit_scheduler),
269
                )
270
            except InvalidPath as exception:
271
                raise HTTPException(
272
                    400,
273
                    detail=f"primary_path is not valid: {exception}"
274
                ) from exception
275 1
        if evc.backup_path:
276
            try:
277
                evc.backup_path.is_valid(
278
                    evc.uni_a.interface.switch,
279
                    evc.uni_z.interface.switch,
280
                    bool(evc.circuit_scheduler),
281
                )
282
            except InvalidPath as exception:
283
                raise HTTPException(
284
                    400,
285
                    detail=f"backup_path is not valid: {exception}"
286
                ) from exception
287
288 1
        if not evc._tag_lists_equal():
289
            detail = "UNI_A and UNI_Z tag lists should be the same."
290
            raise HTTPException(400, detail=detail)
291
292 1
        try:
293 1
            evc._validate_has_primary_or_dynamic()
294 1
        except ValueError as exception:
295 1
            raise HTTPException(400, detail=str(exception)) from exception
296
297 1
        try:
298 1
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
299
        except DuplicatedNoTagUNI as exception:
300
            log.debug("create_circuit result %s %s", exception, 409)
301
            raise HTTPException(409, detail=str(exception)) from exception
302
303 1
        try:
304 1
            self._use_uni_tags(evc)
305 1
        except KytosTagError as exception:
306 1
            raise HTTPException(400, detail=str(exception)) from exception
307
308
        # save circuit
309 1
        try:
310 1
            evc.sync()
311
        except ValidationError as exception:
312
            raise HTTPException(400, detail=str(exception)) from exception
313
314
        # store circuit in dictionary
315 1
        self.circuits[evc.id] = evc
316
317
        # Schedule the circuit deploy
318 1
        self.sched.add(evc)
319
320
        # Circuit has no schedule, deploy now
321 1
        deployed = False
322 1
        if not evc.circuit_scheduler:
323 1
            with evc.lock:
324 1
                deployed = evc.deploy()
325
326
        # Notify users
327 1
        result = {"circuit_id": evc.id, "deployed": deployed}
328 1
        status = 201
329 1
        log.debug("create_circuit result %s %s", result, status)
330 1
        emit_event(self.controller, name="created",
331
                   content=map_evc_event_content(evc))
332 1
        return JSONResponse(result, status_code=status)
333
334 1
    @staticmethod
335 1
    def _use_uni_tags(evc):
336 1
        uni_a = evc.uni_a
337 1
        evc._use_uni_vlan(uni_a)
338 1
        try:
339 1
            uni_z = evc.uni_z
340 1
            evc._use_uni_vlan(uni_z)
341 1
        except KytosTagError as err:
342 1
            evc.make_uni_vlan_available(uni_a)
343 1
            raise err
344
345 1
    @listen_to('kytos/flow_manager.flow.removed')
346 1
    def on_flow_delete(self, event):
347
        """Capture delete messages to keep track when flows got removed."""
348
        self.handle_flow_delete(event)
349
350 1
    def handle_flow_delete(self, event):
351
        """Keep track when the EVC got flows removed by deriving its cookie."""
352 1
        flow = event.content["flow"]
353 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
354 1
        if evc:
355 1
            log.debug("Flow removed in EVC %s", evc.id)
356 1
            evc.set_flow_removed_at()
357
358 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
359 1
    @validate_openapi(spec)
360 1
    def update(self, request: Request) -> JSONResponse:
361
        """Update a circuit based on payload.
362
363
        The EVC attributes (creation_time, active, current_path,
364
        failover_path, _id, archived) can't be updated.
365
        """
366 1
        data = get_json_or_400(request, self.controller.loop)
367 1
        circuit_id = request.path_params["circuit_id"]
368 1
        log.debug("update /v2/evc/%s", circuit_id)
369 1
        try:
370 1
            evc = self.circuits[circuit_id]
371 1
        except KeyError:
372 1
            result = f"circuit_id {circuit_id} not found"
373 1
            log.debug("update result %s %s", result, 404)
374 1
            raise HTTPException(404, detail=result) from KeyError
375
376 1
        with evc.lock:
377 1
            try:
378 1
                updated_data = self._evc_dict_with_instances(data)
379 1
                self._check_no_tag_duplication(
380
                    circuit_id, updated_data.get("uni_a"),
381
                    updated_data.get("uni_z")
382
                )
383 1
                enable, redeploy = evc.update(**updated_data)
384 1
            except (ValueError, KytosTagError, ValidationError) as exception:
385 1
                log.debug("update result %s %s", exception, 400)
386 1
                raise HTTPException(400, detail=str(exception)) from exception
387
            except DuplicatedNoTagUNI as exception:
388
                log.debug("update result %s %s", exception, 409)
389
                raise HTTPException(409, detail=str(exception)) from exception
390
            except DisabledSwitch as exception:
391
                log.debug("update result %s %s", exception, 409)
392
                raise HTTPException(
393
                        409,
394
                        detail=f"Path is not valid: {exception}"
395
                    ) from exception
396 1
            redeployed = False
397 1
            if evc.is_active():
398
                if enable is False:  # disable if active
399
                    evc.remove()
400
                elif redeploy is not None:  # redeploy if active
401
                    evc.remove()
402
                    redeployed = evc.deploy()
403
            else:
404 1
                if enable is True:  # enable if inactive
405 1
                    redeployed = evc.deploy()
406 1
                elif evc.is_enabled() and redeploy:
407 1
                    evc.remove()
408 1
                    redeployed = evc.deploy()
409 1
            result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
410 1
            status = 200
411
412 1
        log.debug("update result %s %s", result, status)
413 1
        emit_event(self.controller, "updated",
414
                   content=map_evc_event_content(evc, **data))
415 1
        return JSONResponse(result, status_code=status)
416
417 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
418 1
    def delete_circuit(self, request: Request) -> JSONResponse:
419
        """Remove a circuit.
420
421
        First, the flows are removed from the switches, and then the EVC is
422
        disabled.
423
        """
424 1
        circuit_id = request.path_params["circuit_id"]
425 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
426 1
        try:
427 1
            evc = self.circuits.pop(circuit_id)
428 1
        except KeyError:
429 1
            result = f"circuit_id {circuit_id} not found"
430 1
            log.debug("delete_circuit result %s %s", result, 404)
431 1
            raise HTTPException(404, detail=result) from KeyError
432 1
        log.info("Removing %s", evc)
433
434 1
        with evc.lock:
435 1
            if not evc.archived:
436 1
                evc.deactivate()
437 1
                evc.disable()
438 1
                self.sched.remove(evc)
439 1
                evc.remove_current_flows(sync=False)
440 1
                evc.remove_failover_flows(sync=False)
441 1
                evc.archive()
442 1
                evc.remove_uni_tags()
443 1
                evc.sync()
444 1
                emit_event(
445
                    self.controller, "deleted",
446
                    content=map_evc_event_content(evc)
447
                )
448
449 1
        log.info("EVC removed. %s", evc)
450 1
        result = {"response": f"Circuit {circuit_id} removed"}
451 1
        status = 200
452 1
        log.debug("delete_circuit result %s %s", result, status)
453
454 1
        return JSONResponse(result, status_code=status)
455
456 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
457 1
    def get_metadata(self, request: Request) -> JSONResponse:
458
        """Get metadata from an EVC."""
459 1
        circuit_id = request.path_params["circuit_id"]
460 1
        try:
461 1
            return (
462
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
463
            )
464
        except KeyError as error:
465
            raise HTTPException(
466
                404,
467
                detail=f"circuit_id {circuit_id} not found."
468
            ) from error
469
470 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
471 1
    @validate_openapi(spec)
472 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
473
        """Add metadata to a bulk of EVCs."""
474 1
        data = get_json_or_400(request, self.controller.loop)
475 1
        circuit_ids = data.pop("circuit_ids")
476
477 1
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
478
479 1
        fail_evcs = []
480 1
        for _id in circuit_ids:
481 1
            try:
482 1
                evc = self.circuits[_id]
483 1
                evc.extend_metadata(data)
484 1
            except KeyError:
485 1
                fail_evcs.append(_id)
486
487 1
        if fail_evcs:
488 1
            raise HTTPException(404, detail=fail_evcs)
489 1
        return JSONResponse("Operation successful", status_code=201)
490
491 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
492 1
    @validate_openapi(spec)
493 1
    def add_metadata(self, request: Request) -> JSONResponse:
494
        """Add metadata to an EVC."""
495 1
        circuit_id = request.path_params["circuit_id"]
496 1
        metadata = get_json_or_400(request, self.controller.loop)
497 1
        if not isinstance(metadata, dict):
498
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
499 1
        try:
500 1
            evc = self.circuits[circuit_id]
501 1
        except KeyError as error:
502 1
            raise HTTPException(
503
                404,
504
                detail=f"circuit_id {circuit_id} not found."
505
            ) from error
506
507 1
        evc.extend_metadata(metadata)
508 1
        evc.sync()
509 1
        return JSONResponse("Operation successful", status_code=201)
510
511 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
512 1
    @validate_openapi(spec)
513 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
514
        """Delete metada from a bulk of EVCs"""
515 1
        data = get_json_or_400(request, self.controller.loop)
516 1
        key = request.path_params["key"]
517 1
        circuit_ids = data.pop("circuit_ids")
518 1
        self.mongo_controller.update_evcs_metadata(
519
            circuit_ids, {key: ""}, "del"
520
        )
521
522 1
        fail_evcs = []
523 1
        for _id in circuit_ids:
524 1
            try:
525 1
                evc = self.circuits[_id]
526 1
                evc.remove_metadata(key)
527 1
            except KeyError:
528 1
                fail_evcs.append(_id)
529
530 1
        if fail_evcs:
531 1
            raise HTTPException(404, detail=fail_evcs)
532 1
        return JSONResponse("Operation successful")
533
534 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
535 1
    def delete_metadata(self, request: Request) -> JSONResponse:
536
        """Delete metadata from an EVC."""
537 1
        circuit_id = request.path_params["circuit_id"]
538 1
        key = request.path_params["key"]
539 1
        try:
540 1
            evc = self.circuits[circuit_id]
541 1
        except KeyError as error:
542 1
            raise HTTPException(
543
                404,
544
                detail=f"circuit_id {circuit_id} not found."
545
            ) from error
546
547 1
        evc.remove_metadata(key)
548 1
        evc.sync()
549 1
        return JSONResponse("Operation successful")
550
551 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
552 1
    def redeploy(self, request: Request) -> JSONResponse:
553
        """Endpoint to force the redeployment of an EVC."""
554 1
        circuit_id = request.path_params["circuit_id"]
555 1
        try_avoid_same_s_vlan = request.query_params.get(
556
            "try_avoid_same_s_vlan", "true"
557
        )
558 1
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
559 1
        if try_avoid_same_s_vlan not in {"true", "false"}:
560
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
561
            raise HTTPException(400, detail=msg)
562 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
563 1
        try:
564 1
            evc = self.circuits[circuit_id]
565 1
        except KeyError:
566 1
            raise HTTPException(
567
                404,
568
                detail=f"circuit_id {circuit_id} not found"
569
            ) from KeyError
570 1
        deployed = False
571 1
        with evc.lock:
572 1
            if evc.is_enabled():
573 1
                path_dict = evc.remove_current_flows(
574
                    sync=False,
575
                    return_path=try_avoid_same_s_vlan == "true"
576
                )
577 1
                evc.remove_failover_flows(sync=True)
578 1
                deployed = evc.deploy(path_dict)
579 1
        if deployed:
580 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
581 1
            status = 202
582
        else:
583 1
            result = {
584
                "response": f"Circuit {circuit_id} is disabled."
585
            }
586 1
            status = 409
587
588 1
        return JSONResponse(result, status_code=status)
589
590 1
    @rest("/v2/evc/schedule/", methods=["POST"])
591 1
    @validate_openapi(spec)
592 1
    def create_schedule(self, request: Request) -> JSONResponse:
593
        """
594
        Create a new schedule for a given circuit.
595
596
        This service do no check if there are conflicts with another schedule.
597
        Payload example:
598
            {
599
              "circuit_id":"aa:bb:cc",
600
              "schedule": {
601
                "date": "2019-08-07T14:52:10.967Z",
602
                "interval": "string",
603
                "frequency": "1 * * * *",
604
                "action": "create"
605
              }
606
            }
607
        """
608 1
        log.debug("create_schedule /v2/evc/schedule/")
609 1
        data = get_json_or_400(request, self.controller.loop)
610 1
        circuit_id = data["circuit_id"]
611 1
        schedule_data = data["schedule"]
612
613
        # Get EVC from circuits buffer
614 1
        circuits = self._get_circuits_buffer()
615
616
        # get the circuit
617 1
        evc = circuits.get(circuit_id)
618
619
        # get the circuit
620 1
        if not evc:
621 1
            result = f"circuit_id {circuit_id} not found"
622 1
            log.debug("create_schedule result %s %s", result, 404)
623 1
            raise HTTPException(404, detail=result)
624
625
        # new schedule from dict
626 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
627
628
        # If there is no schedule, create the list
629 1
        if not evc.circuit_scheduler:
630 1
            evc.circuit_scheduler = []
631
632
        # Add the new schedule
633 1
        evc.circuit_scheduler.append(new_schedule)
634
635
        # Add schedule job
636 1
        self.sched.add_circuit_job(evc, new_schedule)
637
638
        # save circuit to mongodb
639 1
        evc.sync()
640
641 1
        result = new_schedule.as_dict()
642 1
        status = 201
643
644 1
        log.debug("create_schedule result %s %s", result, status)
645 1
        return JSONResponse(result, status_code=status)
646
647 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
648 1
    @validate_openapi(spec)
649 1
    def update_schedule(self, request: Request) -> JSONResponse:
650
        """Update a schedule.
651
652
        Change all attributes from the given schedule from a EVC circuit.
653
        The schedule ID is preserved as default.
654
        Payload example:
655
            {
656
              "date": "2019-08-07T14:52:10.967Z",
657
              "interval": "string",
658
              "frequency": "1 * * *",
659
              "action": "create"
660
            }
661
        """
662 1
        data = get_json_or_400(request, self.controller.loop)
663 1
        schedule_id = request.path_params["schedule_id"]
664 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
665
666
        # Try to find a circuit schedule
667 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
668
669
        # Can not modify circuits deleted and archived
670 1
        if not found_schedule:
671 1
            result = f"schedule_id {schedule_id} not found"
672 1
            log.debug("update_schedule result %s %s", result, 404)
673 1
            raise HTTPException(404, detail=result)
674
675 1
        new_schedule = CircuitSchedule.from_dict(data)
676 1
        new_schedule.id = found_schedule.id
677
        # Remove the old schedule
678 1
        evc.circuit_scheduler.remove(found_schedule)
679
        # Append the modified schedule
680 1
        evc.circuit_scheduler.append(new_schedule)
681
682
        # Cancel all schedule jobs
683 1
        self.sched.cancel_job(found_schedule.id)
684
        # Add the new circuit schedule
685 1
        self.sched.add_circuit_job(evc, new_schedule)
686
        # Save EVC to mongodb
687 1
        evc.sync()
688
689 1
        result = new_schedule.as_dict()
690 1
        status = 200
691
692 1
        log.debug("update_schedule result %s %s", result, status)
693 1
        return JSONResponse(result, status_code=status)
694
695 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
696 1
    def delete_schedule(self, request: Request) -> JSONResponse:
697
        """Remove a circuit schedule.
698
699
        Remove the Schedule from EVC.
700
        Remove the Schedule from cron job.
701
        Save the EVC to the Storehouse.
702
        """
703 1
        schedule_id = request.path_params["schedule_id"]
704 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
705 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
706
707
        # Can not modify circuits deleted and archived
708 1
        if not found_schedule:
709 1
            result = f"schedule_id {schedule_id} not found"
710 1
            log.debug("delete_schedule result %s %s", result, 404)
711 1
            raise HTTPException(404, detail=result)
712
713
        # Remove the old schedule
714 1
        evc.circuit_scheduler.remove(found_schedule)
715
716
        # Cancel all schedule jobs
717 1
        self.sched.cancel_job(found_schedule.id)
718
        # Save EVC to mongodb
719 1
        evc.sync()
720
721 1
        result = "Schedule removed"
722 1
        status = 200
723
724 1
        log.debug("delete_schedule result %s %s", result, status)
725 1
        return JSONResponse(result, status_code=status)
726
727 1
    def _check_no_tag_duplication(
728
        self,
729
        evc_id: str,
730
        uni_a: Optional[UNI] = None,
731
        uni_z: Optional[UNI] = None
732
    ):
733
        """Check if the given EVC has UNIs with no tag and if these are
734
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
735
        Args:
736
            evc (dict): EVC to be analyzed.
737
        """
738
739
        # No UNIs
740 1
        if not (uni_a or uni_z):
741 1
            return
742
743 1
        if (not (uni_a and not uni_a.user_tag) and
744
                not (uni_z and not uni_z.user_tag)):
745 1
            return
746 1
        for circuit in self.circuits.copy().values():
747 1
            if (not circuit.archived and circuit._id != evc_id):
748 1
                if uni_a and uni_a.user_tag is None:
749 1
                    circuit.check_no_tag_duplicate(uni_a)
750 1
                if uni_z and uni_z.user_tag is None:
751 1
                    circuit.check_no_tag_duplicate(uni_z)
752
753 1
    @listen_to("kytos/topology.link_up")
754 1
    def on_link_up(self, event):
755
        """Change circuit when link is up or end_maintenance."""
756
        self.handle_link_up(event)
757
758 1
    def handle_link_up(self, event):
759
        """Change circuit when link is up or end_maintenance."""
760 1
        log.info("Event handle_link_up %s", event.content["link"])
761 1
        for evc in self.get_evcs_by_svc_level():
762 1
            if evc.is_enabled() and not evc.archived:
763 1
                with evc.lock:
764 1
                    evc.handle_link_up(event.content["link"])
765
766
    # Possibly replace this with interruptions?
767 1
    @listen_to(
768
        '.*.switch.interface.(link_up|link_down|created|deleted)',
769
        '.*.interface.(disabled|enabled|up|down)'
770
    )
771 1
    def on_interface_link_change(self, event: KytosEvent):
772
        """
773
        Handler for interface link_up and link_down events.
774
        """
775
        self.handle_on_interface_link_change(event)
776
777 1
    def handle_on_interface_link_change(self, event: KytosEvent):
778
        """
779
        Handler to sort interface events {link_(up, down), create, deleted}
780
781
        To avoid multiple database updated (link flap):
782
        Every interface is identfied and processed in parallel.
783
        Once an interface event is received a time is started.
784
        While time is running self._intf_events will be updated.
785
        After time has passed last received event will be processed.
786
        """
787 1
        iface = event.content.get("interface")
788 1
        with self._lock_interfaces[iface.id]:
789 1
            _now = event.timestamp
790
            # Return out of order events
791 1
            if (
792
                iface.id in self._intf_events
793
                and self._intf_events[iface.id]["event"].timestamp > _now
794
            ):
795 1
                return
796 1
            self._intf_events[iface.id].update({"event": event})
797 1
            if "last_acquired" in self._intf_events[iface.id]:
798 1
                return
799 1
            self._intf_events[iface.id].update({"last_acquired": now()})
800 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
801 1
        with self._lock_interfaces[iface.id]:
802 1
            event = self._intf_events[iface.id]["event"]
803 1
            self._intf_events[iface.id].pop('last_acquired', None)
804 1
            _, _, event_type = event.name.rpartition('.')
805 1
            if event_type in ('link_up', 'created', 'enabled', 'up'):
806 1
                self.handle_interface_link_up(iface)
807 1
            elif event_type in ('link_down', 'deleted', 'disabled', 'down'):
808 1
                self.handle_interface_link_down(iface)
809
810 1
    def handle_interface_link_up(self, interface):
811
        """
812
        Handler for interface link_up events
813
        """
814
        log.info("Event handle_interface_link_up %s", interface)
815
        for evc in self.get_evcs_by_svc_level():
816
            if _does_uni_affect_evc(evc, interface, "up"):
817
                with evc.lock:
818
                    evc.handle_interface_link_up(
819
                        interface
820
                    )
821
822 1
    def handle_interface_link_down(self, interface):
823
        """
824
        Handler for interface link_down events
825
        """
826
        log.info("Event handle_interface_link_down %s", interface)
827
        for evc in self.get_evcs_by_svc_level():
828
            if _does_uni_affect_evc(evc, interface, "down"):
829
                with evc.lock:
830
                    evc.handle_interface_link_down(
831
                        interface
832
                    )
833
834 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
835 1
    def on_link_down(self, event):
836
        """Change circuit when link is down or under_mantenance."""
837
        self.handle_link_down(event)
838
839 1
    def prepare_swap_to_failover_flow(self, evc: EVC):
840
        """Prepare an evc for switching to failover."""
841
        install_flows = {}
842
        try:
843
            install_flows = evc.get_failover_flows()
844
        # pylint: disable=broad-except
845
        except Exception:
846
            err = traceback.format_exc()
847
            log.error(
848
                "Ignore Failover path for "
849
                f"{evc} due to error: {err}"
850
            )
851
        return install_flows
852
853 1
    def prepare_swap_to_failover_event(self, evc: EVC, install_flow):
854
        """Prepare event contents for swap to failover."""
855
        return map_evc_event_content(
856
            evc,
857
            flows=deepcopy(install_flow)
858
        )
859
860 1
    def execute_swap_to_failover(
861
        self,
862
        evcs: list[EVC]
863
    ) -> tuple[list[EVC], list[EVC]]:
864
        """Process changes needed to commit a swap to failover."""
865 1
        event_contents = {}
866 1
        install_flows = {}
867 1
        swapped_evcs = list[EVC]()
868 1
        not_swapped_evcs = list[EVC]()
869
870 1
        for evc in evcs:
871 1
            install_flow = self.prepare_swap_to_failover_flow(evc)
872 1
            if install_flow:
873 1
                install_flows = merge_flow_dicts(install_flows, install_flow)
874 1
                event_contents[evc.id] =\
875
                    self.prepare_swap_to_failover_event(evc, install_flow)
876 1
                swapped_evcs.append(evc)
877
            else:
878 1
                not_swapped_evcs.append(evc)
879
880 1
        try:
881 1
            send_flow_mods_http(
882
                install_flows,
883
                "install"
884
            )
885 1
            for evc in swapped_evcs:
886 1
                temp_path = evc.current_path
887 1
                evc.current_path = evc.failover_path
888 1
                evc.failover_path = temp_path
889 1
            emit_event(
890
                self.controller, "failover_link_down",
891
                content=deepcopy(event_contents)
892
            )
893 1
            return swapped_evcs, not_swapped_evcs
894 1
        except FlowModException as exc:
895 1
            log.error(f"Fail to install failover flows for {evcs}: {exc}")
896 1
            return [], [*swapped_evcs, *not_swapped_evcs]
897
898 1
    def prepare_clear_failover_flow(self, evc: EVC):
899
        """Prepare an evc for clearing the old path."""
900
        del_flows = {}
901
        try:
902
            del_flows = prepare_delete_flow(
903
                merge_flow_dicts(
904
                    evc._prepare_uni_flows(evc.failover_path, skip_in=True),
905
                    evc._prepare_nni_flows(evc.failover_path)
906
                )
907
            )
908
        # pylint: disable=broad-except
909
        except Exception:
910
            err = traceback.format_exc()
911
            log.error(f"Fail to remove {evc} old_path: {err}")
912
        return del_flows
913
914 1
    def prepare_clear_failover_event(self, evc: EVC, delete_flow):
915
        """Prepare event contents for clearing failover."""
916
        return map_evc_event_content(
917
            evc,
918
            current_path=evc.current_path.as_dict(),
919
            removed_flows=deepcopy(delete_flow)
920
        )
921
922 1
    def execute_clear_failover(
923
        self,
924
        evcs: list[EVC]
925
    ) -> tuple[list[EVC], list[EVC]]:
926
        """Process changes needed to commit clearing the failover path"""
927 1
        event_contents = {}
928 1
        delete_flows = {}
929 1
        cleared_evcs = list[EVC]()
930 1
        not_cleared_evcs = list[EVC]()
931
932 1
        for evc in evcs:
933 1
            delete_flow = self.prepare_clear_failover_flow(evc)
934 1
            if delete_flow:
935 1
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
936 1
                event_contents[evc.id] =\
937
                    self.prepare_clear_failover_event(evc, delete_flow)
938 1
                cleared_evcs.append(evc)
939
            else:
940 1
                not_cleared_evcs.append(evc)
941
942 1
        try:
943 1
            send_flow_mods_http(
944
                delete_flows,
945
                'delete'
946
            )
947 1
            for evc in cleared_evcs:
948 1
                evc.failover_path.make_vlans_available(self.controller)
949 1
                evc.failover_path = Path([])
950 1
            emit_event(
951
                self.controller,
952
                "failover_old_path",
953
                content=event_contents
954
            )
955 1
            return cleared_evcs, not_cleared_evcs
956 1
        except FlowModException as exc:
957 1
            log.error(f"Failed to delete failover flows for {evcs}: {exc}")
958 1
            return [], [*cleared_evcs, *not_cleared_evcs]
959
960 1
    def prepare_undeploy_flow(self, evc: EVC):
961
        """Prepare an evc for undeploying."""
962 1
        del_flows = {}
963 1
        try:
964 1
            del_flows = prepare_delete_flow(
965
                merge_flow_dicts(
966
                    evc._prepare_uni_flows(evc.current_path, skip_in=False),
967
                    evc._prepare_uni_flows(evc.failover_path, skip_in=True),
968
                    evc._prepare_nni_flows(evc.current_path),
969
                    evc._prepare_nni_flows(evc.failover_path)
970
                )
971
            )
972
        # pylint: disable=broad-except
973
        except Exception:
974
            err = traceback.format_exc()
975
            log.error(f"Fail to undeploy {evc}: {err}")
976 1
        return del_flows
977
978 1
    def execute_undeploy(self, evcs: list[EVC]):
979
        """Process changes needed to commit an undeploy"""
980 1
        delete_flows = {}
981 1
        undeploy_evcs = list[EVC]()
982 1
        not_undeploy_evcs = list[EVC]()
983
984 1
        for evc in evcs:
985 1
            delete_flow = self.prepare_undeploy_flow(evc)
986 1
            if delete_flow:
987 1
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
988 1
                undeploy_evcs.append(evc)
989
            else:
990 1
                not_undeploy_evcs.append(evc)
991
992 1
        try:
993 1
            send_flow_mods_http(
994
                delete_flows,
995
                'delete'
996
            )
997
998 1
            for evc in undeploy_evcs:
999 1
                evc.current_path.make_vlans_available(self.controller)
1000 1
                evc.failover_path.make_vlans_available(self.controller)
1001 1
                evc.current_path = Path([])
1002 1
                evc.failover_path = Path([])
1003 1
                evc.deactivate()
1004 1
                emit_event(
1005
                    self.controller,
1006
                    "need_redeploy",
1007
                    content={"evc_id": evc.id}
1008
                )
1009 1
                log.info(f"{evc} scheduled for redeploy")
1010 1
            return undeploy_evcs, not_undeploy_evcs
1011 1
        except FlowModException as exc:
1012 1
            log.error(
1013
                f"Failed to delete flows before redeploy for {evcs}: {exc}"
1014
            )
1015 1
            return [], [*undeploy_evcs, *not_undeploy_evcs]
1016
1017 1
    def handle_link_down(self, event):
1018
        """Change circuit when link is down or under_mantenance."""
1019 1
        link = event.content["link"]
1020 1
        log.info("Event handle_link_down %s", link)
1021
1022 1
        with ExitStack() as exit_stack:
1023 1
            exit_stack.enter_context(self.multi_evc_lock)
1024 1
            swap_to_failover = list[EVC]()
1025 1
            undeploy = list[EVC]()
1026 1
            clear_failover = list[EVC]()
1027 1
            evcs_to_update = dict[str, EVC]()
1028
1029 1
            for evc in self.get_evcs_by_svc_level():
1030 1
                with ExitStack() as sub_stack:
1031 1
                    sub_stack.enter_context(evc.lock)
1032 1
                    if all((
1033
                        evc.is_affected_by_link(link),
1034
                        evc.failover_path,
1035
                        not evc.is_failover_path_affected_by_link(link)
1036
                    )):
1037 1
                        swap_to_failover.append(evc)
1038 1
                    elif all((
1039
                        evc.is_affected_by_link(link),
1040
                        not evc.failover_path or
1041
                        evc.is_failover_path_affected_by_link(link)
1042
                    )):
1043 1
                        undeploy.append(evc)
1044 1
                    elif all((
1045
                        not evc.is_affected_by_link(link),
1046
                        evc.failover_path,
1047
                        evc.is_failover_path_affected_by_link(link)
1048
                    )):
1049 1
                        clear_failover.append(evc)
1050
                    else:
1051
                        continue
1052
1053 1
                    exit_stack.push(sub_stack.pop_all())
1054
1055
            # Swap from current path to failover path
1056
1057 1
            if swap_to_failover:
1058 1
                success, failure = self.execute_swap_to_failover(
1059
                    swap_to_failover
1060
                )
1061
1062 1
                clear_failover.extend(success)
1063
1064 1
                evcs_to_update.update((evc.id, evc) for evc in success)
1065
1066 1
                undeploy.extend(failure)
1067
1068
            # Clear out failover path
1069
1070 1
            if clear_failover:
1071 1
                success, failure = self.execute_clear_failover(clear_failover)
1072
1073 1
                evcs_to_update.update((evc.id, evc) for evc in success)
1074
1075 1
                undeploy.extend(failure)
1076
1077
            # Undeploy the evc, schedule a redeploy
1078
1079 1
            if undeploy:
1080 1
                success, failure = self.execute_undeploy(undeploy)
1081
1082 1
                evcs_to_update.update((evc.id, evc) for evc in success)
1083
1084 1
                if failure:
1085 1
                    log.error(f"Failed to handle_link_down for {failure}")
1086
1087
            # Push update to DB
1088
1089 1
            if evcs_to_update:
1090 1
                self.mongo_controller.update_evcs(
1091
                    [evc.as_dict() for evc in evcs_to_update.values()]
1092
                )
1093
1094 1
    @listen_to("kytos/mef_eline.need_redeploy")
1095 1
    def on_evc_need_redeploy(self, event):
1096
        """Redeploy evcs that need to be redeployed."""
1097
        self.handle_evc_need_redeploy(event)
1098
1099 1
    def handle_evc_need_redeploy(self, event):
1100
        """Redeploy evcs that need to be redeployed."""
1101
        evc = self.circuits.get(event.content["evc_id"])
1102
        if evc is None:
1103
            return
1104
        with evc.lock:
1105
            if not evc.is_enabled() or evc.is_active():
1106
                return
1107
            result = evc.deploy()
1108
        event_name = "error_redeploy_link_down"
1109
        if result:
1110
            log.info(f"{evc} redeployed")
1111
            event_name = "redeployed_link_down"
1112
        emit_event(self.controller, event_name,
1113
                   content=map_evc_event_content(evc))
1114
1115 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
1116 1
    def on_evc_affected_by_link_down(self, event):
1117
        """Change circuit when link is down or under_mantenance."""
1118
        self.handle_evc_affected_by_link_down(event)
1119
1120 1
    def handle_evc_affected_by_link_down(self, event):
1121
        """Change circuit when link is down or under_mantenance."""
1122 1
        evc = self.circuits.get(event.content["evc_id"])
1123 1
        link = event.content['link']
1124 1
        if not evc:
1125 1
            return
1126 1
        with evc.lock:
1127 1
            if not evc.is_affected_by_link(link):
1128
                return
1129 1
            result = evc.handle_link_down()
1130 1
        event_name = "error_redeploy_link_down"
1131 1
        if result:
1132 1
            log.info(f"{evc} redeployed due to link down {link.id}")
1133 1
            event_name = "redeployed_link_down"
1134 1
        emit_event(self.controller, event_name,
1135
                   content=map_evc_event_content(evc))
1136
1137 1
    @listen_to(
1138
        "kytos/mef_eline.(redeployed_link_(up|down)|deployed|need_failover)"
1139
    )
1140 1
    def on_evc_deployed(self, event):
1141
        """Handle EVC deployed|redeployed_link_down."""
1142
        self.handle_evc_deployed(event)
1143
1144 1
    def handle_evc_deployed(self, event):
1145
        """Setup failover path on evc deployed."""
1146 1
        evc = self.circuits.get(event.content["evc_id"])
1147 1
        if evc is None:
1148
            return
1149 1
        with evc.lock:
1150 1
            if (
1151
                not evc.is_eligible_for_failover_path()
1152
                or not evc.is_active()
1153
                or evc.failover_path
1154
                or not evc.current_path
1155
            ):
1156 1
                return
1157 1
            evc.setup_failover_path()
1158
1159 1
    @listen_to("kytos/topology.topology_loaded")
1160 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
1161
        """Load EVCs once the topology is available."""
1162
        self.load_all_evcs()
1163
1164 1
    def load_all_evcs(self):
1165
        """Try to load all EVCs on startup."""
1166 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
1167 1
        for circuit_id, circuit in circuits:
1168 1
            if circuit_id not in self.circuits:
1169 1
                self._load_evc(circuit)
1170 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
1171
                   timeout=1)
1172
1173 1
    def _load_evc(self, circuit_dict):
1174
        """Load one EVC from mongodb to memory."""
1175 1
        try:
1176 1
            evc = self._evc_from_dict(circuit_dict)
1177 1
        except (ValueError, KytosTagError) as exception:
1178 1
            log.error(
1179
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1180
            )
1181 1
            return None
1182 1
        if evc.archived:
1183 1
            return None
1184
1185 1
        self.circuits.setdefault(evc.id, evc)
1186 1
        self.sched.add(evc)
1187 1
        return evc
1188
1189 1
    @listen_to("kytos/flow_manager.flow.error")
1190 1
    def on_flow_mod_error(self, event):
1191
        """Handle flow mod errors related to an EVC."""
1192
        self.handle_flow_mod_error(event)
1193
1194 1
    def handle_flow_mod_error(self, event):
1195
        """Handle flow mod errors related to an EVC."""
1196 1
        flow = event.content["flow"]
1197 1
        command = event.content.get("error_command")
1198 1
        if command != "add":
1199 1
            return
1200 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1201 1
        if not evc or evc.archived or not evc.is_enabled():
1202 1
            return
1203 1
        with evc.lock:
1204 1
            evc.remove_current_flows(sync=False)
1205 1
            evc.remove_failover_flows(sync=True)
1206
1207 1
    def _evc_dict_with_instances(self, evc_dict):
1208
        """Convert some dict values to instance of EVC classes.
1209
1210
        This method will convert: [UNI, Link]
1211
        """
1212 1
        data = evc_dict.copy()  # Do not modify the original dict
1213 1
        for attribute, value in data.items():
1214
            # Get multiple attributes.
1215
            # Ex: uni_a, uni_z
1216 1
            if "uni" in attribute:
1217 1
                try:
1218 1
                    data[attribute] = self._uni_from_dict(value)
1219 1
                except ValueError as exception:
1220 1
                    result = "Error creating UNI: Invalid value"
1221 1
                    raise ValueError(result) from exception
1222
1223 1
            if attribute == "circuit_scheduler":
1224 1
                data[attribute] = []
1225 1
                for schedule in value:
1226 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1227
1228
            # Get multiple attributes.
1229
            # Ex: primary_links,
1230
            #     backup_links,
1231
            #     current_links_cache,
1232
            #     primary_links_cache,
1233
            #     backup_links_cache
1234 1
            if "links" in attribute:
1235 1
                data[attribute] = [
1236
                    self._link_from_dict(link, attribute) for link in value
1237
                ]
1238
1239
            # Ex: current_path,
1240
            #     primary_path,
1241
            #     backup_path
1242 1
            if (attribute.endswith("path") and
1243
                    attribute != "dynamic_backup_path"):
1244 1
                data[attribute] = Path(
1245
                    [self._link_from_dict(link, attribute) for link in value]
1246
                )
1247
1248 1
        return data
1249
1250 1
    def _evc_from_dict(self, evc_dict):
1251 1
        data = self._evc_dict_with_instances(evc_dict)
1252 1
        data["table_group"] = self.table_group
1253 1
        return EVC(self.controller, **data)
1254
1255 1
    def _uni_from_dict(self, uni_dict):
1256
        """Return a UNI object from python dict."""
1257 1
        if uni_dict is None:
1258 1
            return False
1259
1260 1
        interface_id = uni_dict.get("interface_id")
1261 1
        interface = self.controller.get_interface_by_id(interface_id)
1262 1
        if interface is None:
1263 1
            result = (
1264
                "Error creating UNI:"
1265
                + f"Could not instantiate interface {interface_id}"
1266
            )
1267 1
            raise ValueError(result) from ValueError
1268 1
        tag_convert = {1: "vlan"}
1269 1
        tag_dict = uni_dict.get("tag", None)
1270 1
        if tag_dict:
1271 1
            tag_type = tag_dict.get("tag_type")
1272 1
            tag_type = tag_convert.get(tag_type, tag_type)
1273 1
            tag_value = tag_dict.get("value")
1274 1
            if isinstance(tag_value, list):
1275 1
                tag_value = get_tag_ranges(tag_value)
1276 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1277 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1278
            else:
1279 1
                tag = TAG(tag_type, tag_value)
1280
        else:
1281 1
            tag = None
1282 1
        uni = UNI(interface, tag)
1283 1
        return uni
1284
1285 1
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
1286
        """Return a Link object from python dict."""
1287 1
        id_a = link_dict.get("endpoint_a").get("id")
1288 1
        id_b = link_dict.get("endpoint_b").get("id")
1289
1290 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1291 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1292 1
        if not endpoint_a:
1293 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1294 1
            raise ValueError(error_msg)
1295 1
        if not endpoint_b:
1296
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1297
            raise ValueError(error_msg)
1298
1299 1
        link = Link(endpoint_a, endpoint_b)
1300 1
        allowed_paths = {"current_path", "failover_path"}
1301 1
        if "metadata" in link_dict and attribute in allowed_paths:
1302 1
            link.extend_metadata(link_dict.get("metadata"))
1303
1304 1
        s_vlan = link.get_metadata("s_vlan")
1305 1
        if s_vlan:
1306 1
            tag = TAG.from_dict(s_vlan)
1307 1
            if tag is False:
1308
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1309
                raise ValueError(error_msg)
1310 1
            link.update_metadata("s_vlan", tag)
1311 1
        return link
1312
1313 1
    def _find_evc_by_schedule_id(self, schedule_id):
1314
        """
1315
        Find an EVC and CircuitSchedule based on schedule_id.
1316
1317
        :param schedule_id: Schedule ID
1318
        :return: EVC and Schedule
1319
        """
1320 1
        circuits = self._get_circuits_buffer()
1321 1
        found_schedule = None
1322 1
        evc = None
1323
1324
        # pylint: disable=unused-variable
1325 1
        for c_id, circuit in circuits.items():
1326 1
            for schedule in circuit.circuit_scheduler:
1327 1
                if schedule.id == schedule_id:
1328 1
                    found_schedule = schedule
1329 1
                    evc = circuit
1330 1
                    break
1331 1
            if found_schedule:
1332 1
                break
1333 1
        return evc, found_schedule
1334
1335 1
    def _get_circuits_buffer(self):
1336
        """
1337
        Return the circuit buffer.
1338
1339
        If the buffer is empty, try to load data from mongodb.
1340
        """
1341 1
        if not self.circuits:
1342
            # Load circuits from mongodb to buffer
1343 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1344 1
            for c_id, circuit in circuits.items():
1345 1
                evc = self._evc_from_dict(circuit)
1346 1
                self.circuits[c_id] = evc
1347 1
        return self.circuits
1348
1349
    # pylint: disable=attribute-defined-outside-init
1350 1
    @alisten_to("kytos/of_multi_table.enable_table")
1351 1
    async def on_table_enabled(self, event):
1352
        """Handle a recently table enabled."""
1353 1
        table_group = event.content.get("mef_eline", None)
1354 1
        if not table_group:
1355 1
            return
1356 1
        for group in table_group:
1357 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1358 1
                log.error(f'The table group "{group}" is not allowed for '
1359
                          f'mef_eline. Allowed table groups are '
1360
                          f'{settings.TABLE_GROUP_ALLOWED}')
1361 1
                return
1362 1
        self.table_group.update(table_group)
1363 1
        content = {"group_table": self.table_group}
1364 1
        name = "kytos/mef_eline.enable_table"
1365
        await aemit_event(self.controller, name, content)
1366