Test Failed
Pull Request — master (#690)
by
unknown
04:37
created

build.main.Main.handle_evc_deployed()   B

Complexity

Conditions 7

Size

Total Lines 14
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 7

Importance

Changes 0
Metric Value
cc 7
eloc 12
nop 2
dl 0
loc 14
ccs 9
cts 9
cp 1
crap 7
rs 8
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13 1
from typing import Optional
14
15 1
from pydantic import ValidationError
16
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29
                                              DuplicatedNoTagUNI,
30
                                              FlowModException, InvalidPath)
31 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
32
                                          Path)
33 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
34 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc, aemit_event,
35
                                         emit_event, get_vlan_tags_and_masks,
36
                                         map_evc_event_content,
37
                                         merge_flow_dicts, prepare_delete_flow,
38
                                         send_flow_mods_http)
39
40
41
# pylint: disable=too-many-public-methods
42 1
class Main(KytosNApp):
43
    """Main class of amlight/mef_eline NApp.
44
45
    This class is the entry point for this napp.
46
    """
47
48 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
49
50 1
    def setup(self):
        """Initialize the NApp state in place of '__init__'.

        The controller calls this method automatically when the application
        is loaded, so every setup routine belongs here.
        """
        # Scheduler used for the circuit events
        self.sched = Scheduler()

        # Controller used to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the Kytos controller to the dynamic path manager
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of the EVCs created, keyed by circuit id.
        # Every create/update/delete must be synced to mongodb.
        self.circuits: dict[str, EVC] = {}

        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)
        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.multi_evc_lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
81
82 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
83
        """Get circuits sorted by desc service level and asc creation_time.
84
85
        In the future, as more ops are offloaded it should be get from the DB.
86
        """
87 1
        if enable_filter:
88 1
            return sorted(
89
                          [circuit for circuit in self.circuits.values()
90
                           if circuit.is_enabled()],
91
                          key=lambda x: (-x.service_level, x.creation_time),
92
            )
93 1
        return sorted(self.circuits.values(),
94
                      key=lambda x: (-x.service_level, x.creation_time))
95
96 1
    @staticmethod
    def get_eline_controller():
        """Create and return a new ELineController instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
100
101 1
    def execute(self):
        """Run the consistency routine unless one is already in progress.

        Nothing is done when self._lock is currently held; otherwise the
        routine runs while holding that lock.
        """
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
109
110 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit qualifies for the consistency check.

        Returns True only for a circuit that is enabled, inactive, not
        locked, without recently removed flows, not recently updated, with
        active UNIs and that is either intra-switch or has a current_path.
        """
        if not circuit.is_enabled() or circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow() or circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active():
            return False
        # if an inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it
        return bool(circuit.is_intra_switch() or circuit.current_path)
126
127 1
    def execute_consistency(self):
        """Execute the consistency routine over every buffered circuit.

        Circuits accepted by should_be_checked are traced in a single batch:
        the ones whose trace succeeds are activated and synced, the others
        are redeployed once their execution_rounds counter exceeds
        settings.WAIT_FOR_OLD_PATH. Along the way, a "need_failover" event
        is emitted for each active circuit that has no failover_path but is
        eligible for one.
        """
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)
            needs_failover = (
                circuit.is_active()
                and not circuit.failover_path
                and circuit.is_eligible_for_failover_path()
            )
            if needs_failover:
                emit_event(
                    self.controller,
                    "need_failover",
                    content=map_evc_event_content(circuit)
                )

        trace_results = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if trace_results.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            # The trace did not confirm the circuit: count one more round
            # and redeploy only after waiting long enough.
            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()
158
159
    def shutdown(self):
        """Run when the NApp is unloaded.

        No cleanup is performed at the moment; any teardown procedure
        should be added here.
        """
164 1
165 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the circuits stored.

        The "archived" query arg (lower-cased, "false" when absent) is
        forwarded to the storage controller; every other query arg is
        forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        stored = self.mongo_controller.get_circuits(archived=archived,
                                                    metadata=metadata)
        return JSONResponse(stored['circuits'])
180
181 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON list in which every item has the keys:
            "schedule_id": id of the schedule,
            "circuit_id": id of the circuit owning the schedule,
            "schedule": the schedule object itself.

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            return JSONResponse({}, status_code=200)

        status = 200
        result = []
        for circuit in circuits:
            # A circuit without schedules contributes nothing
            schedules = circuit.get("circuit_scheduler") or []
            result.extend(
                {
                    "schedule_id": schedule.get("id"),
                    "circuit_id": circuit.get("id"),
                    "schedule": schedule,
                }
                for schedule in schedules
            )

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
212 1
213 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuit matching circuit_id.

        Raises:
            HTTPException: 404 when the storage controller has no such
                circuit.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        detail = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", detail, 404)
        raise HTTPException(404, detail=detail)
