Issues (28)

1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13 1
from typing import Optional
14
15 1
from pydantic import ValidationError
16
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29
                                              DuplicatedNoTagUNI,
30
                                              FlowModException, InvalidPath)
31 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
32
                                          Path)
33 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
34 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc, aemit_event,
35
                                         check_disabled_component, emit_event,
36
                                         get_vlan_tags_and_masks,
37
                                         map_evc_event_content,
38
                                         merge_flow_dicts, prepare_delete_flow,
39
                                         send_flow_mods_http)
40
41
42
# pylint: disable=too-many-public-methods
43 1
class Main(KytosNApp):
44
    """Main class of amlight/mef_eline NApp.
45
46
    This class is the entry point for this napp.
47
    """
48
49 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
50
51 1
    def setup(self):
52
        """Replace the '__init__' method for the KytosNApp subclass.
53
54
        The setup method is automatically called by the controller when your
55
        application is loaded.
56
57
        So, if you have any setup routine, insert it here.
58
        """
59
        # object used to scheduler circuit events
60 1
        self.sched = Scheduler()
61
62
        # object to save and load circuits
63 1
        self.mongo_controller = self.get_eline_controller()
64 1
        self.mongo_controller.bootstrap_indexes()
65
66
        # set the controller that will manager the dynamic paths
67 1
        DynamicPathManager.set_controller(self.controller)
68
69
        # dictionary of EVCs created. It acts as a circuit buffer.
70
        # Every create/update/delete must be synced to mongodb.
71 1
        self.circuits = dict[str, EVC]()
72
73 1
        self._intf_events = defaultdict(dict)
74 1
        self._lock_interfaces = defaultdict(Lock)
75 1
        self.table_group = {"epl": 0, "evpl": 0}
76 1
        self._lock = Lock()
77 1
        self.multi_evc_lock = Lock()
78 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
79
80 1
        self.load_all_evcs()
81 1
        self._topology_updated_at = None
82
83 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
84
        """Get circuits sorted by desc service level and asc creation_time.
85
86
        In the future, as more ops are offloaded it should be get from the DB.
87
        """
88 1
        if enable_filter:
89 1
            return sorted(
90
                          [circuit for circuit in self.circuits.values()
91
                           if circuit.is_enabled()],
92
                          key=lambda x: (-x.service_level, x.creation_time),
93
            )
94 1
        return sorted(self.circuits.values(),
95
                      key=lambda x: (-x.service_level, x.creation_time))
96
97 1
    @staticmethod
98 1
    def get_eline_controller():
99
        """Return the ELineController instance."""
100
        return controllers.ELineController()
101
102 1
    def execute(self):
103
        """Execute once when the napp is running."""
104 1
        if self._lock.locked():
105 1
            return
106 1
        log.debug("Starting consistency routine")
107 1
        with self._lock:
108 1
            self.execute_consistency()
109 1
        log.debug("Finished consistency routine")
110
111 1
    def should_be_checked(self, circuit):
112
        "Verify if the circuit meets the necessary conditions to be checked"
113
        # pylint: disable=too-many-boolean-expressions
114 1
        if (
115
                circuit.is_enabled()
116
                and not circuit.is_active()
117
                and not circuit.lock.locked()
118
                and not circuit.has_recent_removed_flow()
119
                and not circuit.is_recent_updated()
120
                and circuit.are_unis_active()
121
                # if a inter-switch EVC does not have current_path, it does not
122
                # make sense to run sdntrace on it
123
                and (circuit.is_intra_switch() or circuit.current_path)
124
                ):
125 1
            return True
126
        return False
127
128 1
    def execute_consistency(self):
129
        """Execute consistency routine."""
130 1
        circuits_to_check = []
131 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
132 1
            if self.should_be_checked(circuit):
133 1
                circuits_to_check.append(circuit)
134 1
            circuit.try_setup_failover_path(warn_if_not_path=False)
135 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
136 1
        for circuit in circuits_to_check:
137 1
            is_checked = circuits_checked.get(circuit.id)
138 1
            if is_checked:
139 1
                circuit.execution_rounds = 0
140 1
                log.info(f"{circuit} enabled but inactive - activating")
141 1
                with circuit.lock:
142 1
                    circuit.activate()
143 1
                    circuit.sync()
144
            else:
145 1
                circuit.execution_rounds += 1
146 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
147 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
148 1
                    with circuit.lock:
149 1
                        circuit.deploy()
150
151 1
    def shutdown(self):
152
        """Execute when your napp is unloaded.
153
154
        If you have some cleanup procedure, insert it here.
155
        """
156
157 1
    @rest("/v2/evc/", methods=["GET"])
158 1
    def list_circuits(self, request: Request) -> JSONResponse:
159
        """Endpoint to return circuits stored.
160
161
        archive query arg if defined (not null) will be filtered
162
        accordingly, by default only non archived evcs will be listed
163
        """
164 1
        log.debug("list_circuits /v2/evc")
165 1
        args = request.query_params
166 1
        archived = args.get("archived", "false").lower()
167 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
168 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
169
                                                      metadata=args)
170 1
        circuits = circuits['circuits']
171 1
        return JSONResponse(circuits)
172
173 1
    @rest("/v2/evc/schedule", methods=["GET"])
174 1
    def list_schedules(self, _request: Request) -> JSONResponse:
175
        """Endpoint to return all schedules stored for all circuits.
176
177
        Return a JSON with the following template:
178
        [{"schedule_id": <schedule_id>,
179
         "circuit_id": <circuit_id>,
180
         "schedule": <schedule object>}]
181
        """
182 1
        log.debug("list_schedules /v2/evc/schedule")
183 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
184 1
        if not circuits:
185 1
            result = {}
186 1
            status = 200
187 1
            return JSONResponse(result, status_code=status)
188
189 1
        result = []
190 1
        status = 200
191 1
        for circuit in circuits:
192 1
            circuit_scheduler = circuit.get("circuit_scheduler")
193 1
            if circuit_scheduler:
194 1
                for scheduler in circuit_scheduler:
195 1
                    value = {
196
                        "schedule_id": scheduler.get("id"),
197
                        "circuit_id": circuit.get("id"),
198
                        "schedule": scheduler,
199
                    }
200 1
                    result.append(value)
