Passed
Push — master ( 9a93f5...4e93b8 )
by
unknown
03:42 queued 10s
created

build.main.Main.redeploy()   B

Complexity

Conditions 6

Size

Total Lines 38
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 6.0184

Importance

Changes 0
Metric Value
cc 6
eloc 32
nop 2
dl 0
loc 38
ccs 23
cts 25
cp 0.92
crap 6.0184
rs 8.1786
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13 1
from typing import Optional
14
15 1
from pydantic import ValidationError
16
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29
                                              DuplicatedNoTagUNI,
30
                                              FlowModException, InvalidPath)
31 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
32
                                          Path)
33 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
34 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc, aemit_event,
35
                                         emit_event, get_vlan_tags_and_masks,
36
                                         map_evc_event_content,
37
                                         merge_flow_dicts, prepare_delete_flow,
38
                                         send_flow_mods_http)
39
40
41
# pylint: disable=too-many-public-methods
42 1
class Main(KytosNApp):
43
    """Main class of amlight/mef_eline NApp.
44
45
    This class is the entry point for this napp.
46
    """
47
48 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
49
50 1
    def setup(self):
        """Initialize the NApp state; stands in for ``__init__``.

        The controller calls this method automatically when the
        application is loaded, so every setup routine belongs here.
        """
        # Scheduler object used for circuit events.
        self.sched = Scheduler()

        # Controller object used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Tell the dynamic path manager which controller it works with.
        DynamicPathManager.set_controller(self.controller)

        # Buffer of created EVCs, keyed by circuit id.
        # Every create/update/delete must be synced to mongodb.
        self.circuits: dict[str, EVC] = {}

        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)
        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.multi_evc_lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
81
82 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
        """Return buffered circuits ordered for processing.

        Circuits are sorted by descending ``service_level`` and, within
        the same level, by ascending ``creation_time``.

        Args:
            enable_filter: when True (default) only circuits whose
                ``is_enabled()`` is truthy are returned.

        In the future, as more ops are offloaded it should be get from
        the DB.
        """
        candidates = self.circuits.values()
        if enable_filter:
            candidates = (evc for evc in candidates if evc.is_enabled())
        return sorted(
            candidates,
            key=lambda evc: (-evc.service_level, evc.creation_time),
        )
95
96 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``ELineController`` instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
100
101 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped when a previous one still holds
        ``self._lock``, so rounds never overlap and never queue up.
        """
        # Test-and-take the lock in one atomic step. Checking ``locked()``
        # first and acquiring afterwards leaves a window in which another
        # thread can grab the lock, making this call block instead of skip.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
109
110 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit qualifies for the consistency check.

        Returns True only for a circuit that is enabled, not active, not
        locked, has no recently removed flow, was not recently updated,
        has active UNIs and is either intra-switch or has a current path.
        """
        if not circuit.is_enabled():
            return False
        if circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow():
            return False
        if circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active():
            return False
        # if a inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it
        return bool(circuit.is_intra_switch() or circuit.current_path)
126
127 1
    def execute_consistency(self):
        """Run the consistency routine over every buffered circuit.

        First pass: collect the circuits accepted by
        ``should_be_checked`` and emit ``need_failover`` for active
        circuits that lack a failover path but are eligible for one.
        Second pass: activate the circuits reported as checked by
        ``EVCDeploy.check_list_traces``; for the others, count one more
        round and redeploy once ``settings.WAIT_FOR_OLD_PATH`` rounds
        are exceeded.
        """
        pending = []
        for evc in self.get_evcs_by_svc_level(enable_filter=False):
            if self.should_be_checked(evc):
                pending.append(evc)
            needs_failover = (
                evc.is_active()
                and not evc.failover_path
                and evc.is_eligible_for_failover_path()
            )
            if needs_failover:
                emit_event(
                    self.controller,
                    "need_failover",
                    content=map_evc_event_content(evc)
                )

        trace_results = EVCDeploy.check_list_traces(pending)
        for evc in pending:
            if trace_results.get(evc.id):
                evc.execution_rounds = 0
                log.info(f"{evc} enabled but inactive - activating")
                with evc.lock:
                    evc.activate()
                    evc.sync()
                continue

            evc.execution_rounds += 1
            if evc.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{evc} enabled but inactive - redeploy")
                with evc.lock:
                    evc.deploy()