226
227
    # pylint: disable=too-many-branches, too-many-statements
228
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST.

        For each link composing these paths:
         - E-Line NApp requests a S-VID available from the link VLAN pool.
         - Using the S-VID obtained, generate abstract flow entries to be
           sent to FlowManager.

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths.

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation.

        Finally, notify user of the status of its request.

        Returns:
            JSONResponse: 201 with the keys "circuit_id" and "deployed".

        Raises:
            HTTPException: 400 when the payload, the static paths, the tag
                lists or the tag reservation are rejected; 409 when a UNI
                without tag is duplicated.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        # Both static paths go through the very same validation; the primary
        # one is checked (and may fail) before the backup one is looked at.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The EVC was not stored: give back the UNI tags taken just
            # above instead of leaving them in use by a circuit that was
            # never created.
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        deployed = False
        if not evc.circuit_scheduler:
            with evc.lock:
                deployed = evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id, "deployed": deployed}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
333 1
334 1
    @staticmethod
335
    def _use_uni_tags(evc):
336 1
        uni_a = evc.uni_a
337 1
        evc._use_uni_vlan(uni_a)
338
        try:
339
            uni_z = evc.uni_z
340
            evc._use_uni_vlan(uni_z)
341 1
        except KytosTagError as err:
342
            evc.make_uni_vlan_available(uni_a)
343 1
            raise err
344 1
345 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removed events and pass them to the handler."""
        self.handle_flow_delete(event)
349 1
350 1
    def handle_flow_delete(self, event):
        """Register on the owning EVC that one of its flows got removed.

        The EVC id is derived from the cookie of the removed flow; nothing
        happens when no buffered circuit matches it.
        """
        cookie = event.content["flow"].cookie
        evc = self.circuits.get(EVC.get_id_from_cookie(cookie))
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
357 1
358 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        Returns:
            JSONResponse: 200 with the EVC as a dict under its own id and
                a "redeployed" key.

        Raises:
            HTTPException: 404 when circuit_id is not in the buffer, 400
                when the payload is rejected, 409 on a duplicated UNI
                without tag or on a disabled switch.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance (not the KeyError class) so
            # the cause keeps the missing key.
            raise HTTPException(404, detail=result) from exception

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        redeployed = False
        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    redeployed = evc.deploy()
            elif evc.is_enabled() and redeploy:
                # inactive but enabled: start over from a clean state
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()

        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
419 1
420 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        The circuit is taken out of the in-memory buffer. Then, unless it
        is already archived, it is deactivated, disabled, unscheduled, has
        its current and failover flows removed, is archived, has its UNI
        tags released and is synced; a "deleted" event is emitted at the
        end.

        Raises:
            HTTPException: 404 when circuit_id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance (not the KeyError class) so
            # the cause keeps the missing key.
            raise HTTPException(404, detail=result) from exception
        log.info("Removing %s", evc)

        with evc.lock:
            if not evc.archived:
                evc.deactivate()
                evc.disable()
                self.sched.remove(evc)
                evc.remove_current_flows(sync=False)
                evc.remove_failover_flows(sync=False)
                evc.archive()
                evc.remove_uni_tags()
                evc.sync()
                emit_event(
                    self.controller, "deleted",
                    content=map_evc_event_content(evc)
                )

        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200
        log.debug("delete_circuit result %s %s", result, status)

        return JSONResponse(result, status_code=status)
458
459
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of the buffered EVC matching circuit_id.

        Raises:
            HTTPException: 404 when a KeyError is raised while looking up
                the circuit.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            metadata = self.circuits[circuit_id].metadata
            response = JSONResponse({"metadata": metadata})
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error
        return response
472
473 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add the same metadata to a bulk of EVCs.

        The payload carries "circuit_ids" plus the metadata itself. The
        storage is updated first for all ids; then each buffered EVC is
        extended, and the ids that raise KeyError are reported with a 404.
        """
        metadata = get_json_or_400(request, self.controller.loop)
        circuit_ids = metadata.pop("circuit_ids")

        self.mongo_controller.update_evcs_metadata(
            circuit_ids, metadata, "add"
        )

        missing = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(metadata)
            except KeyError:
                missing.append(circuit_id)

        if not missing:
            return JSONResponse("Operation successful", status_code=201)
        raise HTTPException(404, detail=missing)
493 1
494 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Extend the metadata of one EVC with the payload and sync it.

        Raises:
            HTTPException: 400 when the payload is not a dict, 404 when
                circuit_id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            raise HTTPException(400, f"Invalid metadata value: {metadata}")

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error
        else:
            evc.extend_metadata(metadata)
            evc.sync()
            return JSONResponse("Operation successful", status_code=201)