201
202 1
        log.debug("list_schedules result %s %s", result, status)
203 1
        return JSONResponse(result, status_code=status)
204
205 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
206 1
    def get_circuit(self, request: Request) -> JSONResponse:
207
        """Endpoint to return a circuit based on id."""
208 1
        circuit_id = request.path_params["circuit_id"]
209 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
210 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
211 1
        if not circuit:
212 1
            result = f"circuit_id {circuit_id} not found"
213 1
            log.debug("get_circuit result %s %s", result, 404)
214 1
            raise HTTPException(404, detail=result)
215 1
        status = 200
216 1
        log.debug("get_circuit result %s %s", circuit, status)
217 1
        return JSONResponse(circuit, status_code=status)
218
219
    # pylint: disable=too-many-branches, too-many-statements
220 1
    @rest("/v2/evc/", methods=["POST"])
221 1
    @validate_openapi(spec)
222 1
    def create_circuit(self, request: Request) -> JSONResponse:
223
        """Try to create a new circuit.
224
225
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
226
        UNI_Z's requested C-VID are available from the interfaces' pools. This
227
        is checked when creating the UNI object.
228
229
        Then, E-Line NApp requests a primary and a backup path to the
230
        Pathfinder NApp using the attributes primary_links and backup_links
231
        submitted via REST
232
233
        # For each link composing paths in #3:
234
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
235
        #  - Using the S-VID obtained, generate abstract flow entries to be
236
        #    sent to FlowManager
237
238
        Push abstract flow entries to FlowManager and FlowManager pushes
239
        OpenFlow entries to datapaths
240
241
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
242
        creation
243
244
        Finnaly, notify user of the status of its request.
245
        """
246
        # Try to create the circuit object
247 1
        log.debug("create_circuit /v2/evc/")
248 1
        data = get_json_or_400(request, self.controller.loop)
249
250 1
        try:
251 1
            evc = self._evc_from_dict(data)
252 1
        except (ValueError, KytosTagError) as exception:
253 1
            log.debug("create_circuit result %s %s", exception, 400)
254 1
            raise HTTPException(400, detail=str(exception)) from exception
255 1
        try:
256 1
            check_disabled_component(evc.uni_a, evc.uni_z)
257 1
        except DisabledSwitch as exception:
258 1
            log.debug("create_circuit result %s %s", exception, 409)
259 1
            raise HTTPException(
260
                    409,
261
                    detail=f"Path is not valid: {exception}"
262
                ) from exception
263
264 1
        if evc.primary_path:
265 1
            try:
266 1
                evc.primary_path.is_valid(
267
                    evc.uni_a.interface.switch,
268
                    evc.uni_z.interface.switch,
269
                    bool(evc.circuit_scheduler),
270
                )
271 1
            except InvalidPath as exception:
272 1
                raise HTTPException(
273
                    400,
274
                    detail=f"primary_path is not valid: {exception}"
275
                ) from exception
276 1
        if evc.backup_path:
277 1
            try:
278 1
                evc.backup_path.is_valid(
279
                    evc.uni_a.interface.switch,
280
                    evc.uni_z.interface.switch,
281
                    bool(evc.circuit_scheduler),
282
                )
283 1
            except InvalidPath as exception:
284 1
                raise HTTPException(
285
                    400,
286
                    detail=f"backup_path is not valid: {exception}"
287
                ) from exception
288
289 1
        if not evc._tag_lists_equal():
290 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
291 1
            raise HTTPException(400, detail=detail)
292
293 1
        try:
294 1
            evc._validate_has_primary_or_dynamic()
295 1
        except ValueError as exception:
296 1
            raise HTTPException(400, detail=str(exception)) from exception
297
298 1
        try:
299 1
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
300
        except DuplicatedNoTagUNI as exception:
301
            log.debug("create_circuit result %s %s", exception, 409)
302
            raise HTTPException(409, detail=str(exception)) from exception
303
304 1
        try:
305 1
            self._use_uni_tags(evc)
306 1
        except KytosTagError as exception:
307 1
            raise HTTPException(400, detail=str(exception)) from exception
308
309
        # save circuit
310 1
        try:
311 1
            evc.sync()
312
        except ValidationError as exception:
313
            raise HTTPException(400, detail=str(exception)) from exception
314
315
        # store circuit in dictionary
316 1
        self.circuits[evc.id] = evc
317
318
        # Schedule the circuit deploy
319 1
        self.sched.add(evc)
320
321
        # Circuit has no schedule, deploy now
322 1
        deployed = False
323 1
        if not evc.circuit_scheduler:
324 1
            with evc.lock:
325 1
                deployed = evc.deploy()
326
327
        # Notify users
328 1
        result = {"circuit_id": evc.id, "deployed": deployed}
329 1
        status = 201
330 1
        log.debug("create_circuit result %s %s", result, status)
331 1
        emit_event(self.controller, name="created",
332
                   content=map_evc_event_content(evc))
333 1
        return JSONResponse(result, status_code=status)
334
335 1
    @staticmethod
336 1
    def _use_uni_tags(evc):
337 1
        uni_a = evc.uni_a
338 1
        evc._use_uni_vlan(uni_a)
339 1
        try:
340 1
            uni_z = evc.uni_z
341 1
            evc._use_uni_vlan(uni_z)
342 1
        except KytosTagError as err:
343 1
            evc.make_uni_vlan_available(uni_a)
344 1
            raise err
345
346 1
    @listen_to('kytos/flow_manager.flow.removed')
347 1
    def on_flow_delete(self, event):
348
        """Capture delete messages to keep track when flows got removed."""
349
        self.handle_flow_delete(event)
350
351 1
    def handle_flow_delete(self, event):
352
        """Keep track when the EVC got flows removed by deriving its cookie."""
353 1
        flow = event.content["flow"]
354 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
355 1
        if evc:
356 1
            log.debug("Flow removed in EVC %s", evc.id)
357 1
            evc.set_flow_removed_at()
358
359 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
360 1
    @validate_openapi(spec)
361 1
    def update(self, request: Request) -> JSONResponse:
362
        """Update a circuit based on payload.
363
364
        The EVC attributes (creation_time, active, current_path,
365
        failover_path, _id, archived) can't be updated.
366
        """
367 1
        data = get_json_or_400(request, self.controller.loop)
