Test Failed
Pull Request — master (#690)
by
unknown
05:06
created

build.main.Main.handle_evc_deployed()   B

Complexity

Conditions 7

Size

Total Lines 14
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 7

Importance

Changes 0
Metric Value
cc 7
eloc 12
nop 2
dl 0
loc 14
ccs 9
cts 9
cp 1
crap 7
rs 8
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13 1
from typing import Optional
14
15 1
from pydantic import ValidationError
16
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29
                                              DuplicatedNoTagUNI,
30
                                              FlowModException, InvalidPath)
31 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
32
                                          Path)
33 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
34 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc, aemit_event,
35
                                         check_disabled_component, emit_event,
36
                                         get_vlan_tags_and_masks,
37
                                         map_evc_event_content,
38
                                         merge_flow_dicts, prepare_delete_flow,
39
                                         send_flow_mods_http)
40
41
42
# pylint: disable=too-many-public-methods
43 1
class Main(KytosNApp):
44
    """Main class of amlight/mef_eline NApp.
45
46
    This class is the entry point for this napp.
47
    """
48
49 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
50
51 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        The controller calls this hook automatically once the application
        is loaded, so every start-up routine belongs here.
        """
        # Scheduler object used for the circuit events.
        self.sched = Scheduler()

        # Controller object used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the controller over to the dynamic path manager.
        DynamicPathManager.set_controller(self.controller)

        # Buffer of the EVCs created, keyed by circuit id.
        # Every create/update/delete must be synced to mongodb.
        self.circuits: dict[str, EVC] = {}

        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)
        self.table_group = dict(epl=0, evpl=0)
        self._lock = Lock()
        self.multi_evc_lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
82
83 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
        """Return the buffered circuits ordered by priority.

        Circuits are sorted by descending ``service_level`` and, within the
        same level, by ascending ``creation_time``. When ``enable_filter`` is
        true only circuits whose ``is_enabled()`` is truthy are returned.

        In the future, as more ops are offloaded it should be get from the DB.
        """
        def priority(evc):
            return (-evc.service_level, evc.creation_time)

        candidates = self.circuits.values()
        if enable_filter:
            candidates = (evc for evc in candidates if evc.is_enabled())
        return sorted(candidates, key=priority)
96
97 1
    @staticmethod
    def get_eline_controller():
        """Create and return a new ``controllers.ELineController``."""
        eline_controller = controllers.ELineController()
        return eline_controller
101
102 1
    def execute(self):
        """Run the consistency routine unless ``self._lock`` is already held.

        When the lock is free, the routine runs while holding it and debug
        messages are logged before and after.
        """
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
110
111 1
    def should_be_checked(self, circuit):
        """Tell whether ``circuit`` qualifies for the consistency check.

        A circuit qualifies when it is enabled, not active, not locked, has
        no recently removed flow, was not recently updated, has active UNIs
        and either is intra-switch or has a ``current_path``.

        Returns:
            bool: ``True`` if every condition holds, ``False`` otherwise.
        """
        if not circuit.is_enabled() or circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow() or circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active():
            return False
        # If an inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it.
        return bool(circuit.is_intra_switch() or circuit.current_path)
127
128 1
    def execute_consistency(self):
        """Execute consistency routine.

        First pass: walk every buffered circuit (enabled or not), collect
        the ones accepted by ``should_be_checked`` and emit ``need_failover``
        for active circuits without a failover path that are eligible for
        one.

        Second pass: the collected circuits are submitted to
        ``EVCDeploy.check_list_traces``. A circuit with a truthy result is
        activated and synced; otherwise its ``execution_rounds`` counter is
        increased and, once it exceeds ``settings.WAIT_FOR_OLD_PATH``, the
        circuit is deployed again.
        """
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)
            needs_failover = (
                circuit.is_active()
                and not circuit.failover_path
                and circuit.is_eligible_for_failover_path()
            )
            if needs_failover:
                emit_event(
                    self.controller,
                    "need_failover",
                    content=map_evc_event_content(circuit),
                )

        trace_results = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if trace_results.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue
            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()
159
160
    def shutdown(self):
        """Hook executed when the NApp is unloaded.

        Nothing is cleaned up at the moment; any teardown procedure should
        be added here.
        """
165 1
166 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint listing the stored circuits.

        The ``archived`` query argument (lower-cased, ``"false"`` when absent)
        is forwarded as the ``archived`` filter; every other query argument
        is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        stored = self.mongo_controller.get_circuits(
            archived=archived, metadata=metadata
        )
        return JSONResponse(stored['circuits'])
181
182 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint listing the schedules of every stored circuit.

        The JSON answer follows this template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            return JSONResponse({}, status_code=200)

        status = 200
        result = []
        for circuit in circuits:
            schedules = circuit.get("circuit_scheduler")
            if not schedules:
                continue
            result.extend(
                {
                    "schedule_id": schedule.get("id"),
                    "circuit_id": circuit.get("id"),
                    "schedule": schedule,
                }
                for schedule in schedules
            )

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
213 1
214 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint returning the stored circuit with the given id.

        Raises:
            HTTPException: 404 when no circuit is found for ``circuit_id``.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        not_found = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", not_found, 404)
        raise HTTPException(404, detail=not_found)
227
228
    # pylint: disable=too-many-branches, too-many-statements
229
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Steps performed, in order:

        1. Build the EVC from the JSON payload.
        2. Reject the request if ``check_disabled_component`` raises
           ``DisabledSwitch`` for the UNIs.
        3. Validate ``primary_path`` and ``backup_path`` when they are set.
        4. Check that both UNIs carry equal tag lists and that the EVC has
           a primary or dynamic path.
        5. Check for duplicated no-tag UNIs against the other circuits.
        6. Reserve the UNI tags and save the circuit. If saving fails
           validation, the tags reserved in this step are released again.
        7. Store the circuit in the buffer, register it in the scheduler
           and, when it has no ``circuit_scheduler``, deploy it right away.
        8. Emit the ``created`` event and answer the user.

        Returns:
            JSONResponse: status 201 with ``circuit_id`` and ``deployed``.

        Raises:
            HTTPException: 400 for an invalid payload, tag, path or
                validation error; 409 for a disabled component or a
                duplicated no-tag UNI.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # Both static paths go through the same validation; only the name
        # reported in the error detail differs.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The UNI tags were reserved just above. The circuit is not
            # going to exist, so give them back instead of leaking them.
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        deployed = False
        if not evc.circuit_scheduler:
            with evc.lock:
                deployed = evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id, "deployed": deployed}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
343 1
344 1
    @staticmethod
    def _use_uni_tags(evc):
        """Reserve the VLAN tags of both UNIs of ``evc``.

        If reserving the UNI_Z tag raises ``KytosTagError``, the UNI_A tag is
        made available again and the error is propagated to the caller.
        """
        first_uni = evc.uni_a
        evc._use_uni_vlan(first_uni)
        try:
            evc._use_uni_vlan(evc.uni_z)
        except KytosTagError:
            # Roll back the UNI_A reservation before propagating.
            evc.make_uni_vlan_available(first_uni)
            raise
354 1
355 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removed events and hand them to the handler.

        Used to keep track of when flows got removed.
        """
        self.handle_flow_delete(event)