513
514 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from a bulk of EVCs.

        The payload carries "circuit_ids". The storage is updated first for
        all ids; then the key is removed from each buffered EVC, and the
        ids that raise KeyError are reported with a 404.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs_metadata(
            circuit_ids, {key: ""}, "del"
        )

        missing = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                missing.append(circuit_id)

        if not missing:
            return JSONResponse("Operation successful")
        raise HTTPException(404, detail=missing)
536 1
537
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Remove one metadata key from an EVC and sync it.

        Raises:
            HTTPException: 404 when circuit_id is not in the buffer.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error
        else:
            evc.remove_metadata(key)
            evc.sync()
            return JSONResponse("Operation successful")
553 1
554
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        The query arg "try_avoid_same_s_vlan" ("true" by default, case
        insensitive) is compared against "true" and passed as return_path
        to remove_current_flows, whose result is then handed to deploy().

        Returns:
            JSONResponse: 202 when deploy() succeeded, 409 otherwise.

        Raises:
            HTTPException: 400 when try_avoid_same_s_vlan is neither
                "true" nor "false", 404 when circuit_id is not in the
                buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try_avoid_same_s_vlan = request.query_params.get(
            "try_avoid_same_s_vlan", "true"
        )
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
        if try_avoid_same_s_vlan not in {"true", "false"}:
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
            raise HTTPException(400, detail=msg)
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            # Chain from the caught instance (not the KeyError class) so
            # the cause keeps the missing key.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from exception

        deployed = False
        with evc.lock:
            if evc.is_enabled():
                path_dict = evc.remove_current_flows(
                    sync=False,
                    return_path=try_avoid_same_s_vlan == "true"
                )
                evc.remove_failover_flows(sync=True)
                deployed = evc.deploy(path_dict)

        if deployed:
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            # NOTE: this branch is also reached when the EVC is enabled but
            # deploy() returned a falsy value.
            result = {
                "response": f"Circuit {circuit_id} is disabled."
            }
            status = 409

        return JSONResponse(result, status_code=status)
592
593
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when circuit_id is not in the circuits
                buffer.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id = payload["circuit_id"]
        schedule_data = payload["schedule"]

        # Look the EVC up in the circuits buffer
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            detail = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", detail, 404)
            raise HTTPException(404, detail=detail)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the EVC has none yet
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then save the circuit to mongodb
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, 201)
        return JSONResponse(result, status_code=201)
649
650
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace all attributes of the given schedule of an EVC with the
        payload; the schedule ID is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when no schedule matches schedule_id.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find the schedule and the circuit owning it
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            detail = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", detail, 404)
            raise HTTPException(404, detail=detail)

        new_schedule = CircuitSchedule.from_dict(payload)
        new_schedule.id = old_schedule.id

        # Swap the old schedule for the modified one
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(new_schedule)

        # Replace the scheduled job as well, then save the EVC to mongodb
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
697 1
698 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the schedule from the EVC.
        Cancel the job of the schedule.
        Sync the EVC.

        Raises:
            HTTPException: 404 when no schedule matches schedule_id.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, schedule = self._find_evc_by_schedule_id(schedule_id)

        if not schedule:
            detail = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", detail, 404)
            raise HTTPException(404, detail=detail)

        # Drop the schedule and its job, then save the EVC to mongodb
        evc.circuit_scheduler.remove(schedule)
        self.sched.cancel_job(schedule.id)
        evc.sync()

        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
729
730
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check the given UNIs without tag against every other circuit.

        Each non-archived circuit whose ``_id`` differs from ``evc_id`` is
        asked, through ``check_no_tag_duplicate``, to verify every given
        UNI whose ``user_tag`` is None. DuplicatedNoTagUNI is expected to
        be raised by that call when a duplication is found.

        Args:
            evc_id (str): Id of the EVC being analyzed; it is skipped.
            uni_a (UNI, optional): First UNI to be checked.
            uni_z (UNI, optional): Second UNI to be checked.
        """
        # Nothing to check when no UNI was given or all of them carry a tag.
        if not ((uni_a and not uni_a.user_tag) or
                (uni_z and not uni_z.user_tag)):
            return

        untagged = [
            uni for uni in (uni_a, uni_z) if uni and uni.user_tag is None
        ]
        # Iterate over a copy so concurrent changes do not break the loop.
        for circuit in self.circuits.copy().values():
            if circuit.archived or circuit._id == evc_id:
                continue
            for uni in untagged:
                circuit.check_no_tag_duplicate(uni)
755 1
756 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to topology link_up events and forward them.

        The event is processed by ``handle_link_up``.
        """
        self.handle_link_up(event)