158
159 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        Nothing is cleaned up at the moment; any teardown procedure
        belongs in this method.
        """
164
165 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint that lists the stored circuits.

        The ``archived`` query argument (default ``"false"``) is
        lower-cased and forwarded as the archived filter; every other
        query argument is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived_flag = query.get("archived", "false").lower()
        metadata_filter = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(
            archived=archived_flag, metadata=metadata_filter
        )
        return JSONResponse(found['circuits'])
180
181 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint that lists every schedule of every stored circuit.

        The response is a JSON list shaped like:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is
        returned instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return JSONResponse({}, status_code=200)

        schedules = [
            {
                "schedule_id": entry.get("id"),
                "circuit_id": evc_dict.get("id"),
                "schedule": entry,
            }
            for evc_dict in stored
            for entry in evc_dict.get("circuit_scheduler") or []
        ]

        log.debug("list_schedules result %s %s", schedules, 200)
        return JSONResponse(schedules, status_code=200)
212
213 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint that returns one stored circuit, looked up by id.

        Raises:
            HTTPException: 404 when no circuit is found for the id.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        stored = self.mongo_controller.get_circuit(circuit_id)
        if stored:
            log.debug("get_circuit result %s %s", stored, 200)
            return JSONResponse(stored, status_code=200)

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise HTTPException(404, detail=message)
226
227
    # pylint: disable=too-many-branches, too-many-statements
228 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Steps, in order:

        1. Build the EVC from the request payload.
        2. Validate ``primary_path`` and ``backup_path`` when given.
        3. Check that both UNIs carry the same tag lists and that the
           EVC has a primary or a dynamic path.
        4. Check for duplicated untagged UNIs among existing circuits.
        5. Reserve the UNI tags and save the circuit. The reserved tags
           are released again if saving fails validation.
        6. Store the circuit in the buffer, register it with the
           scheduler and, when it has no schedule, deploy it right away.
        7. Emit the ``created`` event and answer the user.

        Returns:
            JSONResponse with ``circuit_id`` and ``deployed``, status 201.

        Raises:
            HTTPException: 400 for an invalid payload, path or tag;
                409 for a duplicated untagged UNI.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        # Both static paths go through the same validation; only the
        # label used in the error message differs.
        for label, path in (
            ("primary_path", evc.primary_path),
            ("backup_path", evc.backup_path),
        ):
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{label} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The circuit is being rejected, so hand back the UNI tags
            # reserved just above; otherwise they would stay in use for
            # a circuit that never got stored.
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        deployed = False
        if not evc.circuit_scheduler:
            with evc.lock:
                deployed = evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id, "deployed": deployed}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
333
334 1
    @staticmethod
    def _use_uni_tags(evc):
        """Reserve the VLAN of both UNIs of ``evc``, all or nothing.

        When reserving for ``uni_z`` fails with ``KytosTagError``, the
        reservation already made for ``uni_a`` is released and the
        error is raised again.
        """
        first_uni = evc.uni_a
        evc._use_uni_vlan(first_uni)
        try:
            evc._use_uni_vlan(evc.uni_z)
        except KytosTagError:
            evc.make_uni_vlan_available(first_uni)
            raise
344
345 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen for removed-flow events and hand them to the handler.

        Lets the NApp keep track of when an EVC had flows removed.
        """
        self.handle_flow_delete(event)