359 1
360 1
    def handle_flow_delete(self, event):
        """Record a flow removal on the EVC derived from the flow cookie.

        The circuit id is obtained from the cookie of ``event.content["flow"]``;
        nothing happens when no buffered circuit matches it.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
367 1
368 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated the circuit is removed and/or
        deployed again depending on whether it is active and on the
        ``enable``/``redeploy`` values returned by ``evc.update``. An
        ``updated`` event is emitted before answering.

        Returns:
            JSONResponse: status 200 with the circuit as a dict under its
            id, plus the ``redeployed`` flag.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer; 400
                on ``ValueError``, ``KytosTagError`` or ``ValidationError``;
                409 on ``DuplicatedNoTagUNI`` or ``DisabledSwitch``.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance so the original lookup error
            # is preserved as the cause.
            raise HTTPException(404, detail=result) from error

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception
        redeployed = False
        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    redeployed = evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
429 1
430 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        The circuit is popped from the buffer. Unless it is already
        archived, it is deactivated and disabled, removed from the
        scheduler, its current and failover flows are removed, it is
        archived, its UNI tags are released, it is synced and a ``deleted``
        event is emitted. All of that happens while holding the EVC lock.

        Returns:
            JSONResponse: status 200 with a confirmation message.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance so the original lookup error
            # is preserved as the cause.
            raise HTTPException(404, detail=result) from error
        log.info("Removing %s", evc)

        with evc.lock:
            if not evc.archived:
                evc.deactivate()
                evc.disable()
                self.sched.remove(evc)
                evc.remove_current_flows(sync=False)
                evc.remove_failover_flows(sync=False)
                evc.archive()
                evc.remove_uni_tags()
                evc.sync()
                emit_event(
                    self.controller, "deleted",
                    content=map_evc_event_content(evc)
                )

        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200
        log.debug("delete_circuit result %s %s", result, status)

        return JSONResponse(result, status_code=status)
468
469
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Endpoint returning the metadata of a buffered EVC.

        Raises:
            HTTPException: 404 when a ``KeyError`` is raised while building
                the answer, e.g. the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            payload = {"metadata": self.circuits[circuit_id].metadata}
            return JSONResponse(payload)
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error
482
483 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Endpoint adding the same metadata to several EVCs at once.

        ``circuit_ids`` is taken out of the payload and the remaining
        payload is used as the metadata. The database is updated first and
        then each buffered circuit is extended.

        Raises:
            HTTPException: 404, with the list of failing ids as detail, when
                a ``KeyError`` is raised for any of the ids.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")

        missing_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(data)
            except KeyError:
                missing_ids.append(circuit_id)

        if not missing_ids:
            return JSONResponse("Operation successful", status_code=201)
        raise HTTPException(404, detail=missing_ids)
503 1
504 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Endpoint adding metadata to a single EVC.

        The JSON payload must be an object; it is merged into the circuit
        metadata and the circuit is synced.

        Raises:
            HTTPException: 400 when the payload is not a dict; 404 when the
                circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            raise HTTPException(400, f"Invalid metadata value: {metadata}")

        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error
        else:
            evc.extend_metadata(metadata)
            evc.sync()
            return JSONResponse("Operation successful", status_code=201)
523
524 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Endpoint deleting one metadata key from several EVCs at once.

        ``circuit_ids`` comes from the payload and ``key`` from the path.
        The database is updated first and then the key is removed from each
        buffered circuit.

        Raises:
            HTTPException: 404, with the list of failing ids as detail, when
                a ``KeyError`` is raised for any of the ids.
        """
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs_metadata(
            circuit_ids, {key: ""}, "del"
        )

        missing_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                missing_ids.append(circuit_id)

        if not missing_ids:
            return JSONResponse("Operation successful")
        raise HTTPException(404, detail=missing_ids)