760
761 1
    def handle_link_up(self, event):
        """Notify every enabled, non-archived EVC that a link is up.

        EVCs are visited in the order given by ``get_evcs_by_svc_level``
        and each one is handled while holding its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
768
769
    # Possibly replace this with interruptions?
770 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """Listen to interface link_up, link_down, created and deleted.

        The event is processed by ``handle_on_interface_link_change``.
        """
        self.handle_on_interface_link_change(event)
778
779
    def handle_on_interface_link_change(self, event: KytosEvent):
        """Debounce interface events {link_(up, down), created, deleted}.

        To avoid multiple database updates (link flap):
        Every interface is identified by its id and has its own lock.
        The first event received for an interface starts a timer
        (settings.UNI_STATE_CHANGE_DELAY).
        While the timer is running, self._intf_events only records the
        most recent event and the calling handler returns.
        After the time has passed the last recorded event is processed.
        """
        iface = event.content.get("interface")
        iface_id = iface.id
        with self._lock_interfaces[iface_id]:
            # Drop events that are older than the one already recorded.
            if iface_id in self._intf_events:
                recorded = self._intf_events[iface_id]["event"]
                if recorded.timestamp > event.timestamp:
                    return
            # NOTE(review): indexing a missing id must yield a dict here,
            # so self._intf_events is presumably a defaultdict - confirm.
            state = self._intf_events[iface_id]
            state["event"] = event
            # Another handler is already waiting on the timer.
            if "last_acquired" in state:
                return
            state["last_acquired"] = now()

        time.sleep(settings.UNI_STATE_CHANGE_DELAY)

        with self._lock_interfaces[iface_id]:
            state = self._intf_events[iface_id]
            latest = state["event"]
            state.pop('last_acquired', None)
            event_type = latest.name.rpartition('.')[2]
            if event_type in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)
811
812
    def handle_interface_link_up(self, interface):
        """Propagate an interface "up" state to the EVCs it affects.

        An EVC is handled, under its own lock, when
        ``_does_uni_affect_evc(evc, interface, "up")`` is truthy.
        """
        log.info("Event handle_interface_link_up %s", interface)
        affected_evcs = (
            evc for evc in self.get_evcs_by_svc_level()
            if _does_uni_affect_evc(evc, interface, "up")
        )
        for evc in affected_evcs:
            with evc.lock:
                evc.handle_interface_link_up(interface)
823
824
    def handle_interface_link_down(self, interface):
        """Propagate an interface "down" state to the EVCs it affects.

        An EVC is handled, under its own lock, when
        ``_does_uni_affect_evc(evc, interface, "down")`` is truthy.
        """
        log.info("Event handle_interface_link_down %s", interface)
        affected_evcs = (
            evc for evc in self.get_evcs_by_svc_level()
            if _does_uni_affect_evc(evc, interface, "down")
        )
        for evc in affected_evcs:
            with evc.lock:
                evc.handle_interface_link_down(interface)
835
836
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
    def on_link_down(self, event):
        """Listen to topology link_down events and forward them.

        The event is processed by ``handle_link_down``.
        """
        self.handle_link_down(event)
840
841
    def prepare_swap_to_failover_flow(self, evc: EVC):
        """Return the failover flows of an evc, or {} if they can't be built.

        Any exception raised by ``evc.get_failover_flows()`` is logged with
        its traceback and swallowed, so the caller only sees an empty dict.
        """
        try:
            return evc.get_failover_flows()
        # pylint: disable=broad-except
        except Exception:
            err = traceback.format_exc()
            log.error(
                "Ignore Failover path for "
                f"{evc} due to error: {err}"
            )
            return {}
854
855
    def prepare_swap_to_failover_event(self, evc: EVC, install_flow):
        """Build the event content for an evc swapped to failover.

        The flows are deep copied so the content does not share state
        with ``install_flow``.
        """
        flows = deepcopy(install_flow)
        return map_evc_event_content(evc, flows=flows)
861 1
862
    def execute_swap_to_failover(
        self,
        evcs: list[EVC]
    ) -> tuple[list[EVC], list[EVC]]:
        """Commit the swap from current path to failover path.

        Flows of every evc that could be prepared are merged and installed
        in a single request. On success the current and failover paths of
        those evcs are exchanged and "failover_link_down" is emitted.

        Returns:
            A tuple (swapped evcs, not swapped evcs). When the flow
            installation raises FlowModException no evc is swapped and
            all of them are returned in the second list.
        """
        event_contents = {}
        install_flows = {}
        swapped_evcs: list[EVC] = []
        not_swapped_evcs: list[EVC] = []

        for evc in evcs:
            install_flow = self.prepare_swap_to_failover_flow(evc)
            if not install_flow:
                not_swapped_evcs.append(evc)
                continue
            install_flows = merge_flow_dicts(install_flows, install_flow)
            event_contents[evc.id] = self.prepare_swap_to_failover_event(
                evc, install_flow
            )
            swapped_evcs.append(evc)

        try:
            send_flow_mods_http(install_flows, "install")
            for evc in swapped_evcs:
                # The previous current path becomes the failover path.
                evc.current_path, evc.failover_path = (
                    evc.failover_path, evc.current_path
                )
            emit_event(
                self.controller, "failover_link_down",
                content=deepcopy(event_contents)
            )
            return swapped_evcs, not_swapped_evcs
        except FlowModException as exc:
            log.error(f"Fail to install failover flows for {evcs}: {exc}")
            return [], [*swapped_evcs, *not_swapped_evcs]
899
900
    def prepare_clear_failover_flow(self, evc: EVC):
        """Return the delete flows for the failover path of an evc.

        The UNI flows (with ``skip_in=True``) and NNI flows of
        ``evc.failover_path`` are merged and converted to delete flows.
        Any exception is logged with its traceback and {} is returned.
        """
        try:
            uni_flows = evc._prepare_uni_flows(evc.failover_path, skip_in=True)
            nni_flows = evc._prepare_nni_flows(evc.failover_path)
            return prepare_delete_flow(merge_flow_dicts(uni_flows, nni_flows))
        # pylint: disable=broad-except
        except Exception:
            err = traceback.format_exc()
            log.error(f"Fail to remove {evc} old_path: {err}")
            return {}
915 1
916
    def prepare_clear_failover_event(self, evc: EVC, delete_flow):
        """Build the event content for an evc whose failover was cleared.

        The content carries the current path as dict and a deep copy of
        the removed flows.
        """
        current_path = evc.current_path.as_dict()
        removed_flows = deepcopy(delete_flow)
        return map_evc_event_content(
            evc,
            current_path=current_path,
            removed_flows=removed_flows
        )
923 1
924
    def execute_clear_failover(
        self,
        evcs: list[EVC]
    ) -> tuple[list[EVC], list[EVC]]:
        """Commit the removal of the failover path of the given evcs.

        Delete flows of every evc that could be prepared are merged and
        sent in a single request. On success the failover path vlans are
        made available, the failover path is emptied and
        "failover_old_path" is emitted.

        Returns:
            A tuple (cleared evcs, not cleared evcs). When the flow
            removal raises FlowModException no evc is cleared and all of
            them are returned in the second list.
        """
        event_contents = {}
        delete_flows = {}
        cleared_evcs: list[EVC] = []
        not_cleared_evcs: list[EVC] = []

        for evc in evcs:
            delete_flow = self.prepare_clear_failover_flow(evc)
            if not delete_flow:
                not_cleared_evcs.append(evc)
                continue
            delete_flows = merge_flow_dicts(delete_flows, delete_flow)
            event_contents[evc.id] = self.prepare_clear_failover_event(
                evc, delete_flow
            )
            cleared_evcs.append(evc)

        try:
            send_flow_mods_http(delete_flows, 'delete')
            for evc in cleared_evcs:
                evc.failover_path.make_vlans_available(self.controller)
                evc.failover_path = Path([])
            emit_event(
                self.controller,
                "failover_old_path",
                content=event_contents
            )
            return cleared_evcs, not_cleared_evcs
        except FlowModException as exc:
            log.error(f"Failed to delete failover flows for {evcs}: {exc}")
            return [], [*cleared_evcs, *not_cleared_evcs]
961
962
    def prepare_undeploy_flow(self, evc: EVC):
        """Return the delete flows needed to undeploy an evc.

        Merges the UNI flows (with ``skip_in=True``) and NNI flows of the
        current path with the NNI flows of the failover path and converts
        them to delete flows. Any exception is logged with its traceback
        and {} is returned.
        """
        try:
            flows = merge_flow_dicts(
                evc._prepare_uni_flows(evc.current_path, skip_in=True),
                evc._prepare_nni_flows(evc.current_path),
                evc._prepare_nni_flows(evc.failover_path)
            )
            return prepare_delete_flow(flows)
        # pylint: disable=broad-except
        except Exception:
            err = traceback.format_exc()
            log.error(f"Fail to undeploy {evc}: {err}")
            return {}
978 1
979 1
    def execute_undeploy(self, evcs: list[EVC]):
        """Commit the undeploy of the given evcs and request a redeploy.

        Delete flows of every evc that could be prepared are merged and
        sent in a single request. On success, for each of those evcs, the
        vlans of both paths are made available, both paths are emptied,
        the evc is deactivated and "need_redeploy" is emitted.

        Returns:
            A tuple (undeployed evcs, not undeployed evcs). When the flow
            removal raises FlowModException nothing is undeployed and all
            evcs are returned in the second list.
        """
        delete_flows = {}
        undeploy_evcs: list[EVC] = []
        not_undeploy_evcs: list[EVC] = []

        for evc in evcs:
            delete_flow = self.prepare_undeploy_flow(evc)
            if not delete_flow:
                not_undeploy_evcs.append(evc)
                continue
            delete_flows = merge_flow_dicts(delete_flows, delete_flow)
            undeploy_evcs.append(evc)

        try:
            send_flow_mods_http(delete_flows, 'delete')

            for evc in undeploy_evcs:
                evc.current_path.make_vlans_available(self.controller)
                evc.failover_path.make_vlans_available(self.controller)
                evc.current_path = Path([])
                evc.failover_path = Path([])
                evc.deactivate()
                emit_event(
                    self.controller,
                    "need_redeploy",
                    content={"evc_id": evc.id}
                )
                log.info(f"{evc} scheduled for redeploy")
            return undeploy_evcs, not_undeploy_evcs
        except FlowModException as exc:
            log.error(
                f"Failed to delete flows before redeploy for {evcs}: {exc}"
            )
            return [], [*undeploy_evcs, *not_undeploy_evcs]
1017 1
1018 1
    def handle_link_down(self, event):
        """Change circuits when a link is down or under maintenance.

        Every EVC is classified, while holding its lock, in one of:

        - swap to failover: current path affected, failover path exists
          and is not affected;
        - undeploy: current path affected and no usable failover path;
        - clear failover: only the failover path is affected.

        EVCs in none of these groups are released right away. The lock of
        every classified EVC is kept until all steps below are done.
        EVCs that fail a step fall through to undeploy, and every EVC
        changed successfully is pushed to the database in one update.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)

        with ExitStack() as exit_stack:
            exit_stack.enter_context(self.multi_evc_lock)
            swap_to_failover = list[EVC]()
            undeploy = list[EVC]()
            clear_failover = list[EVC]()
            evcs_to_update = dict[str, EVC]()

            for evc in self.get_evcs_by_svc_level():
                with ExitStack() as sub_stack:
                    sub_stack.enter_context(evc.lock)
                    # Evaluate each predicate once per EVC instead of
                    # once per branch.
                    affected = evc.is_affected_by_link(link)
                    has_failover = bool(evc.failover_path)
                    failover_affected = (
                        evc.is_failover_path_affected_by_link(link)
                    )

                    if affected and has_failover and not failover_affected:
                        swap_to_failover.append(evc)
                    elif affected:
                        # No failover path, or it is affected as well.
                        undeploy.append(evc)
                    elif has_failover and failover_affected:
                        clear_failover.append(evc)
                    else:
                        # Not affected: leaving sub_stack releases the lock.
                        continue

                    # Move the lock to the outer stack so it stays held
                    # until every change below has been committed.
                    exit_stack.push(sub_stack.pop_all())

            # Swap from current path to failover path
            if swap_to_failover:
                success, failure = self.execute_swap_to_failover(
                    swap_to_failover
                )
                # The old current path is now the failover; clear it next.
                clear_failover.extend(success)
                evcs_to_update.update((evc.id, evc) for evc in success)
                undeploy.extend(failure)

            # Clear out failover path
            if clear_failover:
                success, failure = self.execute_clear_failover(clear_failover)
                evcs_to_update.update((evc.id, evc) for evc in success)
                undeploy.extend(failure)

            # Undeploy the evc, schedule a redeploy
            if undeploy:
                success, failure = self.execute_undeploy(undeploy)
                evcs_to_update.update((evc.id, evc) for evc in success)
                if failure:
                    log.error(f"Failed to handle_link_down for {failure}")

            # Push update to DB
            if evcs_to_update:
                self.mongo_controller.update_evcs(
                    [evc.as_dict() for evc in evcs_to_update.values()]
                )