368 1
        circuit_id = request.path_params["circuit_id"]
369 1
        log.debug("update /v2/evc/%s", circuit_id)
370 1
        try:
371 1
            evc = self.circuits[circuit_id]
372 1
        except KeyError:
373 1
            result = f"circuit_id {circuit_id} not found"
374 1
            log.debug("update result %s %s", result, 404)
375 1
            raise HTTPException(404, detail=result) from KeyError
376
377 1
        try:
378 1
            updated_data = self._evc_dict_with_instances(data)
379 1
            self._check_no_tag_duplication(
380
                circuit_id, updated_data.get("uni_a"),
381
                updated_data.get("uni_z")
382
            )
383 1
            enable, redeploy = evc.update(**updated_data)
384 1
        except (ValueError, KytosTagError, ValidationError) as exception:
385 1
            log.debug("update result %s %s", exception, 400)
386 1
            raise HTTPException(400, detail=str(exception)) from exception
387 1
        except DuplicatedNoTagUNI as exception:
388
            log.debug("update result %s %s", exception, 409)
389
            raise HTTPException(409, detail=str(exception)) from exception
390 1
        except DisabledSwitch as exception:
391 1
            log.debug("update result %s %s", exception, 409)
392 1
            raise HTTPException(
393
                    409,
394
                    detail=f"Path is not valid: {exception}"
395
                ) from exception
396 1
        redeployed = False
397 1
        if evc.is_active():
398
            if enable is False:  # disable if active
399
                with evc.lock:
400
                    evc.remove()
401
            elif redeploy is not None:  # redeploy if active
402
                with evc.lock:
403
                    evc.remove()
404
                    redeployed = evc.deploy()
405
        else:
406 1
            if enable is True:  # enable if inactive
407 1
                with evc.lock:
408 1
                    redeployed = evc.deploy()
409 1
            elif evc.is_enabled() and redeploy:
410 1
                with evc.lock:
411 1
                    evc.remove()
412 1
                    redeployed = evc.deploy()
413 1
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
414 1
        status = 200
415
416 1
        log.debug("update result %s %s", result, status)
417 1
        emit_event(self.controller, "updated",
418
                   content=map_evc_event_content(evc, **data))
419 1
        return JSONResponse(result, status_code=status)
420
421 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
422 1
    def delete_circuit(self, request: Request) -> JSONResponse:
423
        """Remove a circuit.
424
425
        First, the flows are removed from the switches, and then the EVC is
426
        disabled.
427
        """
428 1
        circuit_id = request.path_params["circuit_id"]
429 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
430 1
        try:
431 1
            evc = self.circuits.pop(circuit_id)
432 1
        except KeyError:
433 1
            result = f"circuit_id {circuit_id} not found"
434 1
            log.debug("delete_circuit result %s %s", result, 404)
435 1
            raise HTTPException(404, detail=result) from KeyError
436 1
        log.info("Removing %s", evc)
437
438 1
        with evc.lock:
439 1
            if not evc.archived:
440 1
                evc.deactivate()
441 1
                evc.disable()
442 1
                self.sched.remove(evc)
443 1
                evc.remove_current_flows(sync=False)
444 1
                evc.remove_failover_flows(sync=False)
445 1
                evc.archive()
446 1
                evc.remove_uni_tags()
447 1
                evc.sync()
448 1
                emit_event(
449
                    self.controller, "deleted",
450
                    content=map_evc_event_content(evc)
451
                )
452
453 1
        log.info("EVC removed. %s", evc)
454 1
        result = {"response": f"Circuit {circuit_id} removed"}
455 1
        status = 200
456 1
        log.debug("delete_circuit result %s %s", result, status)
457
458 1
        return JSONResponse(result, status_code=status)
459
460 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
461 1
    def get_metadata(self, request: Request) -> JSONResponse:
462
        """Get metadata from an EVC."""
463 1
        circuit_id = request.path_params["circuit_id"]
464 1
        try:
465 1
            return (
466
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
467
            )
468
        except KeyError as error:
469
            raise HTTPException(
470
                404,
471
                detail=f"circuit_id {circuit_id} not found."
472
            ) from error
473
474 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
475 1
    @validate_openapi(spec)
476 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
477
        """Add metadata to a bulk of EVCs."""
478 1
        data = get_json_or_400(request, self.controller.loop)
479 1
        circuit_ids = data.pop("circuit_ids")
480
481 1
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
482
483 1
        fail_evcs = []
484 1
        for _id in circuit_ids:
485 1
            try:
486 1
                evc = self.circuits[_id]
487 1
                evc.extend_metadata(data)
488 1
            except KeyError:
489 1
                fail_evcs.append(_id)
490
491 1
        if fail_evcs:
492 1
            raise HTTPException(404, detail=fail_evcs)
493 1
        return JSONResponse("Operation successful", status_code=201)
494
495 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
496 1
    @validate_openapi(spec)
497 1
    def add_metadata(self, request: Request) -> JSONResponse:
498
        """Add metadata to an EVC."""
499 1
        circuit_id = request.path_params["circuit_id"]
500 1
        metadata = get_json_or_400(request, self.controller.loop)
501 1
        if not isinstance(metadata, dict):
502
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
503 1
        try:
504 1
            evc = self.circuits[circuit_id]
505 1
        except KeyError as error:
506 1
            raise HTTPException(
507
                404,
508
                detail=f"circuit_id {circuit_id} not found."
509
            ) from error
510
511 1
        evc.extend_metadata(metadata)
512 1
        evc.sync()
513 1
        return JSONResponse("Operation successful", status_code=201)
514
515 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
This code seems to be duplicated in your project.
Loading history...
516 1
    @validate_openapi(spec)
517 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
518
        """Delete metada from a bulk of EVCs"""
519 1
        data = get_json_or_400(request, self.controller.loop)
520 1
        key = request.path_params["key"]
521 1
        circuit_ids = data.pop("circuit_ids")
522 1
        self.mongo_controller.update_evcs_metadata(
523
            circuit_ids, {key: ""}, "del"
524
        )
525
526 1
        fail_evcs = []
527 1
        for _id in circuit_ids:
528 1
            try:
529 1
                evc = self.circuits[_id]
530 1
                evc.remove_metadata(key)
531 1
            except KeyError:
532 1
                fail_evcs.append(_id)