546 1
547
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Endpoint deleting one metadata key from a single EVC.

        The key is removed from the buffered circuit, which is then synced.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        key = request.path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error
        else:
            evc.remove_metadata(key)
            evc.sync()
            return JSONResponse("Operation successful")
563 1
564
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        The ``try_avoid_same_s_vlan`` query argument (``"true"`` by default,
        case-insensitive) controls whether ``remove_current_flows`` is asked
        to return the removed path, which is then handed to ``deploy``.

        Returns:
            JSONResponse: status 202 when ``deploy`` returned a truthy
            value, otherwise status 409.

        Raises:
            HTTPException: 400 when ``try_avoid_same_s_vlan`` is neither
                ``true`` nor ``false``; 404 when the circuit is not in the
                buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try_avoid_same_s_vlan = request.query_params.get(
            "try_avoid_same_s_vlan", "true"
        )
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
        if try_avoid_same_s_vlan not in {"true", "false"}:
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
            raise HTTPException(400, detail=msg)
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance so the original lookup error
            # is preserved as the cause.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        deployed = False
        with evc.lock:
            if evc.is_enabled():
                path_dict = evc.remove_current_flows(
                    sync=False,
                    return_path=try_avoid_same_s_vlan == "true"
                )
                evc.remove_failover_flows(sync=True)
                deployed = evc.deploy(path_dict)
        if deployed:
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            # NOTE(review): this branch is also reached when the circuit is
            # enabled but deploy() returned a falsy value - confirm whether
            # a distinct message is wanted for that case.
            result = {
                "response": f"Circuit {circuit_id} is disabled."
            }
            status = 409

        return JSONResponse(result, status_code=status)
602
603
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check whether the new schedule conflicts with
        another one.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found in the buffer.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            not_found = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", not_found, 404)
            raise HTTPException(404, detail=not_found)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the circuit has none yet.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job and then save the circuit to mongodb.
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, 201)
        return JSONResponse(result, status_code=201)
659
660
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Every attribute of the given schedule is replaced by the payload,
        while the schedule id is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when no schedule matches ``schedule_id``.
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            not_found = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", not_found, 404)
            raise HTTPException(404, detail=not_found)

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = old_schedule.id

        # Swap the old schedule for the modified one.
        schedules = evc.circuit_scheduler
        schedules.remove(old_schedule)
        schedules.append(new_schedule)

        # Replace the scheduled job, then save the EVC to mongodb.
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
707 1
708 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        The schedule is removed from the EVC, its job is cancelled in the
        scheduler and the EVC is synced.

        Raises:
            HTTPException: 404 when no schedule matches ``schedule_id``.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)

        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            not_found = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", not_found, 404)
            raise HTTPException(404, detail=not_found)

        # Drop the schedule, cancel its job and save the EVC to mongodb.
        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)
        evc.sync()

        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