1094
1095
    @listen_to("kytos/mef_eline.need_redeploy")
    def on_evc_need_redeploy(self, event):
        """Listen to need_redeploy events and forward them.

        The event is processed by ``handle_evc_need_redeploy``.
        """
        self.handle_evc_need_redeploy(event)
1099
1100
    def handle_evc_need_redeploy(self, event):
        """Redeploy the evc referenced by the event, if it needs it.

        Nothing is done when the evc is unknown, disabled or already
        active. Otherwise the evc is deployed under its lock and either
        "redeployed_link_down" or "error_redeploy_link_down" is emitted
        according to the deploy result.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc is None:
            return

        with evc.lock:
            if not (evc.is_enabled() and not evc.is_active()):
                return
            deployed = evc.deploy()

        if deployed:
            log.info(f"{evc} redeployed")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
1115 1
1116 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen to evc_affected_by_link_down events and forward them.

        The event is processed by ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
1120
1121 1
    def handle_evc_affected_by_link_down(self, event):
        """Handle a link down for the single evc referenced by the event.

        Nothing is done when the evc is unknown or is not affected by the
        link. Otherwise ``evc.handle_link_down()`` runs under the evc lock
        and either "redeployed_link_down" or "error_redeploy_link_down" is
        emitted according to its result.
        """
        evc = self.circuits.get(event.content["evc_id"])
        link = event.content['link']
        if not evc:
            return

        with evc.lock:
            if not evc.is_affected_by_link(link):
                return
            redeployed = evc.handle_link_down()

        if redeployed:
            log.info(f"{evc} redeployed due to link down {link.id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
1137
1138
    @listen_to(
        "kytos/mef_eline.(redeployed_link_(up|down)|deployed|need_failover)"
    )
    def on_evc_deployed(self, event):
        """Listen to deployed, redeployed_link_(up|down) and need_failover.

        The event is processed by ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