533
534 1
        if fail_evcs:
535 1
            raise HTTPException(404, detail=fail_evcs)
536 1
        return JSONResponse("Operation successful")
537
538 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
539 1
    def delete_metadata(self, request: Request) -> JSONResponse:
540
        """Delete metadata from an EVC."""
541 1
        circuit_id = request.path_params["circuit_id"]
542 1
        key = request.path_params["key"]
543 1
        try:
544 1
            evc = self.circuits[circuit_id]
545 1
        except KeyError as error:
546 1
            raise HTTPException(
547
                404,
548
                detail=f"circuit_id {circuit_id} not found."
549
            ) from error
550
551 1
        evc.remove_metadata(key)
552 1
        evc.sync()
553 1
        return JSONResponse("Operation successful")
554
555 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
556 1
    def redeploy(self, request: Request) -> JSONResponse:
557
        """Endpoint to force the redeployment of an EVC."""
558 1
        circuit_id = request.path_params["circuit_id"]
559 1
        try_avoid_same_s_vlan = request.query_params.get(
560
            "try_avoid_same_s_vlan", "true"
561
        )
562 1
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
563 1
        if try_avoid_same_s_vlan not in {"true", "false"}:
564
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
565
            raise HTTPException(400, detail=msg)
566 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
567 1
        try:
568 1
            evc = self.circuits[circuit_id]
569 1
        except KeyError:
570 1
            raise HTTPException(
571
                404,
572
                detail=f"circuit_id {circuit_id} not found"
573
            ) from KeyError
574 1
        deployed = False
575 1
        if evc.is_enabled():
576 1
            with evc.lock:
577 1
                path_dict = evc.remove_current_flows(
578
                    sync=False,
579
                    return_path=try_avoid_same_s_vlan == "true"
580
                )
581 1
                evc.remove_failover_flows(sync=True)
582 1
                deployed = evc.deploy(path_dict)
583 1
        if deployed:
584 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
585 1
            status = 202
586
        else:
587 1
            result = {
588
                "response": f"Circuit {circuit_id} is disabled."
589
            }
590 1
            status = 409
591
592 1
        return JSONResponse(result, status_code=status)
593
594 1
    @rest("/v2/evc/schedule/", methods=["POST"])
595 1
    @validate_openapi(spec)
596 1
    def create_schedule(self, request: Request) -> JSONResponse:
597
        """
598
        Create a new schedule for a given circuit.
599
600
        This service do no check if there are conflicts with another schedule.
601
        Payload example:
602
            {
603
              "circuit_id":"aa:bb:cc",
604
              "schedule": {
605
                "date": "2019-08-07T14:52:10.967Z",
606
                "interval": "string",
607
                "frequency": "1 * * * *",
608
                "action": "create"
609
              }
610
            }
611
        """
612 1
        log.debug("create_schedule /v2/evc/schedule/")
613 1
        data = get_json_or_400(request, self.controller.loop)
614 1
        circuit_id = data["circuit_id"]
615 1
        schedule_data = data["schedule"]
616
617
        # Get EVC from circuits buffer
618 1
        circuits = self._get_circuits_buffer()
619
620
        # get the circuit
621 1
        evc = circuits.get(circuit_id)
622
623
        # get the circuit
624 1
        if not evc:
625 1
            result = f"circuit_id {circuit_id} not found"
626 1
            log.debug("create_schedule result %s %s", result, 404)
627 1
            raise HTTPException(404, detail=result)
628
629
        # new schedule from dict
630 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
631
632
        # If there is no schedule, create the list
633 1
        if not evc.circuit_scheduler:
634 1
            evc.circuit_scheduler = []
635
636
        # Add the new schedule
637 1
        evc.circuit_scheduler.append(new_schedule)
638
639
        # Add schedule job
640 1
        self.sched.add_circuit_job(evc, new_schedule)
641
642
        # save circuit to mongodb
643 1
        evc.sync()
644
645 1
        result = new_schedule.as_dict()
646 1
        status = 201
647
648 1
        log.debug("create_schedule result %s %s", result, status)
649 1
        return JSONResponse(result, status_code=status)
650
651 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
652 1
    @validate_openapi(spec)
653 1
    def update_schedule(self, request: Request) -> JSONResponse:
654
        """Update a schedule.
655
656
        Change all attributes from the given schedule from a EVC circuit.
657
        The schedule ID is preserved as default.
658
        Payload example:
659
            {
660
              "date": "2019-08-07T14:52:10.967Z",
661
              "interval": "string",
662
              "frequency": "1 * * *",
663
              "action": "create"
664
            }
665
        """
666 1
        data = get_json_or_400(request, self.controller.loop)
667 1
        schedule_id = request.path_params["schedule_id"]
668 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
669
670
        # Try to find a circuit schedule
671 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
672
673
        # Can not modify circuits deleted and archived
674 1
        if not found_schedule:
675 1
            result = f"schedule_id {schedule_id} not found"
676 1
            log.debug("update_schedule result %s %s", result, 404)
677 1
            raise HTTPException(404, detail=result)
678
679 1
        new_schedule = CircuitSchedule.from_dict(data)
680 1
        new_schedule.id = found_schedule.id
681
        # Remove the old schedule
682 1
        evc.circuit_scheduler.remove(found_schedule)
683
        # Append the modified schedule
684 1
        evc.circuit_scheduler.append(new_schedule)
685
686
        # Cancel all schedule jobs
687 1
        self.sched.cancel_job(found_schedule.id)
688
        # Add the new circuit schedule
689 1
        self.sched.add_circuit_job(evc, new_schedule)
690
        # Save EVC to mongodb
691 1
        evc.sync()
692
693 1
        result = new_schedule.as_dict()
694 1
        status = 200
695
696 1
        log.debug("update_schedule result %s %s", result, status)
697 1
        return JSONResponse(result, status_code=status)
698
699 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
700 1
    def delete_schedule(self, request: Request) -> JSONResponse:
701
        """Remove a circuit schedule.
702
703
        Remove the Schedule from EVC.
704
        Remove the Schedule from cron job.
705
        Save the EVC to the Storehouse.
706
        """
707 1
        schedule_id = request.path_params["schedule_id"]
708 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
709 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
710
711
        # Can not modify circuits deleted and archived
712 1
        if not found_schedule:
713 1
            result = f"schedule_id {schedule_id} not found"
714 1
            log.debug("delete_schedule result %s %s", result, 404)
715 1
            raise HTTPException(404, detail=result)
716
717
        # Remove the old schedule
718 1
        evc.circuit_scheduler.remove(found_schedule)
719
720
        # Cancel all schedule jobs
721 1
        self.sched.cancel_job(found_schedule.id)
722
        # Save EVC to mongodb
723 1
        evc.sync()
724
725 1
        result = "Schedule removed"
726 1
        status = 200
727
728 1
        log.debug("delete_schedule result %s %s", result, status)
729 1
        return JSONResponse(result, status_code=status)
730
731 1
    def _check_no_tag_duplication(
732
        self,
733
        evc_id: str,
734
        uni_a: Optional[UNI] = None,
735
        uni_z: Optional[UNI] = None
736
    ):
737
        """Check if the given EVC has UNIs with no tag and if these are
738
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
739
        Args:
740
            evc (dict): EVC to be analyzed.
741
        """
742
743
        # No UNIs
744 1
        if not (uni_a or uni_z):
745 1
            return
746
747 1
        if (not (uni_a and not uni_a.user_tag) and
748
                not (uni_z and not uni_z.user_tag)):
749 1
            return
750 1
        for circuit in self.circuits.copy().values():
751 1
            if (not circuit.archived and circuit._id != evc_id):
752 1
                if uni_a and uni_a.user_tag is None:
753 1
                    circuit.check_no_tag_duplicate(uni_a)
754 1
                if uni_z and uni_z.user_tag is None:
755 1
                    circuit.check_no_tag_duplicate(uni_z)
756
757 1
    @listen_to("kytos/topology.link_up")
758 1
    def on_link_up(self, event):
759
        """Change circuit when link is up or end_maintenance."""
760
        self.handle_link_up(event)
761
762 1
    def handle_link_up(self, event):
763
        """Change circuit when link is up or end_maintenance."""
764 1
        log.info("Event handle_link_up %s", event.content["link"])
765 1
        for evc in self.get_evcs_by_svc_level():
766 1
            if evc.is_enabled() and not evc.archived:
767 1
                with evc.lock:
768 1
                    evc.handle_link_up(event.content["link"])
769
770
    # Possibly replace this with interruptions?
771 1
    @listen_to(
772
        '.*.switch.interface.(link_up|link_down|created|deleted)'
773
    )
774 1
    def on_interface_link_change(self, event: KytosEvent):
775
        """
776
        Handler for interface link_up and link_down events.
777
        """
778
        self.handle_on_interface_link_change(event)
779
780 1
    def handle_on_interface_link_change(self, event: KytosEvent):
781
        """
782
        Handler to sort interface events {link_(up, down), create, deleted}
783
784
        To avoid multiple database updated (link flap):
785
        Every interface is identfied and processed in parallel.
786
        Once an interface event is received a time is started.
787
        While time is running self._intf_events will be updated.
788
        After time has passed last received event will be processed.
789
        """
790 1
        iface = event.content.get("interface")
791 1
        with self._lock_interfaces[iface.id]:
792 1
            _now = event.timestamp
793
            # Return out of order events
794 1
            if (
795
                iface.id in self._intf_events
796
                and self._intf_events[iface.id]["event"].timestamp > _now
797
            ):
798 1
                return
799 1
            self._intf_events[iface.id].update({"event": event})
800 1
            if "last_acquired" in self._intf_events[iface.id]:
801 1
                return
802 1
            self._intf_events[iface.id].update({"last_acquired": now()})
803 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
804 1
        with self._lock_interfaces[iface.id]:
805 1
            event = self._intf_events[iface.id]["event"]
806 1
            self._intf_events[iface.id].pop('last_acquired', None)
807 1
            _, _, event_type = event.name.rpartition('.')
808 1
            if event_type in ('link_up', 'created'):
809 1
                self.handle_interface_link_up(iface)
810 1
            elif event_type in ('link_down', 'deleted'):
811 1
                self.handle_interface_link_down(iface)
812
813 1
    def handle_interface_link_up(self, interface):
814
        """
815
        Handler for interface link_up events
816
        """
817
        log.info("Event handle_interface_link_up %s", interface)
818
        for evc in self.get_evcs_by_svc_level():
819
            if _does_uni_affect_evc(evc, interface, "up"):
820
                with evc.lock:
821
                    evc.handle_interface_link_up(
822
                        interface
823
                    )
824
825 1
    def handle_interface_link_down(self, interface):
826
        """
827
        Handler for interface link_down events
828
        """
829
        log.info("Event handle_interface_link_down %s", interface)
830
        for evc in self.get_evcs_by_svc_level():
831
            if _does_uni_affect_evc(evc, interface, "down"):
832
                with evc.lock:
833
                    evc.handle_interface_link_down(
834
                        interface
835
                    )
836
837 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
838 1
    def on_link_down(self, event):
839
        """Change circuit when link is down or under_mantenance."""
840
        self.handle_link_down(event)
841
842 1
    def prepare_swap_to_failover_flow(self, evc: EVC):
843
        """Prepare an evc for switching to failover."""
844
        install_flows = {}
845
        try:
846
            install_flows = evc.get_failover_flows()
847
        # pylint: disable=broad-except
848
        except Exception:
849
            err = traceback.format_exc()
850
            log.error(
851
                "Ignore Failover path for "
852
                f"{evc} due to error: {err}"
853
            )
854
        return install_flows
855
856 1
    def prepare_swap_to_failover_event(self, evc: EVC, install_flow):
857
        """Prepare event contents for swap to failover."""
858
        return map_evc_event_content(
859
            evc,
860
            flows=deepcopy(install_flow)
861
        )
862
863 1
    def execute_swap_to_failover(
864
        self,
865
        evcs: list[EVC]
866
    ) -> tuple[list[EVC], list[EVC]]:
867
        """Process changes needed to commit a swap to failover."""
868 1
        event_contents = {}
869 1
        install_flows = {}
870 1
        swapped_evcs = list[EVC]()
871 1
        not_swapped_evcs = list[EVC]()
872
873 1
        for evc in evcs:
874 1
            install_flow = self.prepare_swap_to_failover_flow(evc)