739
740
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check the given untagged UNIs against every other circuit.

        Each non-archived circuit whose ``_id`` differs from ``evc_id`` is
        asked, through ``check_no_tag_duplicate``, to validate every given
        UNI whose ``user_tag`` is None (presumably raising
        DuplicatedNoTagUNI on duplication; confirm in the EVC model).

        Args:
            evc_id (str): ID of the EVC that owns the UNIs; it is skipped.
            uni_a (UNI): Optional UNI to be checked.
            uni_z (UNI): Optional UNI to be checked.
        """
        # No UNIs
        if not (uni_a or uni_z):
            return
        # Nothing to check unless at least one given UNI has no tag
        if not (
            (uni_a and not uni_a.user_tag)
            or (uni_z and not uni_z.user_tag)
        ):
            return

        for circuit in self.circuits.copy().values():
            if circuit.archived or circuit._id == evc_id:
                continue
            for uni in (uni_a, uni_z):
                if uni and uni.user_tag is None:
                    circuit.check_no_tag_duplicate(uni)
765 1
766 1
    @listen_to("kytos/topology.link_up")
767 1
    def on_link_up(self, event):
        """Forward the received link up event to handle_link_up."""
        self.handle_link_up(event)
770
771 1
    def handle_link_up(self, event):
        """Let each enabled, non-archived EVC handle the link going up.

        EVCs are visited in the order given by get_evcs_by_svc_level and
        each one is handled while holding its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
778
779
    # Possibly replace this with interruptions?
780 1
    @listen_to(
781
        '.*.switch.interface.(link_up|link_down|created|deleted)'
782
    )
783
    def on_interface_link_change(self, event: KytosEvent):
        """Forward an interface event to handle_on_interface_link_change.

        Listens to interface link_up, link_down, created and deleted.
        """
        self.handle_on_interface_link_change(event)
788
789
    def handle_on_interface_link_change(self, event: KytosEvent):
        """
        Handler to sort interface events {link_(up, down), created, deleted}

        To avoid multiple database updates (link flap):
        Every interface is identified by its id and has its own lock.
        Once an interface event is received a timer is started.
        While the timer is running self._intf_events keeps being updated
        with the newest event of that interface.
        After the time has passed only the last received event is processed.
        """
        iface = event.content.get("interface")
        intf_id = iface.id
        with self._lock_interfaces[intf_id]:
            # Return out of order events
            if intf_id in self._intf_events:
                recorded = self._intf_events[intf_id]["event"]
                if recorded.timestamp > event.timestamp:
                    return
            state = self._intf_events[intf_id]
            state["event"] = event
            # Another call is already waiting for this interface; it will
            # pick up the event that was just stored
            if "last_acquired" in state:
                return
            state["last_acquired"] = now()

        # Wait outside the lock so newer events can still be recorded
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)

        with self._lock_interfaces[intf_id]:
            state = self._intf_events[intf_id]
            event = state["event"]
            state.pop('last_acquired', None)
            event_type = event.name.rpartition('.')[2]
            if event_type in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)
821
822
    def handle_interface_link_up(self, interface):
        """
        Notify every EVC affected by the interface that it went up.

        An EVC is selected by _does_uni_affect_evc(evc, interface, "up")
        and is handled while holding its own lock.
        """
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            if not _does_uni_affect_evc(evc, interface, "up"):
                continue
            with evc.lock:
                evc.handle_interface_link_up(interface)
833
834
    def handle_interface_link_down(self, interface):
        """
        Notify every EVC affected by the interface that it went down.

        An EVC is selected by _does_uni_affect_evc(evc, interface, "down")
        and is handled while holding its own lock.
        """
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            if not _does_uni_affect_evc(evc, interface, "down"):
                continue
            with evc.lock:
                evc.handle_interface_link_down(interface)
845
846
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
847
    def on_link_down(self, event):
        """Forward the received link down event to handle_link_down."""
        self.handle_link_down(event)
850
851
    def prepare_swap_to_failover_flow(self, evc: EVC):
        """Return the failover flows of an EVC, or {} on any error.

        Any exception raised by evc.get_failover_flows() is logged with
        its traceback and the failover path is ignored.
        """
        try:
            return evc.get_failover_flows()
        except Exception:  # pylint: disable=broad-except
            err = traceback.format_exc()
            log.error(
                "Ignore Failover path for "
                f"{evc} due to error: {err}"
            )
            return {}
864
865
    def prepare_swap_to_failover_event(self, evc: EVC, install_flow):
        """Build the swap to failover event content of an EVC.

        The flows are deep copied before being put in the content.
        """
        flows_copy = deepcopy(install_flow)
        return map_evc_event_content(evc, flows=flows_copy)
871 1
872
    def execute_swap_to_failover(
        self,
        evcs: list[EVC]
    ) -> tuple[list[EVC], list[EVC]]:
        """Install the failover flows in bulk and swap the EVC paths.

        EVCs without failover flows are not swapped. Once the merged flows
        were sent, current_path and failover_path of each remaining EVC
        are exchanged and a "failover_link_down" event is emitted.

        Returns:
            Tuple (swapped EVCs, not swapped EVCs). If FlowModException is
            raised the first list is empty and the second one holds every
            given EVC that was considered.
        """
        swapped_evcs: list[EVC] = []
        not_swapped_evcs: list[EVC] = []
        event_contents = {}
        install_flows = {}

        for evc in evcs:
            evc_flows = self.prepare_swap_to_failover_flow(evc)
            if not evc_flows:
                not_swapped_evcs.append(evc)
                continue
            install_flows = merge_flow_dicts(install_flows, evc_flows)
            event_contents[evc.id] = self.prepare_swap_to_failover_event(
                evc, evc_flows
            )
            swapped_evcs.append(evc)

        try:
            send_flow_mods_http(install_flows, "install")
            for evc in swapped_evcs:
                evc.current_path, evc.failover_path = (
                    evc.failover_path, evc.current_path
                )
            emit_event(
                self.controller,
                "failover_link_down",
                content=deepcopy(event_contents),
            )
        except FlowModException as exc:
            log.error(f"Fail to install failover flows for {evcs}: {exc}")
            return [], swapped_evcs + not_swapped_evcs
        return swapped_evcs, not_swapped_evcs