1144
1145
    def handle_evc_deployed(self, event):
        """Set up a failover path for the evc referenced by the event.

        The failover path is only set up, under the evc lock, when the evc
        is eligible for one, is active, has a current path and does not
        have a failover path yet. Unknown evcs are ignored.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc is None:
            return

        with evc.lock:
            wants_failover = (
                evc.is_eligible_for_failover_path()
                and evc.is_active()
                and not evc.failover_path
                and evc.current_path
            )
            if wants_failover:
                evc.setup_failover_path()
1159 1
1160 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load all EVCs as soon as the topology reports it is loaded."""
        self.load_all_evcs()
1164 1
1165 1
    def load_all_evcs(self):
        """Load every stored EVC that is not in memory yet.

        Circuits are read from mongodb; those whose id is already in
        ``self.circuits`` are skipped. An "evcs_loaded" event carrying all
        the stored circuits is emitted afterwards.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded", content=dict(stored),
                   timeout=1)
1173
1174
    def _load_evc(self, circuit_dict):
        """Load one EVC from its stored dict into memory.

        Returns:
            The EVC, after registering it in ``self.circuits`` (when its
            id is not there yet) and in the scheduler; None when the dict
            can't be converted (ValueError or KytosTagError, which is
            logged) or when the EVC is archived.
        """
        try:
            evc = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None

        if not evc.archived:
            self.circuits.setdefault(evc.id, evc)
            self.sched.add(evc)
            return evc
        return None
1189 1
1190
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen to flow_manager flow errors and forward them.

        The event is processed by ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
1194 1
1195 1
    def handle_flow_mod_error(self, event):
        """Remove the flows of the EVC that owns a flow which failed to add.

        Only errors whose "error_command" is "add" are handled. The EVC is
        found through the flow cookie and must be known, not archived and
        enabled; its current and failover flows are then removed under the
        evc lock.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return

        evc_id = EVC.get_id_from_cookie(flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc or evc.archived or not evc.is_enabled():
            return

        with evc.lock:
            # Only the last removal asks for a sync.
            evc.remove_current_flows(sync=False)
            evc.remove_failover_flows(sync=True)
1207 1
1208 1
    def _evc_dict_with_instances(self, evc_dict):
1209
        """Convert some dict values to instance of EVC classes.
1210
1211
        This method will convert: [UNI, Link]
1212
        """
1213
        data = evc_dict.copy()  # Do not modify the original dict
1214
        for attribute, value in data.items():
1215
            # Get multiple attributes.
1216 1
            # Ex: uni_a, uni_z
1217 1
            if "uni" in attribute:
1218
                try:
1219
                    data[attribute] = self._uni_from_dict(value)
1220
                except ValueError as exception:
1221
                    result = "Error creating UNI: Invalid value"
1222
                    raise ValueError(result) from exception
1223
1224 1
            if attribute == "circuit_scheduler":
1225
                data[attribute] = []
1226 1
                for schedule in value:
1227
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1228
1229
            # Get multiple attributes.
1230 1
            # Ex: primary_links,
1231
            #     backup_links,
1232 1
            #     current_links_cache,
1233 1
            #     primary_links_cache,
1234 1
            #     backup_links_cache
1235 1
            if "links" in attribute:
1236
                data[attribute] = [
1237 1
                    self._link_from_dict(link, attribute) for link in value
1238
                ]
1239 1
1240 1
            # Ex: current_path,
1241
            #     primary_path,
1242 1
            #     backup_path
1243 1
            if (attribute.endswith("path") and
1244 1
                    attribute != "dynamic_backup_path"):
1245 1
                data[attribute] = Path(
1246
                    [self._link_from_dict(link, attribute) for link in value]
1247
                )
1248
1249 1
        return data
1250 1
1251 1
    def _evc_from_dict(self, evc_dict):
        """Create an EVC from a dict, using this NApp's table_group."""
        kwargs = self._evc_dict_with_instances(evc_dict)
        kwargs["table_group"] = self.table_group
        return EVC(self.controller, **kwargs)
1255 1
1256 1
    def _uni_from_dict(self, uni_dict):
1257 1
        """Return a UNI object from python dict."""
1258 1
        if uni_dict is None:
1259 1
            return False
1260
1261 1
        interface_id = uni_dict.get("interface_id")
1262
        interface = self.controller.get_interface_by_id(interface_id)
1263 1
        if interface is None:
1264 1
            result = (
1265 1
                "Error creating UNI:"
1266
                + f"Could not instantiate interface {interface_id}"
1267 1
            )
1268
            raise ValueError(result) from ValueError
1269 1
        tag_convert = {1: "vlan"}
1270 1
        tag_dict = uni_dict.get("tag", None)
1271
        if tag_dict:
1272 1
            tag_type = tag_dict.get("tag_type")
1273 1
            tag_type = tag_convert.get(tag_type, tag_type)
1274 1
            tag_value = tag_dict.get("value")
1275 1
            if isinstance(tag_value, list):
1276 1
                tag_value = get_tag_ranges(tag_value)
1277 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1278
                tag = TAGRange(tag_type, tag_value, mask_list)
1279
            else:
1280
                tag = TAG(tag_type, tag_value)
1281 1
        else:
1282 1
            tag = None
1283 1
        uni = UNI(interface, tag)
1284 1
        return uni
1285
1286 1
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
        """Return a Link object from python dict.

        Args:
            link_dict: Link description holding the ids of "endpoint_a"
                and "endpoint_b" and, optionally, "metadata".
            attribute: Name of the EVC attribute the link belongs to. The
                metadata is only applied for "current_path" and
                "failover_path".

        Raises:
            ValueError: when an endpoint interface is unknown to the
                controller or the "s_vlan" metadata can't become a TAG.
        """
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)
        if not endpoint_a:
            error_msg = f"Could not get interface endpoint_a id {id_a}"
            raise ValueError(error_msg)
        if not endpoint_b:
            error_msg = f"Could not get interface endpoint_b id {id_b}"
            raise ValueError(error_msg)

        link = Link(endpoint_a, endpoint_b)
        if (
            "metadata" in link_dict
            and attribute in {"current_path", "failover_path"}
        ):
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if not s_vlan:
            return link

        # The s_vlan is stored as a dict; keep it as a TAG in the link.
        tag = TAG.from_dict(s_vlan)
        if tag is False:
            error_msg = f"Could not instantiate tag from dict {s_vlan}"
            raise ValueError(error_msg)
        link.update_metadata("s_vlan", tag)
        return link