875 1
            if install_flow:
876 1
                install_flows = merge_flow_dicts(install_flows, install_flow)
877 1
                event_contents[evc.id] =\
878
                    self.prepare_swap_to_failover_event(evc, install_flow)
879 1
                swapped_evcs.append(evc)
880
            else:
881 1
                not_swapped_evcs.append(evc)
882
883 1
        try:
884 1
            send_flow_mods_http(
885
                install_flows,
886
                "install"
887
            )
888 1
            for evc in swapped_evcs:
889 1
                temp_path = evc.current_path
890 1
                evc.current_path = evc.failover_path
891 1
                evc.failover_path = temp_path
892 1
            emit_event(
893
                self.controller, "failover_link_down",
894
                content=deepcopy(event_contents)
895
            )
896 1
            return swapped_evcs, not_swapped_evcs
897 1
        except FlowModException as exc:
898 1
            log.error(f"Fail to install failover flows for {evcs}: {exc}")
899 1
            return [], [*swapped_evcs, *not_swapped_evcs]
900
901 1
    def prepare_clear_failover_flow(self, evc: EVC):
902
        """Prepare an evc for clearing the old path."""
903
        del_flows = {}
904
        try:
905
            del_flows = prepare_delete_flow(
906
                merge_flow_dicts(
907
                    evc._prepare_uni_flows(evc.failover_path, skip_in=True),
908
                    evc._prepare_nni_flows(evc.failover_path)
909
                )
910
            )
911
        # pylint: disable=broad-except
912
        except Exception:
913
            err = traceback.format_exc()
914
            log.error(f"Fail to remove {evc} old_path: {err}")
915
        return del_flows
916
917 1
    def prepare_clear_failover_event(self, evc: EVC, delete_flow):
918
        """Prepare event contents for clearing failover."""
919
        return map_evc_event_content(
920
            evc,
921
            current_path=evc.current_path.as_dict(),
922
            removed_flows=deepcopy(delete_flow)
923
        )
924
925 1
    def execute_clear_failover(
926
        self,
927
        evcs: list[EVC]
928
    ) -> tuple[list[EVC], list[EVC]]:
929
        """Process changes needed to commit clearing the failover path"""
930 1
        event_contents = {}
931 1
        delete_flows = {}
932 1
        cleared_evcs = list[EVC]()
933 1
        not_cleared_evcs = list[EVC]()
934
935 1
        for evc in evcs:
936 1
            delete_flow = self.prepare_clear_failover_flow(evc)
937 1
            if delete_flow:
938 1
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
939 1
                event_contents[evc.id] =\
940
                    self.prepare_clear_failover_event(evc, delete_flow)
941 1
                cleared_evcs.append(evc)
942
            else:
943 1
                not_cleared_evcs.append(evc)
944
945 1
        try:
946 1
            send_flow_mods_http(
947
                delete_flows,
948
                'delete'
949
            )
950 1
            for evc in cleared_evcs:
951 1
                evc.failover_path.make_vlans_available(self.controller)
952 1
                evc.failover_path = Path([])
953 1
            emit_event(
954
                self.controller,
955
                "failover_old_path",
956
                content=event_contents
957
            )
958 1
            return cleared_evcs, not_cleared_evcs
959 1
        except FlowModException as exc:
960 1
            log.error(f"Failed to delete failover flows for {evcs}: {exc}")
961 1
            return [], [*cleared_evcs, *not_cleared_evcs]
962
963 1
    def prepare_undeploy_flow(self, evc: EVC):
964
        """Prepare an evc for undeploying."""
965
        del_flows = {}
966
        try:
967
            del_flows = prepare_delete_flow(
968
                merge_flow_dicts(
969
                    evc._prepare_uni_flows(evc.current_path, skip_in=True),
970
                    evc._prepare_nni_flows(evc.current_path),
971
                    evc._prepare_nni_flows(evc.failover_path)
972
                )
973
            )
974
        # pylint: disable=broad-except
975
        except Exception:
976
            err = traceback.format_exc()
977
            log.error(f"Fail to undeploy {evc}: {err}")
978
        return del_flows
979
980 1
    def execute_undeploy(self, evcs: list[EVC]):
981
        """Process changes needed to commit an undeploy"""
982 1
        delete_flows = {}
983 1
        undeploy_evcs = list[EVC]()
984 1
        not_undeploy_evcs = list[EVC]()
985
986 1
        for evc in evcs:
987 1
            delete_flow = self.prepare_undeploy_flow(evc)
988 1
            if delete_flow:
989 1
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
990 1
                undeploy_evcs.append(evc)
991
            else:
992 1
                not_undeploy_evcs.append(evc)
993
994 1
        try:
995 1
            send_flow_mods_http(
996
                delete_flows,
997
                'delete'
998
            )
999
1000 1
            for evc in undeploy_evcs:
1001 1
                evc.current_path.make_vlans_available(self.controller)
1002 1
                evc.failover_path.make_vlans_available(self.controller)
1003 1
                evc.current_path = Path([])
1004 1
                evc.failover_path = Path([])
1005 1
                evc.deactivate()
1006 1
                emit_event(
1007
                    self.controller,
1008
                    "need_redeploy",
1009
                    content={"evc_id": evc.id}
1010
                )
1011 1
                log.info(f"{evc} scheduled for redeploy")
1012 1
            return undeploy_evcs, not_undeploy_evcs
1013 1
        except FlowModException as exc:
1014 1
            log.error(
1015
                f"Failed to delete flows before redeploy for {evcs}: {exc}"
1016
            )
1017 1
            return [], [*undeploy_evcs, *not_undeploy_evcs]
1018
1019 1
    def handle_link_down(self, event):
1020
        """Change circuit when link is down or under_mantenance."""
1021 1
        link = event.content["link"]
1022 1
        log.info("Event handle_link_down %s", link)
1023
1024 1
        with ExitStack() as exit_stack:
1025 1
            exit_stack.enter_context(self.multi_evc_lock)
1026 1
            swap_to_failover = list[EVC]()
1027 1
            undeploy = list[EVC]()
1028 1
            clear_failover = list[EVC]()
1029 1
            evcs_to_update = dict[str, EVC]()
1030
1031 1
            for evc in self.get_evcs_by_svc_level():
1032 1
                with ExitStack() as sub_stack:
1033 1
                    sub_stack.enter_context(evc.lock)
1034 1
                    if all((
1035
                        evc.is_affected_by_link(link),
1036
                        evc.failover_path,
1037
                        not evc.is_failover_path_affected_by_link(link)
1038
                    )):
1039 1
                        swap_to_failover.append(evc)
1040 1
                    elif all((
1041
                        evc.is_affected_by_link(link),
1042
                        not evc.failover_path or
1043
                        evc.is_failover_path_affected_by_link(link)
1044
                    )):
1045 1
                        undeploy.append(evc)
1046 1
                    elif all((
1047
                        not evc.is_affected_by_link(link),
1048
                        evc.failover_path,
1049
                        evc.is_failover_path_affected_by_link(link)
1050
                    )):
1051 1
                        clear_failover.append(evc)
1052
                    else:
1053
                        continue
1054
1055 1
                    exit_stack.push(sub_stack.pop_all())
1056
1057
            # Swap from current path to failover path
1058
1059 1
            if swap_to_failover:
1060 1
                success, failure = self.execute_swap_to_failover(
1061
                    swap_to_failover
1062
                )
1063
1064 1
                clear_failover.extend(success)
1065
1066 1
                evcs_to_update.update((evc.id, evc) for evc in success)
1067
1068 1
                undeploy.extend(failure)
1069
1070
            # Clear out failover path
1071
1072 1
            if clear_failover:
1073 1
                success, failure = self.execute_clear_failover(clear_failover)
1074
1075 1
                evcs_to_update.update((evc.id, evc) for evc in success)
1076
1077 1
                undeploy.extend(failure)
1078
1079
            # Undeploy the evc, schedule a redeploy
1080
1081 1
            if undeploy:
1082 1
                success, failure = self.execute_undeploy(undeploy)
1083
1084 1
                evcs_to_update.update((evc.id, evc) for evc in success)
1085
1086 1
                if failure:
1087 1
                    log.error(f"Failed to handle_link_down for {failure}")
1088
1089
            # Push update to DB
1090
1091 1
            if evcs_to_update:
1092 1
                self.mongo_controller.update_evcs(
1093
                    [evc.as_dict() for evc in evcs_to_update.values()]
1094
                )
1095
1096 1
    @listen_to("kytos/mef_eline.need_redeploy")
1097 1
    def on_evc_need_redeploy(self, event):
1098
        """Redeploy evcs that need to be redeployed."""
1099
        self.handle_evc_need_redeploy(event)
1100
1101 1
    def handle_evc_need_redeploy(self, event):
1102
        """Redeploy evcs that need to be redeployed."""
1103
        evc = self.circuits.get(event.content["evc_id"])
1104
        if evc is None:
1105
            return
1106
        with evc.lock:
1107
            if not evc.is_enabled() or evc.is_active():
1108
                return
1109
            result = evc.deploy()
1110
        event_name = "error_redeploy_link_down"
1111
        if result:
1112
            log.info(f"{evc} redeployed")
1113
            event_name = "redeployed_link_down"
1114
        emit_event(self.controller, event_name,
1115
                   content=map_evc_event_content(evc))
1116
1117 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
1118 1
    def on_evc_affected_by_link_down(self, event):
1119
        """Change circuit when link is down or under_mantenance."""
1120
        self.handle_evc_affected_by_link_down(event)
1121
1122 1
    def handle_evc_affected_by_link_down(self, event):
1123
        """Change circuit when link is down or under_mantenance."""
1124 1
        evc = self.circuits.get(event.content["evc_id"])
1125 1
        link = event.content['link']
1126 1
        if not evc:
1127 1
            return
1128 1
        with evc.lock:
1129 1
            if not evc.is_affected_by_link(link):
1130
                return
1131 1
            result = evc.handle_link_down()
1132 1
        event_name = "error_redeploy_link_down"
1133 1
        if result:
1134 1
            log.info(f"{evc} redeployed due to link down {link.id}")
1135 1
            event_name = "redeployed_link_down"
1136 1
        emit_event(self.controller, event_name,
1137
                   content=map_evc_event_content(evc))
1138
1139 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
1140 1
    def on_evc_deployed(self, event):
1141
        """Handle EVC deployed|redeployed_link_down."""
1142
        self.handle_evc_deployed(event)
1143
1144 1
    def handle_evc_deployed(self, event):
1145
        """Setup failover path on evc deployed."""
1146
        evc = self.circuits.get(event.content["evc_id"])
1147
        if not evc:
1148
            return
1149
        evc.try_setup_failover_path()
1150
1151 1
    @listen_to("kytos/topology.topology_loaded")
1152 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
1153
        """Load EVCs once the topology is available."""
1154
        self.load_all_evcs()
1155
1156 1
    def load_all_evcs(self):
1157
        """Try to load all EVCs on startup."""
1158 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
1159 1
        for circuit_id, circuit in circuits:
1160 1
            if circuit_id not in self.circuits:
1161 1
                self._load_evc(circuit)
1162 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
1163
                   timeout=1)
1164
1165 1
    def _load_evc(self, circuit_dict):
1166
        """Load one EVC from mongodb to memory."""
1167 1
        try:
1168 1
            evc = self._evc_from_dict(circuit_dict)
1169 1
        except (ValueError, KytosTagError) as exception:
1170 1
            log.error(
1171
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1172
            )
1173 1
            return None
1174 1
        if evc.archived:
1175 1
            return None
1176
1177 1
        self.circuits.setdefault(evc.id, evc)
1178 1
        self.sched.add(evc)
1179 1
        return evc
1180
1181 1
    @listen_to("kytos/flow_manager.flow.error")
1182 1
    def on_flow_mod_error(self, event):
1183
        """Handle flow mod errors related to an EVC."""
1184
        self.handle_flow_mod_error(event)
1185
1186 1
    def handle_flow_mod_error(self, event):
1187
        """Handle flow mod errors related to an EVC."""
1188 1
        flow = event.content["flow"]
1189 1
        command = event.content.get("error_command")
1190 1
        if command != "add":
1191 1
            return
1192 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1193 1
        if not evc or evc.archived or not evc.is_enabled():
1194 1
            return
1195 1
        with evc.lock:
1196 1
            evc.remove_current_flows(sync=False)