909
910
    def prepare_clear_failover_flow(self, evc: EVC):
        """Build the flows to be deleted from the failover path of an EVC.

        UNI flows (with skip_in=True) and NNI flows of evc.failover_path
        are merged and handed to prepare_delete_flow. On any exception
        the traceback is logged and an empty dict is returned.
        """
        try:
            uni_flows = evc._prepare_uni_flows(
                evc.failover_path, skip_in=True
            )
            nni_flows = evc._prepare_nni_flows(evc.failover_path)
            return prepare_delete_flow(
                merge_flow_dicts(uni_flows, nni_flows)
            )
        except Exception:  # pylint: disable=broad-except
            err = traceback.format_exc()
            log.error(f"Fail to remove {evc} old_path: {err}")
            return {}
925 1
926
    def prepare_clear_failover_event(self, evc: EVC, delete_flow):
        """Build the clear failover event content of an EVC.

        The content carries the current path as dict and a deep copy of
        the flows being removed.
        """
        current_path = evc.current_path.as_dict()
        removed_flows = deepcopy(delete_flow)
        return map_evc_event_content(
            evc,
            current_path=current_path,
            removed_flows=removed_flows,
        )
933 1
934
    def execute_clear_failover(
        self,
        evcs: list[EVC]
    ) -> tuple[list[EVC], list[EVC]]:
        """Delete the failover flows in bulk and reset the failover paths.

        EVCs without flows to delete are not cleared. Once the merged
        flows were sent, the failover path vlans of each remaining EVC
        are made available, its failover_path is set to an empty Path and
        a "failover_old_path" event is emitted.

        Returns:
            Tuple (cleared EVCs, not cleared EVCs). If FlowModException is
            raised the first list is empty and the second one holds every
            given EVC that was considered.
        """
        cleared_evcs: list[EVC] = []
        not_cleared_evcs: list[EVC] = []
        event_contents = {}
        delete_flows = {}

        for evc in evcs:
            evc_flows = self.prepare_clear_failover_flow(evc)
            if not evc_flows:
                not_cleared_evcs.append(evc)
                continue
            delete_flows = merge_flow_dicts(delete_flows, evc_flows)
            event_contents[evc.id] = self.prepare_clear_failover_event(
                evc, evc_flows
            )
            cleared_evcs.append(evc)

        try:
            send_flow_mods_http(delete_flows, 'delete')
            for evc in cleared_evcs:
                evc.failover_path.make_vlans_available(self.controller)
                evc.failover_path = Path([])
            emit_event(
                self.controller,
                "failover_old_path",
                content=event_contents,
            )
        except FlowModException as exc:
            log.error(f"Failed to delete failover flows for {evcs}: {exc}")
            return [], cleared_evcs + not_cleared_evcs
        return cleared_evcs, not_cleared_evcs
971
972
    def prepare_undeploy_flow(self, evc: EVC):
        """Build the flows to be deleted in order to undeploy an EVC.

        UNI flows (with skip_in=True) and NNI flows of evc.current_path
        plus the NNI flows of evc.failover_path are merged and handed to
        prepare_delete_flow. On any exception the traceback is logged and
        an empty dict is returned.
        """
        try:
            uni_flows = evc._prepare_uni_flows(
                evc.current_path, skip_in=True
            )
            current_nni_flows = evc._prepare_nni_flows(evc.current_path)
            failover_nni_flows = evc._prepare_nni_flows(evc.failover_path)
            return prepare_delete_flow(
                merge_flow_dicts(
                    uni_flows, current_nni_flows, failover_nni_flows
                )
            )
        except Exception:  # pylint: disable=broad-except
            err = traceback.format_exc()
            log.error(f"Fail to undeploy {evc}: {err}")
            return {}