349
350 1
    def handle_flow_delete(self, event):
        """Record the flow removal on the EVC derived from the cookie.

        The EVC id is obtained from the cookie of the flow carried in
        ``event.content["flow"]``; events whose id matches no buffered
        circuit are ignored.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
357
358 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated, the flows are adjusted:
        an active circuit is removed when it got disabled, or removed
        and deployed again when a redeploy was requested; an inactive
        circuit is deployed when it got enabled, or removed and
        deployed again when it is enabled and a redeploy is needed.

        Returns:
            JSONResponse with the circuit dict keyed by its id plus a
            ``redeployed`` flag, status 200.

        Raises:
            HTTPException: 404 for an unknown circuit id; 400 for an
                invalid payload or tag; 409 for a duplicated untagged
                UNI or a path through a disabled switch.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance, not the KeyError class, so
            # the traceback keeps the key that was actually missing.
            raise HTTPException(404, detail=result) from error

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception
        redeployed = False
        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    redeployed = evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
419
420 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        The circuit is taken out of the buffer first. Unless it is
        already archived, it is then deactivated and disabled, dropped
        from the scheduler, has its current and failover flows removed,
        is archived, has its UNI tags released and is synced; finally a
        ``deleted`` event is emitted.

        Returns:
            JSONResponse confirming the removal, status 200.

        Raises:
            HTTPException: 404 when the circuit id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance, not the KeyError class, so
            # the traceback keeps the key that was actually missing.
            raise HTTPException(404, detail=result) from error
        log.info("Removing %s", evc)

        with evc.lock:
            if not evc.archived:
                evc.deactivate()
                evc.disable()
                self.sched.remove(evc)
                evc.remove_current_flows(sync=False)
                evc.remove_failover_flows(sync=False)
                evc.archive()
                evc.remove_uni_tags()
                evc.sync()
                emit_event(
                    self.controller, "deleted",
                    content=map_evc_event_content(evc)
                )

        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200
        log.debug("delete_circuit result %s %s", result, status)

        return JSONResponse(result, status_code=status)
458
459 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Endpoint that returns the metadata of one buffered EVC.

        Raises:
            HTTPException: 404 when the lookup raises ``KeyError``.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            metadata = self.circuits[circuit_id].metadata
            return JSONResponse({"metadata": metadata})
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error
472
473 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add the same metadata to several EVCs at once.

        The payload carries ``circuit_ids`` plus the metadata entries.
        The entries are first sent to the database controller for all
        ids and then applied to each buffered EVC.

        Raises:
            HTTPException: 404 listing the ids that raised ``KeyError``
                while being updated in the buffer.
        """
        payload = get_json_or_400(request, self.controller.loop)
        circuit_ids = payload.pop("circuit_ids")

        self.mongo_controller.update_evcs_metadata(
            circuit_ids, payload, "add"
        )

        missing_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(payload)
            except KeyError:
                missing_ids.append(circuit_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful", status_code=201)
493
494 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Endpoint that extends the metadata of one EVC and syncs it.

        Raises:
            HTTPException: 400 when the payload is not a dict; 404 when
                the circuit id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        new_metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(new_metadata, dict):
            raise HTTPException(
                400, f"Invalid metadata value: {new_metadata}"
            )

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error

        evc.extend_metadata(new_metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
513
514 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from several EVCs at once.

        The payload carries ``circuit_ids``; the key comes from the
        path. The deletion is first sent to the database controller for
        all ids and then applied to each buffered EVC.

        Raises:
            HTTPException: 404 listing the ids that raised ``KeyError``
                while being updated in the buffer.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs_metadata(
            circuit_ids, {key: ""}, "del"
        )

        missing_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                missing_ids.append(circuit_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful")
536
537 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Endpoint that removes one metadata key from an EVC and syncs it.

        Raises:
            HTTPException: 404 when the circuit id is not in the buffer.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
553
554 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        The ``try_avoid_same_s_vlan`` query argument (default
        ``"true"``, case-insensitive) controls whether the removed
        current path is handed to ``deploy``.

        Returns:
            JSONResponse with status 202 when the circuit was deployed,
            or 409 when it is disabled or the deployment did not
            succeed.

        Raises:
            HTTPException: 400 for an invalid ``try_avoid_same_s_vlan``
                value; 404 when the circuit id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try_avoid_same_s_vlan = request.query_params.get(
            "try_avoid_same_s_vlan", "true"
        )
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
        if try_avoid_same_s_vlan not in {"true", "false"}:
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
            raise HTTPException(400, detail=msg)
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance, not the KeyError class, so
            # the traceback keeps the key that was actually missing.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error

        deployed = False
        enabled = False
        with evc.lock:
            enabled = evc.is_enabled()
            if enabled:
                path_dict = evc.remove_current_flows(
                    sync=False,
                    return_path=try_avoid_same_s_vlan == "true"
                )
                evc.remove_failover_flows(sync=True)
                deployed = evc.deploy(path_dict)

        if deployed:
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        elif enabled:
            # The circuit is enabled but deploy() did not succeed;
            # reporting it as disabled would mislead the caller.
            result = {
                "response": f"Circuit {circuit_id} could not be redeployed."
            }
            status = 409
        else:
            result = {
                "response": f"Circuit {circuit_id} is disabled."
            }
            status = 409

        return JSONResponse(result, status_code=status)
592
593 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """Attach a new schedule to an existing circuit.

        No check is made for conflicts with the other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Returns:
            JSONResponse with the new schedule as a dict, status 201.

        Raises:
            HTTPException: 404 when the circuit id is not found.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id = payload["circuit_id"]
        schedule_data = payload["schedule"]

        # Look the circuit up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            message = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # A circuit without schedules gets a fresh list to append to.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then save the circuit to mongodb.
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        body = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", body, 201)
        return JSONResponse(body, status_code=201)
649
650 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Replace a schedule of a circuit, keeping the schedule id.

        A new schedule is built from the payload, given the id of the
        schedule being replaced, and swapped in for it both on the EVC
        and in the scheduler jobs.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Returns:
            JSONResponse with the new schedule as a dict, status 200.

        Raises:
            HTTPException: 404 when the schedule id is not found.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Locate the circuit owning this schedule.
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        replacement = CircuitSchedule.from_dict(payload)
        replacement.id = old_schedule.id

        # Swap the schedule on the EVC.
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(replacement)

        # Swap the scheduler job, then save the EVC to mongodb.
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, replacement)
        evc.sync()

        body = replacement.as_dict()
        log.debug("update_schedule result %s %s", body, 200)
        return JSONResponse(body, status_code=200)
697
698 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a schedule from its circuit.

        The schedule is removed from the EVC, its scheduler job is
        cancelled and the EVC is synced.

        Raises:
            HTTPException: 404 when the schedule id is not found.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)

        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        # Drop the schedule, cancel its job, then save the EVC to mongodb.
        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)
        evc.sync()

        body = "Schedule removed"
        log.debug("delete_schedule result %s %s", body, 200)
        return JSONResponse(body, status_code=200)
729
730 1
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check the given untagged UNIs against the other circuits.

        Nothing is checked when no UNI is given or when every given UNI
        has a truthy ``user_tag``. Otherwise each non-archived circuit
        other than ``evc_id`` runs ``check_no_tag_duplicate`` for every
        given UNI whose ``user_tag`` is None; per the original
        docstring, DuplicatedNoTagUNI is raised if duplication is found.

        Args:
            evc_id: id of the EVC owning the UNIs; it is skipped.
            uni_a: first UNI to check, if any.
            uni_z: second UNI to check, if any.
        """
        # No UNIs
        if not uni_a and not uni_z:
            return

        # Only UNIs without a tag can be duplicated.
        if not ((uni_a and not uni_a.user_tag)
                or (uni_z and not uni_z.user_tag)):
            return

        # Iterate over a copy of the circuits buffer.
        for other in self.circuits.copy().values():
            if other.archived or other._id == evc_id:
                continue
            if uni_a and uni_a.user_tag is None:
                other.check_no_tag_duplicate(uni_a)
            if uni_z and uni_z.user_tag is None:
                other.check_no_tag_duplicate(uni_z)
755
756 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to topology link_up events and delegate the handling."""
        self.handle_link_up(event)
760
761 1
    def handle_link_up(self, event):
        """Forward the link from the event to every enabled, non-archived
        EVC, holding each EVC lock while it handles the link up."""
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
768
769
    # Possibly replace this with interruptions?
770 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)',
        '.*.interface.(disabled|enabled|up|down)'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """
        Listen to interface state change events and delegate the handling.
        """
        self.handle_on_interface_link_change(event)
779
780 1
    def handle_on_interface_link_change(self, event: KytosEvent):
        """
        Debounce interface events and process only the latest one.

        The newest event of each interface is stored in self._intf_events.
        The first event of a burst marks the entry with "last_acquired",
        sleeps for settings.UNI_STATE_CHANGE_DELAY and then processes
        whichever event is stored by then. Events received meanwhile only
        replace the stored event; events older than the stored one are
        dropped. This avoids one database update per event on a link flap.
        """
        iface = event.content.get("interface")
        intf_id = iface.id
        with self._lock_interfaces[intf_id]:
            received_at = event.timestamp
            # Drop events that are older than the one already stored
            if intf_id in self._intf_events:
                stored_event = self._intf_events[intf_id]["event"]
                if stored_event.timestamp > received_at:
                    return
            state = self._intf_events[intf_id]
            state["event"] = event
            if "last_acquired" in state:
                # An earlier call is still waiting; it will see this event
                return
            state["last_acquired"] = now()
        # Wait outside the lock so newer events can replace the stored one
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
        with self._lock_interfaces[intf_id]:
            state = self._intf_events[intf_id]
            latest = state["event"]
            state.pop('last_acquired', None)
            event_type = latest.name.rpartition('.')[2]
            if event_type in ('link_up', 'created', 'enabled', 'up'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted', 'disabled', 'down'):
                self.handle_interface_link_down(iface)
812
813 1
    def handle_interface_link_up(self, interface):
        """
        Notify, under its lock, each EVC affected by the interface going up
        """
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            if not _does_uni_affect_evc(evc, interface, "up"):
                continue
            with evc.lock:
                evc.handle_interface_link_up(interface)
824
825 1
    def handle_interface_link_down(self, interface):
        """
        Notify, under its lock, each EVC affected by the interface going down
        """
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            if not _does_uni_affect_evc(evc, interface, "down"):
                continue
            with evc.lock:
                evc.handle_interface_link_down(interface)
836
837 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
    def on_link_down(self, event):
        """Listen to topology link_down events and delegate the handling."""
        self.handle_link_down(event)
841
842 1
    def prepare_swap_to_failover_flow(self, evc: EVC):
        """Return the flows to install for swapping an EVC to failover.

        An empty dict is returned, after logging the traceback, when
        ``evc.get_failover_flows()`` raises any exception.
        """
        # pylint: disable=broad-except
        try:
            return evc.get_failover_flows()
        except Exception:
            trace = traceback.format_exc()
            log.error(
                "Ignore Failover path for "
                f"{evc} due to error: {trace}"
            )
        return {}
855
856 1
    def prepare_swap_to_failover_event(self, evc: EVC, install_flow):
        """Build the event content of an EVC being swapped to failover.

        The flows are deep copied before being added to the content.
        """
        flows_copy = deepcopy(install_flow)
        return map_evc_event_content(evc, flows=flows_copy)
862
863 1
    def execute_swap_to_failover(
        self,
        evcs: list[EVC]
    ) -> tuple[list[EVC], list[EVC]]:
        """Install the failover flows in one request and swap the paths.

        Returns:
            A ``(swapped, not_swapped)`` pair of EVC lists. EVCs without
            failover flows are not swapped. If the flow mod request raises
            FlowModException no path is swapped and every EVC is returned
            in the second list.
        """
        event_contents = {}
        install_flows = {}
        swapped_evcs: list[EVC] = []
        not_swapped_evcs: list[EVC] = []

        for evc in evcs:
            evc_flows = self.prepare_swap_to_failover_flow(evc)
            if not evc_flows:
                not_swapped_evcs.append(evc)
                continue
            install_flows = merge_flow_dicts(install_flows, evc_flows)
            event_contents[evc.id] = self.prepare_swap_to_failover_event(
                evc, evc_flows
            )
            swapped_evcs.append(evc)

        try:
            send_flow_mods_http(install_flows, "install")
            # Flows are in place: current and failover paths trade places
            for evc in swapped_evcs:
                previous_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = previous_path
            emit_event(
                self.controller, "failover_link_down",
                content=deepcopy(event_contents)
            )
        except FlowModException as exc:
            log.error(f"Fail to install failover flows for {evcs}: {exc}")
            return [], [*swapped_evcs, *not_swapped_evcs]
        return swapped_evcs, not_swapped_evcs
900
901 1
    def prepare_clear_failover_flow(self, evc: EVC):
        """Return the delete flows for the failover path of an EVC.

        The UNI flows (built with ``skip_in=True``) and the NNI flows of
        the failover path are merged and turned into delete flows. An empty
        dict is returned, after logging the traceback, on any exception.
        """
        # pylint: disable=broad-except
        try:
            failover_flows = merge_flow_dicts(
                evc._prepare_uni_flows(evc.failover_path, skip_in=True),
                evc._prepare_nni_flows(evc.failover_path)
            )
            return prepare_delete_flow(failover_flows)
        except Exception:
            trace = traceback.format_exc()
            log.error(f"Fail to remove {evc} old_path: {trace}")
        return {}
916
917 1
    def prepare_clear_failover_event(self, evc: EVC, delete_flow):
        """Build the event content of an EVC whose failover is cleared.

        The content carries the current path as a dict and a deep copy of
        the removed flows.
        """
        current_path = evc.current_path.as_dict()
        removed_flows = deepcopy(delete_flow)
        return map_evc_event_content(
            evc,
            current_path=current_path,
            removed_flows=removed_flows
        )
924
925 1
    def execute_clear_failover(
        self,
        evcs: list[EVC]
    ) -> tuple[list[EVC], list[EVC]]:
        """Delete the failover flows in one request and reset the paths.

        Returns:
            A ``(cleared, not_cleared)`` pair of EVC lists. EVCs without
            delete flows are not cleared. If the flow mod request raises
            FlowModException no failover path is reset and every EVC is
            returned in the second list.
        """
        event_contents = {}
        delete_flows = {}
        cleared_evcs: list[EVC] = []
        not_cleared_evcs: list[EVC] = []

        for evc in evcs:
            evc_flows = self.prepare_clear_failover_flow(evc)
            if not evc_flows:
                not_cleared_evcs.append(evc)
                continue
            delete_flows = merge_flow_dicts(delete_flows, evc_flows)
            event_contents[evc.id] = self.prepare_clear_failover_event(
                evc, evc_flows
            )
            cleared_evcs.append(evc)

        try:
            send_flow_mods_http(delete_flows, 'delete')
            # Flows are gone: release the vlans and empty the failover path
            for evc in cleared_evcs:
                evc.failover_path.make_vlans_available(self.controller)
                evc.failover_path = Path([])
            emit_event(
                self.controller,
                "failover_old_path",
                content=event_contents
            )
        except FlowModException as exc:
            log.error(f"Failed to delete failover flows for {evcs}: {exc}")
            return [], [*cleared_evcs, *not_cleared_evcs]
        return cleared_evcs, not_cleared_evcs
962
963 1
    def prepare_undeploy_flow(self, evc: EVC):
        """Return the delete flows needed to undeploy an EVC.

        The UNI flows of the current path (built with ``skip_in=True``) are
        merged with the NNI flows of both the current and failover paths
        and turned into delete flows. An empty dict is returned, after
        logging the traceback, on any exception.
        """
        # pylint: disable=broad-except
        try:
            evc_flows = merge_flow_dicts(
                evc._prepare_uni_flows(evc.current_path, skip_in=True),
                evc._prepare_nni_flows(evc.current_path),
                evc._prepare_nni_flows(evc.failover_path)
            )
            return prepare_delete_flow(evc_flows)
        except Exception:
            trace = traceback.format_exc()
            log.error(f"Fail to undeploy {evc}: {trace}")
        return {}
979
980 1
    def execute_undeploy(self, evcs: list[EVC]):
        """Delete the flows of the EVCs in one request and undeploy them.

        Each undeployed EVC has its vlans released, both paths emptied, is
        deactivated and gets a "need_redeploy" event emitted.

        Returns:
            A ``(undeployed, not_undeployed)`` pair of EVC lists. EVCs
            without delete flows are not undeployed. If the flow mod request
            raises FlowModException every EVC is returned in the second
            list.
        """
        delete_flows = {}
        undeploy_evcs: list[EVC] = []
        not_undeploy_evcs: list[EVC] = []

        for evc in evcs:
            evc_flows = self.prepare_undeploy_flow(evc)
            if not evc_flows:
                not_undeploy_evcs.append(evc)
                continue
            delete_flows = merge_flow_dicts(delete_flows, evc_flows)
            undeploy_evcs.append(evc)

        try:
            send_flow_mods_http(delete_flows, 'delete')

            for evc in undeploy_evcs:
                # Release the vlans before the paths are dropped
                evc.current_path.make_vlans_available(self.controller)
                evc.failover_path.make_vlans_available(self.controller)
                evc.current_path = Path([])
                evc.failover_path = Path([])
                evc.deactivate()
                emit_event(
                    self.controller,
                    "need_redeploy",
                    content={"evc_id": evc.id}
                )
                log.info(f"{evc} scheduled for redeploy")
        except FlowModException as exc:
            log.error(
                f"Failed to delete flows before redeploy for {evcs}: {exc}"
            )
            return [], [*undeploy_evcs, *not_undeploy_evcs]
        return undeploy_evcs, not_undeploy_evcs
1018
1019 1
    def handle_link_down(self, event):
        """Update the circuits touched by a link that went down.

        While holding ``self.multi_evc_lock``, every EVC is sorted into one
        of three groups:

        - swap to failover: current path affected, failover path usable;
        - undeploy: current path affected and no usable failover path;
        - clear failover: only the failover path is affected.

        EVCs in none of the groups are left alone. The groups are then
        executed in that order, failures of one step falling back to
        undeploy, and the successfully changed EVCs are pushed to the
        database in a single update.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)

        with ExitStack() as exit_stack:
            exit_stack.enter_context(self.multi_evc_lock)
            swap_to_failover: list[EVC] = []
            undeploy: list[EVC] = []
            clear_failover: list[EVC] = []
            evcs_to_update: dict[str, EVC] = {}

            for evc in self.get_evcs_by_svc_level():
                with ExitStack() as sub_stack:
                    sub_stack.enter_context(evc.lock)
                    affected = evc.is_affected_by_link(link)
                    has_failover = bool(evc.failover_path)
                    failover_affected = (
                        evc.is_failover_path_affected_by_link(link)
                    )

                    if affected:
                        if has_failover and not failover_affected:
                            swap_to_failover.append(evc)
                        else:
                            undeploy.append(evc)
                    elif has_failover and failover_affected:
                        clear_failover.append(evc)
                    else:
                        # Untouched EVC: its lock is released right here
                        continue

                    # Selected EVC: hand its lock over to the outer stack so
                    # it stays held until all the changes below are done
                    exit_stack.push(sub_stack.pop_all())

            # Swap from current path to failover path
            if swap_to_failover:
                swapped, failed = self.execute_swap_to_failover(
                    swap_to_failover
                )
                clear_failover.extend(swapped)
                evcs_to_update.update((evc.id, evc) for evc in swapped)
                undeploy.extend(failed)

            # Clear out failover path
            if clear_failover:
                cleared, failed = self.execute_clear_failover(clear_failover)
                evcs_to_update.update((evc.id, evc) for evc in cleared)
                undeploy.extend(failed)

            # Undeploy the evc, schedule a redeploy
            if undeploy:
                undeployed, failure = self.execute_undeploy(undeploy)
                evcs_to_update.update((evc.id, evc) for evc in undeployed)
                if failure:
                    log.error(f"Failed to handle_link_down for {failure}")

            # Push update to DB
            if evcs_to_update:
                self.mongo_controller.update_evcs(
                    [evc.as_dict() for evc in evcs_to_update.values()]
                )
1095
1096 1
    @listen_to("kytos/mef_eline.need_redeploy")
    def on_evc_need_redeploy(self, event):
        """Listen to need_redeploy events and delegate the handling."""
        self.handle_evc_need_redeploy(event)
1100
1101 1
    def handle_evc_need_redeploy(self, event):
        """Deploy an enabled, inactive EVC again and emit the outcome.

        Nothing is done when the EVC is unknown, disabled or already
        active. Otherwise "redeployed_link_down" is emitted on success and
        "error_redeploy_link_down" on failure.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc is None:
            return
        with evc.lock:
            if not (evc.is_enabled() and not evc.is_active()):
                return
            deployed = evc.deploy()
        if deployed:
            log.info(f"{evc} redeployed")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
1116
1117 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen to evc_affected_by_link_down events and delegate them."""
        self.handle_evc_affected_by_link_down(event)
1121
1122 1
    def handle_evc_affected_by_link_down(self, event):
        """Let an EVC affected by a link down handle it and emit the outcome.

        Nothing is done when the EVC is unknown or not affected by the
        link. Otherwise "redeployed_link_down" is emitted on success and
        "error_redeploy_link_down" on failure.
        """
        evc = self.circuits.get(event.content["evc_id"])
        link = event.content['link']
        if not evc:
            return
        with evc.lock:
            if not evc.is_affected_by_link(link):
                return
            handled = evc.handle_link_down()
        if handled:
            log.info(f"{evc} redeployed due to link down {link.id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
1138
1139 1
    @listen_to(
        "kytos/mef_eline.(redeployed_link_(up|down)|deployed|need_failover)"
    )
    def on_evc_deployed(self, event):
        """Listen to deployed, redeployed and need_failover events and
        delegate the handling."""
        self.handle_evc_deployed(event)
1145
1146 1
    def handle_evc_deployed(self, event):
        """Set up a failover path for a deployed EVC that can have one.

        The failover path is set up, under the EVC lock, only when the EVC
        is eligible for it, is active, has a current path and has no
        failover path yet.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc is None:
            return
        with evc.lock:
            needs_failover = (
                evc.is_eligible_for_failover_path()
                and evc.is_active()
                and not evc.failover_path
                and evc.current_path
            )
            if needs_failover:
                evc.setup_failover_path()
1160
1161 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Listen to topology_loaded events and load all the EVCs."""
        self.load_all_evcs()
1165
1166 1
    def load_all_evcs(self):
        """Load the stored EVCs that are not in memory yet.

        An "evcs_loaded" event carrying every stored circuit is emitted at
        the end.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit_dict in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit_dict)
        emit_event(self.controller, "evcs_loaded", content=dict(stored),
                   timeout=1)
1174
1175 1
    def _load_evc(self, circuit_dict):
        """Build one EVC from its stored dict and register it in memory.

        Returns:
            The EVC, or None when it could not be built (the error is
            logged) or when it is archived.
        """
        try:
            evc = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None

        if not evc.archived:
            # An EVC already stored under this id is kept as it is
            self.circuits.setdefault(evc.id, evc)
            self.sched.add(evc)
            return evc
        return None
1190
1191 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen to flow_manager flow errors and delegate the handling."""
        self.handle_flow_mod_error(event)
1195
1196 1
    def handle_flow_mod_error(self, event):
        """Remove the flows of the EVC owning a flow that failed to be added.

        Only errors whose "error_command" is "add" are handled, and only
        for a known EVC that is enabled and not archived. The EVC is found
        through the flow cookie.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return
        evc_id = EVC.get_id_from_cookie(flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc or evc.archived or not evc.is_enabled():
            return
        with evc.lock:
            # Only the last removal asks for a sync
            evc.remove_current_flows(sync=False)
            evc.remove_failover_flows(sync=True)
1208
1209 1
    def _evc_dict_with_instances(self, evc_dict):
        """Return a copy of the dict with some values turned into objects.

        Converted values: UNI, CircuitSchedule lists, Link lists and Path.
        The dict received is not modified.

        Raises:
            ValueError: when a UNI cannot be created.
        """
        data = evc_dict.copy()
        for attribute, value in evc_dict.items():
            # Any key containing "uni".
            # Ex: uni_a, uni_z
            if "uni" in attribute:
                try:
                    data[attribute] = self._uni_from_dict(value)
                except ValueError as exception:
                    raise ValueError(
                        "Error creating UNI: Invalid value"
                    ) from exception

            if attribute == "circuit_scheduler":
                data[attribute] = [
                    CircuitSchedule.from_dict(schedule) for schedule in value
                ]

            # Any key containing "links".
            # Ex: primary_links,
            #     backup_links,
            #     current_links_cache,
            #     primary_links_cache,
            #     backup_links_cache
            if "links" in attribute:
                data[attribute] = [
                    self._link_from_dict(link, attribute) for link in value
                ]

            # Any key ending with "path", dynamic_backup_path excluded.
            # Ex: current_path,
            #     primary_path,
            #     backup_path
            is_path = (attribute.endswith("path") and
                       attribute != "dynamic_backup_path")
            if is_path:
                path_links = [
                    self._link_from_dict(link, attribute) for link in value
                ]
                data[attribute] = Path(path_links)

        return data
1251
1252 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from a dict.

        The dict values are first converted by _evc_dict_with_instances and
        self.table_group is passed to the EVC as "table_group".
        """
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        evc_kwargs["table_group"] = self.table_group
        return EVC(self.controller, **evc_kwargs)
1256
1257 1
    def _uni_from_dict(self, uni_dict):
1258
        """Return a UNI object from python dict."""
1259 1
        if uni_dict is None:
1260 1
            return False
1261
1262 1
        interface_id = uni_dict.get("interface_id")
1263 1
        interface = self.controller.get_interface_by_id(interface_id)
1264 1
        if interface is None:
1265 1
            result = (
1266
                "Error creating UNI:"
1267
                + f"Could not instantiate interface {interface_id}"
1268
            )
1269 1
            raise ValueError(result) from ValueError
1270 1
        tag_convert = {1: "vlan"}
1271 1
        tag_dict = uni_dict.get("tag", None)
1272 1
        if tag_dict:
1273 1
            tag_type = tag_dict.get("tag_type")
1274 1
            tag_type = tag_convert.get(tag_type, tag_type)
1275 1
            tag_value = tag_dict.get("value")
1276 1
            if isinstance(tag_value, list):
1277 1
                tag_value = get_tag_ranges(tag_value)
1278 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1279 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1280
            else:
1281 1
                tag = TAG(tag_type, tag_value)
1282
        else:
1283 1
            tag = None
1284 1
        uni = UNI(interface, tag)
1285 1
        return uni
1286
1287 1
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
        """Return a Link object from python dict.

        Args:
            link_dict: dict with "endpoint_a" and "endpoint_b", each holding
                an interface "id", and optionally "metadata".
            attribute: name of the EVC attribute the link belongs to. The
                metadata is only loaded for current_path and failover_path.

        Raises:
            ValueError: when an endpoint interface is unknown to the
                controller or the "s_vlan" metadata cannot become a TAG.
        """
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)
        if not endpoint_a:
            raise ValueError(f"Could not get interface endpoint_a id {id_a}")
        if not endpoint_b:
            raise ValueError(f"Could not get interface endpoint_b id {id_b}")

        link = Link(endpoint_a, endpoint_b)
        if "metadata" in link_dict and attribute in {"current_path",
                                                     "failover_path"}:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if not s_vlan:
            return link

        # Replace the s_vlan metadata with a TAG instance
        tag = TAG.from_dict(s_vlan)
        if tag is False:
            raise ValueError(f"Could not instantiate tag from dict {s_vlan}")
        link.update_metadata("s_vlan", tag)
        return link
1314
1315 1
    def _find_evc_by_schedule_id(self, schedule_id):
1316
        """
1317
        Find an EVC and CircuitSchedule based on schedule_id.
1318
1319
        :param schedule_id: Schedule ID
1320
        :return: EVC and Schedule
1321
        """
1322 1
        circuits = self._get_circuits_buffer()
1323 1
        found_schedule = None
1324 1
        evc = None
1325
1326
        # pylint: disable=unused-variable
1327 1
        for c_id, circuit in circuits.items():
1328 1
            for schedule in circuit.circuit_scheduler:
1329 1
                if schedule.id == schedule_id:
1330 1
                    found_schedule = schedule
1331 1
                    evc = circuit
1332 1
                    break
1333 1
            if found_schedule:
1334 1
                break
1335 1
        return evc, found_schedule
1336
1337 1
    def _get_circuits_buffer(self):
1338
        """
1339
        Return the circuit buffer.
1340
1341
        If the buffer is empty, try to load data from mongodb.
1342
        """
1343 1
        if not self.circuits:
1344
            # Load circuits from mongodb to buffer
1345 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1346 1
            for c_id, circuit in circuits.items():
1347 1
                evc = self._evc_from_dict(circuit)
1348 1
                self.circuits[c_id] = evc
1349 1
        return self.circuits
1350
1351
    # pylint: disable=attribute-defined-outside-init
1352 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply the table groups received for mef_eline and announce them.

        Nothing is applied when the event has no "mef_eline" content, or
        when any received group is not in settings.TABLE_GROUP_ALLOWED (an
        error is logged for the first such group).
        """
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return
        for group in table_group:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return
        self.table_group.update(table_group)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1368