1197 1
            evc.remove_failover_flows(sync=True)
1198
1199 1
    def _evc_dict_with_instances(self, evc_dict):
1200
        """Convert some dict values to instance of EVC classes.
1201
1202
        This method will convert: [UNI, Link]
1203
        """
1204 1
        data = evc_dict.copy()  # Do not modify the original dict
1205 1
        for attribute, value in data.items():
1206
            # Get multiple attributes.
1207
            # Ex: uni_a, uni_z
1208 1
            if "uni" in attribute:
1209 1
                try:
1210 1
                    data[attribute] = self._uni_from_dict(value)
1211 1
                except ValueError as exception:
1212 1
                    result = "Error creating UNI: Invalid value"
1213 1
                    raise ValueError(result) from exception
1214
1215 1
            if attribute == "circuit_scheduler":
1216 1
                data[attribute] = []
1217 1
                for schedule in value:
1218 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1219
1220
            # Get multiple attributes.
1221
            # Ex: primary_links,
1222
            #     backup_links,
1223
            #     current_links_cache,
1224
            #     primary_links_cache,
1225
            #     backup_links_cache
1226 1
            if "links" in attribute:
1227 1
                data[attribute] = [
1228
                    self._link_from_dict(link, attribute) for link in value
1229
                ]
1230
1231
            # Ex: current_path,
1232
            #     primary_path,
1233
            #     backup_path
1234 1
            if (attribute.endswith("path") and
1235
                    attribute != "dynamic_backup_path"):
1236 1
                data[attribute] = Path(
1237
                    [self._link_from_dict(link, attribute) for link in value]
1238
                )
1239
1240 1
        return data
1241
1242 1
    def _evc_from_dict(self, evc_dict):
1243 1
        data = self._evc_dict_with_instances(evc_dict)
1244 1
        data["table_group"] = self.table_group
1245 1
        return EVC(self.controller, **data)
1246
1247 1
    def _uni_from_dict(self, uni_dict):
1248
        """Return a UNI object from python dict."""
1249 1
        if uni_dict is None:
1250 1
            return False
1251
1252 1
        interface_id = uni_dict.get("interface_id")
1253 1
        interface = self.controller.get_interface_by_id(interface_id)
1254 1
        if interface is None:
1255 1
            result = (
1256
                "Error creating UNI:"
1257
                + f"Could not instantiate interface {interface_id}"
1258
            )
1259 1
            raise ValueError(result) from ValueError
1260 1
        tag_convert = {1: "vlan"}
1261 1
        tag_dict = uni_dict.get("tag", None)
1262 1
        if tag_dict:
1263 1
            tag_type = tag_dict.get("tag_type")
1264 1
            tag_type = tag_convert.get(tag_type, tag_type)
1265 1
            tag_value = tag_dict.get("value")
1266 1
            if isinstance(tag_value, list):
1267 1
                tag_value = get_tag_ranges(tag_value)
1268 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1269 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1270
            else:
1271 1
                tag = TAG(tag_type, tag_value)
1272
        else:
1273 1
            tag = None
1274 1
        uni = UNI(interface, tag)
1275 1
        return uni
1276
1277 1
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
1278
        """Return a Link object from python dict."""
1279 1
        id_a = link_dict.get("endpoint_a").get("id")
1280 1
        id_b = link_dict.get("endpoint_b").get("id")
1281
1282 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1283 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1284 1
        if not endpoint_a:
1285 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1286 1
            raise ValueError(error_msg)
1287 1
        if not endpoint_b:
1288
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1289
            raise ValueError(error_msg)
1290
1291 1
        link = Link(endpoint_a, endpoint_b)
1292 1
        allowed_paths = {"current_path", "failover_path"}
1293 1
        if "metadata" in link_dict and attribute in allowed_paths:
1294 1
            link.extend_metadata(link_dict.get("metadata"))
1295
1296 1
        s_vlan = link.get_metadata("s_vlan")
1297 1
        if s_vlan:
1298 1
            tag = TAG.from_dict(s_vlan)
1299 1
            if tag is False:
1300
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1301
                raise ValueError(error_msg)
1302 1
            link.update_metadata("s_vlan", tag)
1303 1
        return link
1304
1305 1
    def _find_evc_by_schedule_id(self, schedule_id):
1306
        """
1307
        Find an EVC and CircuitSchedule based on schedule_id.
1308
1309
        :param schedule_id: Schedule ID
1310
        :return: EVC and Schedule
1311
        """
1312 1
        circuits = self._get_circuits_buffer()
1313 1
        found_schedule = None
1314 1
        evc = None
1315
1316
        # pylint: disable=unused-variable
1317 1
        for c_id, circuit in circuits.items():
1318 1
            for schedule in circuit.circuit_scheduler:
1319 1
                if schedule.id == schedule_id:
1320 1
                    found_schedule = schedule
1321 1
                    evc = circuit
1322 1
                    break
1323 1
            if found_schedule:
1324 1
                break
1325 1
        return evc, found_schedule
1326
1327 1
    def _get_circuits_buffer(self):
1328
        """
1329
        Return the circuit buffer.
1330
1331
        If the buffer is empty, try to load data from mongodb.
1332
        """
1333 1
        if not self.circuits:
1334
            # Load circuits from mongodb to buffer
1335 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1336 1
            for c_id, circuit in circuits.items():
1337 1
                evc = self._evc_from_dict(circuit)
1338 1
                self.circuits[c_id] = evc
1339 1
        return self.circuits
1340
1341
    # pylint: disable=attribute-defined-outside-init
1342 1
    @alisten_to("kytos/of_multi_table.enable_table")
1343 1
    async def on_table_enabled(self, event):
1344
        """Handle a recently table enabled."""
1345 1
        table_group = event.content.get("mef_eline", None)
1346 1
        if not table_group:
1347 1
            return
1348 1
        for group in table_group:
1349 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1350 1
                log.error(f'The table group "{group}" is not allowed for '
1351
                          f'mef_eline. Allowed table groups are '
1352
                          f'{settings.TABLE_GROUP_ALLOWED}')
1353 1
                return
1354 1
        self.table_group.update(table_group)
1355 1
        content = {"group_table": self.table_group}
1356 1
        name = "kytos/mef_eline.enable_table"
1357
        await aemit_event(self.controller, name, content)
1358