988 1
989 1
    def execute_undeploy(self, evcs: list[EVC]):
        """Delete the flows of the EVCs in bulk and schedule a redeploy.

        EVCs without flows to delete are not undeployed. Once the merged
        flows were sent, each remaining EVC has the vlans of both paths
        made available, both paths reset to an empty Path, is deactivated
        and gets a "need_redeploy" event emitted.

        Returns:
            Tuple (undeployed EVCs, not undeployed EVCs). If
            FlowModException is raised the first list is empty and the
            second one holds every given EVC that was considered.
        """
        undeploy_evcs: list[EVC] = []
        not_undeploy_evcs: list[EVC] = []
        delete_flows = {}

        for evc in evcs:
            evc_flows = self.prepare_undeploy_flow(evc)
            if not evc_flows:
                not_undeploy_evcs.append(evc)
                continue
            delete_flows = merge_flow_dicts(delete_flows, evc_flows)
            undeploy_evcs.append(evc)

        try:
            send_flow_mods_http(delete_flows, 'delete')
            for evc in undeploy_evcs:
                evc.current_path.make_vlans_available(self.controller)
                evc.failover_path.make_vlans_available(self.controller)
                evc.current_path = Path([])
                evc.failover_path = Path([])
                evc.deactivate()
                emit_event(
                    self.controller,
                    "need_redeploy",
                    content={"evc_id": evc.id},
                )
                log.info(f"{evc} scheduled for redeploy")
        except FlowModException as exc:
            log.error(
                f"Failed to delete flows before redeploy for {evcs}: {exc}"
            )
            return [], undeploy_evcs + not_undeploy_evcs
        return undeploy_evcs, not_undeploy_evcs
1027 1
1028 1
    def handle_link_down(self, event):
        """Process a link down event for all EVCs as a single batch.

        While holding multi_evc_lock each EVC is locked and sorted into
        one of three groups:

        - current path affected by the link, failover path set and not
          affected: swap to the failover path;
        - current path affected, failover path missing or also affected:
          undeploy;
        - only the failover path affected: clear the failover path.

        EVCs that fit no group are unlocked right away, the other ones
        stay locked until every group was processed. EVCs that fail a
        step are handed over to the undeploy step. EVCs changed with
        success are then saved with a single update_evcs call.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)

        with ExitStack() as exit_stack:
            exit_stack.enter_context(self.multi_evc_lock)
            swap_to_failover: list[EVC] = []
            undeploy: list[EVC] = []
            clear_failover: list[EVC] = []
            evcs_to_update: dict[str, EVC] = {}

            for evc in self.get_evcs_by_svc_level():
                with ExitStack() as sub_stack:
                    sub_stack.enter_context(evc.lock)
                    affected = evc.is_affected_by_link(link)
                    failover_path = evc.failover_path
                    failover_affected = (
                        evc.is_failover_path_affected_by_link(link)
                    )
                    if affected:
                        if failover_path and not failover_affected:
                            swap_to_failover.append(evc)
                        else:
                            undeploy.append(evc)
                    elif failover_path and failover_affected:
                        clear_failover.append(evc)
                    else:
                        # Not affected: evc.lock is released by sub_stack
                        continue
                    # Hand the held evc.lock over to the outer stack so
                    # it is only released after the batch was processed
                    exit_stack.push(sub_stack.pop_all())

            # Swap from current path to failover path
            if swap_to_failover:
                success, failure = self.execute_swap_to_failover(
                    swap_to_failover
                )
                # The old current path is now the failover one: clear it
                clear_failover.extend(success)
                for evc in success:
                    evcs_to_update[evc.id] = evc
                undeploy.extend(failure)

            # Clear out failover path
            if clear_failover:
                success, failure = self.execute_clear_failover(clear_failover)
                for evc in success:
                    evcs_to_update[evc.id] = evc
                undeploy.extend(failure)

            # Undeploy the evc, schedule a redeploy
            if undeploy:
                success, failure = self.execute_undeploy(undeploy)
                for evc in success:
                    evcs_to_update[evc.id] = evc
                if failure:
                    log.error(f"Failed to handle_link_down for {failure}")

            # Push update to DB
            if evcs_to_update:
                self.mongo_controller.update_evcs(
                    [evc.as_dict() for evc in evcs_to_update.values()]
                )
1104
1105
    @listen_to("kytos/mef_eline.need_redeploy")
1106
    def on_evc_need_redeploy(self, event):
        """Forward a need_redeploy event to handle_evc_need_redeploy."""
        self.handle_evc_need_redeploy(event)
1109
1110
    def handle_evc_need_redeploy(self, event):
        """Deploy again the EVC named by the event, if it still needs it.

        Nothing is done when the EVC is unknown, disabled or already
        active. Otherwise the EVC is deployed under its lock and either
        "redeployed_link_down" or "error_redeploy_link_down" is emitted
        according to the deploy result.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc is None:
            return

        with evc.lock:
            if not (evc.is_enabled() and not evc.is_active()):
                return
            deployed = evc.deploy()

        if deployed:
            log.info(f"{evc} redeployed")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(
            self.controller,
            event_name,
            content=map_evc_event_content(evc),
        )
1125 1
1126 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
1127 1
    def on_evc_affected_by_link_down(self, event):
        """Forward the event to handle_evc_affected_by_link_down."""
        self.handle_evc_affected_by_link_down(event)