1313 1
1314 1
    def _find_evc_by_schedule_id(self, schedule_id):
1315 1
        """
1316
        Find an EVC and CircuitSchedule based on schedule_id.
1317 1
1318
        :param schedule_id: Schedule ID
1319
        :return: EVC and Schedule
1320
        """
1321
        circuits = self._get_circuits_buffer()
1322
        found_schedule = None
1323 1
        evc = None
1324
1325 1
        # pylint: disable=unused-variable
1326 1
        for c_id, circuit in circuits.items():
1327 1
            for schedule in circuit.circuit_scheduler:
1328 1
                if schedule.id == schedule_id:
1329 1
                    found_schedule = schedule
1330
                    evc = circuit
1331
                    break
1332 1
            if found_schedule:
1333 1
                break
1334
        return evc, found_schedule
1335 1
1336 1
    def _get_circuits_buffer(self):
1337 1
        """
1338 1
        Return the circuit buffer.
1339 1
1340 1
        If the buffer is empty, try to load data from mongodb.
1341
        """
1342
        if not self.circuits:
1343 1
            # Load circuits from mongodb to buffer
1344 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1345 1
            for c_id, circuit in circuits.items():
1346 1
                evc = self._evc_from_dict(circuit)
1347 1
                self.circuits[c_id] = evc
1348
        return self.circuits
1349
1350
    # pylint: disable=attribute-defined-outside-init
1351
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Update the table group when tables are enabled for mef_eline.

        The "mef_eline" entry of the event content is merged into
        ``self.table_group`` and "kytos/mef_eline.enable_table" is emitted
        with the resulting group. The event is ignored when the entry is
        missing or empty, and rejected (with an error log) when any group
        is not in settings.TABLE_GROUP_ALLOWED.
        """
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return

        # Reject the whole update as soon as one group is not allowed.
        for group in table_group:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return

        self.table_group.update(table_group)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1367