1130
1131 1
    def handle_evc_affected_by_link_down(self, event):
        """Let one EVC handle the link down that affects it.

        Nothing is done when the EVC is unknown or is not affected by the
        link anymore. Otherwise evc.handle_link_down() runs under the EVC
        lock and either "redeployed_link_down" or
        "error_redeploy_link_down" is emitted according to its result.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link = content['link']
        if not evc:
            return

        with evc.lock:
            if not evc.is_affected_by_link(link):
                return
            redeployed = evc.handle_link_down()

        if redeployed:
            log.info(f"{evc} redeployed due to link down {link.id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(
            self.controller,
            event_name,
            content=map_evc_event_content(evc),
        )
1147
1148
    @listen_to(
1149
        "kytos/mef_eline.(redeployed_link_(up|down)|deployed|need_failover)"
1150
    )
1151 1
    def on_evc_deployed(self, event):
        """Forward the event to handle_evc_deployed.

        Listens to EVC redeployed_link_(up|down), deployed and
        need_failover.
        """
        self.handle_evc_deployed(event)
1154
1155
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named by the event.

        The failover path is only set up, under the EVC lock, when the
        EVC is eligible for one, is active, has a current path and has
        no failover path yet.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc is None:
            return

        with evc.lock:
            needs_failover = (
                evc.is_eligible_for_failover_path()
                and evc.is_active()
                and not evc.failover_path
                and evc.current_path
            )
            if needs_failover:
                evc.setup_failover_path()
1169 1
1170 1
    @listen_to("kytos/topology.topology_loaded")
1171
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load all EVCs as soon as the topology was loaded."""
        self.load_all_evcs()
1174 1
1175 1
    def load_all_evcs(self):
        """Load every stored EVC that is not in memory yet.

        After that an "evcs_loaded" event is emitted carrying all the
        circuits read from mongodb.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
        emit_event(
            self.controller,
            "evcs_loaded",
            content=dict(stored.items()),
            timeout=1,
        )
1183
1184
    def _load_evc(self, circuit_dict):
1185
        """Load one EVC from mongodb to memory."""
1186 1
        try:
1187
            evc = self._evc_from_dict(circuit_dict)
1188 1
        except (ValueError, KytosTagError) as exception:
1189 1
            log.error(
1190 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1191 1
            )
1192 1
            return None
1193 1
        if evc.archived:
1194 1
            return None
1195 1
1196 1
        self.circuits.setdefault(evc.id, evc)
1197 1
        self.sched.add(evc)
1198
        return evc
1199 1
1200
    @listen_to("kytos/flow_manager.flow.error")
1201
    def on_flow_mod_error(self, event):
        """Forward a flow error event to handle_flow_mod_error."""
        self.handle_flow_mod_error(event)
1204 1
1205 1
    def handle_flow_mod_error(self, event):
        """Remove the flows of the EVC that owns a flow that failed.

        Only errors of the "add" command are handled. The EVC is found
        through the flow cookie and nothing is done when it is unknown,
        archived or disabled. Current and failover flows are removed
        under the EVC lock, syncing only on the last removal.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return

        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not evc or evc.archived or not evc.is_enabled():
            return

        with evc.lock:
            evc.remove_current_flows(sync=False)
            evc.remove_failover_flows(sync=True)
1217 1
1218 1
    def _evc_dict_with_instances(self, evc_dict):
1219
        """Convert some dict values to instance of EVC classes.
1220
1221
        This method will convert: [UNI, Link]
1222
        """
1223
        data = evc_dict.copy()  # Do not modify the original dict
1224
        for attribute, value in data.items():
1225
            # Get multiple attributes.
1226 1
            # Ex: uni_a, uni_z
1227 1
            if "uni" in attribute:
1228
                try:
1229
                    data[attribute] = self._uni_from_dict(value)
1230
                except ValueError as exception:
1231
                    result = "Error creating UNI: Invalid value"
1232
                    raise ValueError(result) from exception
1233
1234 1
            if attribute == "circuit_scheduler":
1235
                data[attribute] = []
1236 1
                for schedule in value:
1237
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1238
1239
            # Get multiple attributes.
1240 1
            # Ex: primary_links,
1241
            #     backup_links,
1242 1
            #     current_links_cache,
1243 1
            #     primary_links_cache,
1244 1
            #     backup_links_cache
1245 1
            if "links" in attribute:
1246
                data[attribute] = [
1247 1
                    self._link_from_dict(link, attribute) for link in value
1248
                ]
1249 1
1250 1
            # Ex: current_path,
1251
            #     primary_path,
1252 1
            #     backup_path
1253 1
            if (attribute.endswith("path") and
1254 1
                    attribute != "dynamic_backup_path"):
1255 1
                data[attribute] = Path(
1256
                    [self._link_from_dict(link, attribute) for link in value]
1257
                )
1258
1259 1
        return data
1260 1
1261 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from a dict, using self.table_group."""
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        evc_kwargs["table_group"] = self.table_group
        return EVC(self.controller, **evc_kwargs)
1265 1
1266 1
    def _uni_from_dict(self, uni_dict):
1267 1
        """Return a UNI object from python dict."""
1268 1
        if uni_dict is None:
1269 1
            return False
1270
1271 1
        interface_id = uni_dict.get("interface_id")
1272
        interface = self.controller.get_interface_by_id(interface_id)
1273 1
        if interface is None:
1274 1
            result = (
1275 1
                "Error creating UNI:"
1276
                + f"Could not instantiate interface {interface_id}"
1277 1
            )
1278
            raise ValueError(result) from ValueError
1279 1
        tag_convert = {1: "vlan"}
1280 1
        tag_dict = uni_dict.get("tag", None)
1281
        if tag_dict:
1282 1
            tag_type = tag_dict.get("tag_type")
1283 1
            tag_type = tag_convert.get(tag_type, tag_type)
1284 1
            tag_value = tag_dict.get("value")
1285 1
            if isinstance(tag_value, list):
1286 1
                tag_value = get_tag_ranges(tag_value)
1287 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1288
                tag = TAGRange(tag_type, tag_value, mask_list)
1289
            else:
1290
                tag = TAG(tag_type, tag_value)
1291 1
        else:
1292 1
            tag = None
1293 1
        uni = UNI(interface, tag)
1294 1
        return uni
1295
1296 1
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
        """Return a Link object from python dict.

        Args:
            link_dict (dict): Link data with "endpoint_a" and "endpoint_b"
                (each one holding an "id") and, optionally, "metadata".
            attribute (str): Name of the EVC attribute the link belongs
                to. The metadata is only applied to the link for
                "current_path" and "failover_path".

        Raises:
            ValueError: if an endpoint interface is not found or if the
                "s_vlan" metadata could not be converted to a TAG.
        """
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")
        get_interface = self.controller.get_interface_by_id
        endpoint_a = get_interface(id_a)
        endpoint_b = get_interface(id_b)
        if not endpoint_a:
            raise ValueError(f"Could not get interface endpoint_a id {id_a}")
        if not endpoint_b:
            raise ValueError(f"Could not get interface endpoint_b id {id_b}")

        link = Link(endpoint_a, endpoint_b)
        with_metadata = attribute in {"current_path", "failover_path"}
        if "metadata" in link_dict and with_metadata:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if not s_vlan:
            return link

        # Replace the "s_vlan" metadata by the TAG built from it
        tag = TAG.from_dict(s_vlan)
        if tag is False:
            raise ValueError(f"Could not instantiate tag from dict {s_vlan}")
        link.update_metadata("s_vlan", tag)
        return link
1323 1
1324 1
    def _find_evc_by_schedule_id(self, schedule_id):
1325 1
        """
1326
        Find an EVC and CircuitSchedule based on schedule_id.
1327 1
1328
        :param schedule_id: Schedule ID
1329
        :return: EVC and Schedule
1330
        """
1331
        circuits = self._get_circuits_buffer()
1332
        found_schedule = None
1333 1
        evc = None
1334
1335 1
        # pylint: disable=unused-variable
1336 1
        for c_id, circuit in circuits.items():
1337 1
            for schedule in circuit.circuit_scheduler:
1338 1
                if schedule.id == schedule_id:
1339 1
                    found_schedule = schedule
1340
                    evc = circuit
1341
                    break
1342 1
            if found_schedule:
1343 1
                break
1344
        return evc, found_schedule
1345 1
1346 1
    def _get_circuits_buffer(self):
1347 1
        """
1348 1
        Return the circuit buffer.
1349 1
1350 1
        If the buffer is empty, try to load data from mongodb.
1351
        """
1352
        if not self.circuits:
1353 1
            # Load circuits from mongodb to buffer
1354 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1355 1
            for c_id, circuit in circuits.items():
1356 1
                evc = self._evc_from_dict(circuit)
1357 1
                self.circuits[c_id] = evc
1358
        return self.circuits
1359
1360
    # pylint: disable=attribute-defined-outside-init
1361
    @alisten_to("kytos/of_multi_table.enable_table")
1362
    async def on_table_enabled(self, event):
        """Update self.table_group from an enable_table event.

        The "mef_eline" entry of the event content is used. Nothing is
        done when it is missing or empty. When any of its groups is not
        in settings.TABLE_GROUP_ALLOWED an error is logged and nothing is
        changed. Otherwise self.table_group is updated and
        "kytos/mef_eline.enable_table" is emitted with the result.
        """
        table_group = event.content.get("mef_eline")
        if not table_group:
            return

        allowed_groups = settings.TABLE_GROUP_ALLOWED
        for group in table_group:
            if group in allowed_groups:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return

        self.table_group.update(table_group)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1377