Test Failed
Pull Request — master (#697)
by
unknown
06:22 queued 01:48
created

build.main.Main.get_circuit()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 2
dl 0
loc 13
ccs 11
cts 11
cp 1
crap 2
rs 9.8
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13 1
from typing import Optional
14
15 1
from pydantic import ValidationError
16
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29
                                              DuplicatedNoTagUNI,
30
                                              FlowModException, InvalidPath)
31 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
32
                                          Path)
33 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
34 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc, aemit_event,
35
                                         emit_event, get_vlan_tags_and_masks,
36
                                         map_evc_event_content,
37
                                         merge_flow_dicts, prepare_delete_flow,
38
                                         send_flow_mods_http)
39
40
41
# pylint: disable=too-many-public-methods
42
class Main(KytosNApp):
43 1
    """Main class of amlight/mef_eline NApp.
44
45
    This class is the entry point for this napp.
46
    """
47
48
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
49 1
50
    def setup(self):
51 1
        """Replace the '__init__' method for the KytosNApp subclass.
52
53
        The setup method is automatically called by the controller when your
54
        application is loaded.
55
56
        So, if you have any setup routine, insert it here.
57
        """
58
        # object used to scheduler circuit events
59
        self.sched = Scheduler()
60 1
61
        # object to save and load circuits
62
        self.mongo_controller = self.get_eline_controller()
63 1
        self.mongo_controller.bootstrap_indexes()
64 1
65
        # set the controller that will manager the dynamic paths
66
        DynamicPathManager.set_controller(self.controller)
67 1
68
        # dictionary of EVCs created. It acts as a circuit buffer.
69
        # Every create/update/delete must be synced to mongodb.
70
        self.circuits = dict[str, EVC]()
71 1
72
        self._intf_events = defaultdict(dict)
73 1
        self._lock_interfaces = defaultdict(Lock)
74 1
        self.table_group = {"epl": 0, "evpl": 0}
75 1
        self._lock = Lock()
76 1
        self.multi_evc_lock = Lock()
77 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
78 1
79
        self.load_all_evcs()
80 1
        self._topology_updated_at = None
81 1
82
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
83 1
        """Get circuits sorted by desc service level and asc creation_time.
84
85
        In the future, as more ops are offloaded it should be get from the DB.
86
        """
87
        if enable_filter:
88 1
            return sorted(
89 1
                          [circuit for circuit in self.circuits.values()
90
                           if circuit.is_enabled()],
91
                          key=lambda x: (-x.service_level, x.creation_time),
92
            )
93
        return sorted(self.circuits.values(),
94 1
                      key=lambda x: (-x.service_level, x.creation_time))
95
96
    @staticmethod
97 1
    def get_eline_controller():
98 1
        """Return the ELineController instance."""
99
        return controllers.ELineController()
100
101
    def execute(self):
102 1
        """Execute once when the napp is running."""
103
        if self._lock.locked():
104 1
            return
105 1
        log.debug("Starting consistency routine")
106 1
        with self._lock:
107 1
            self.execute_consistency()
108 1
        log.debug("Finished consistency routine")
109 1
110
    def should_be_checked(self, circuit):
111 1
        "Verify if the circuit meets the necessary conditions to be checked"
112
        # pylint: disable=too-many-boolean-expressions
113
        if (
114 1
                circuit.is_enabled()
115
                and not circuit.is_active()
116
                and not circuit.lock.locked()
117
                and not circuit.has_recent_removed_flow()
118
                and not circuit.is_recent_updated()
119
                and circuit.are_unis_active()
120
                # if a inter-switch EVC does not have current_path, it does not
121
                # make sense to run sdntrace on it
122
                and (circuit.is_intra_switch() or circuit.current_path)
123
                ):
124
            return True
125 1
        return False
126
127
    def execute_consistency(self):
128 1
        """Execute consistency routine."""
129
        circuits_to_check = []
130 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
131 1
            if self.should_be_checked(circuit):
132 1
                circuits_to_check.append(circuit)
133 1
            circuit.try_setup_failover_path(warn_if_not_path=False)
134 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
135 1
        for circuit in circuits_to_check:
136 1
            is_checked = circuits_checked.get(circuit.id)
137 1
            if is_checked:
138 1
                circuit.execution_rounds = 0
139 1
                log.info(f"{circuit} enabled but inactive - activating")
140 1
                with circuit.lock:
141 1
                    circuit.activate()
142 1
                    circuit.sync()
143 1
            else:
144
                circuit.execution_rounds += 1
145 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
146 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
147 1
                    with circuit.lock:
148 1
                        circuit.deploy()
149 1
150
    def shutdown(self):
151 1
        """Execute when your napp is unloaded.
152
153
        If you have some cleanup procedure, insert it here.
154
        """
155
156
    @rest("/v2/evc/", methods=["GET"])
157 1
    def list_circuits(self, request: Request) -> JSONResponse:
158 1
        """Endpoint to return circuits stored.
159
160
        archive query arg if defined (not null) will be filtered
161
        accordingly, by default only non archived evcs will be listed
162
        """
163
        log.debug("list_circuits /v2/evc")
164 1
        args = request.query_params
165 1
        archived = args.get("archived", "false").lower()
166 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
167 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
168 1
                                                      metadata=args)
169
        circuits = circuits['circuits']
170 1
        return JSONResponse(circuits)
171 1
172
    @rest("/v2/evc/schedule", methods=["GET"])
173 1
    def list_schedules(self, _request: Request) -> JSONResponse:
174 1
        """Endpoint to return all schedules stored for all circuits.
175
176
        Return a JSON with the following template:
177
        [{"schedule_id": <schedule_id>,
178
         "circuit_id": <circuit_id>,
179
         "schedule": <schedule object>}]
180
        """
181
        log.debug("list_schedules /v2/evc/schedule")
182 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
183 1
        if not circuits:
184 1
            result = {}
185 1
            status = 200
186 1
            return JSONResponse(result, status_code=status)
187 1
188
        result = []
189 1
        status = 200
190 1
        for circuit in circuits:
191 1
            circuit_scheduler = circuit.get("circuit_scheduler")
192 1
            if circuit_scheduler:
193 1
                for scheduler in circuit_scheduler:
194 1
                    value = {
195 1
                        "schedule_id": scheduler.get("id"),
196
                        "circuit_id": circuit.get("id"),
197
                        "schedule": scheduler,
198
                    }
199
                    result.append(value)
200 1
201
        log.debug("list_schedules result %s %s", result, status)
202 1
        return JSONResponse(result, status_code=status)
203 1
204
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
205 1
    def get_circuit(self, request: Request) -> JSONResponse:
206 1
        """Endpoint to return a circuit based on id."""
207
        circuit_id = request.path_params["circuit_id"]
208 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
209 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
210 1
        if not circuit:
211 1
            result = f"circuit_id {circuit_id} not found"
212 1
            log.debug("get_circuit result %s %s", result, 404)
213 1
            raise HTTPException(404, detail=result)
214 1
        status = 200
215 1
        log.debug("get_circuit result %s %s", circuit, status)
216 1
        return JSONResponse(circuit, status_code=status)
217 1
218
    # pylint: disable=too-many-branches, too-many-statements
219
    @rest("/v2/evc/", methods=["POST"])
220 1
    @validate_openapi(spec)
221 1
    def create_circuit(self, request: Request) -> JSONResponse:
222 1
        """Try to create a new circuit.
223
224
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
225
        UNI_Z's requested C-VID are available from the interfaces' pools. This
226
        is checked when creating the UNI object.
227
228
        Then, E-Line NApp requests a primary and a backup path to the
229
        Pathfinder NApp using the attributes primary_links and backup_links
230
        submitted via REST
231
232
        # For each link composing paths in #3:
233
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
234
        #  - Using the S-VID obtained, generate abstract flow entries to be
235
        #    sent to FlowManager
236
237
        Push abstract flow entries to FlowManager and FlowManager pushes
238
        OpenFlow entries to datapaths
239
240
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
241
        creation
242
243
        Finnaly, notify user of the status of its request.
244
        """
245
        # Try to create the circuit object
246
        log.debug("create_circuit /v2/evc/")
247 1
        data = get_json_or_400(request, self.controller.loop)
248 1
249
        try:
250 1
            evc = self._evc_from_dict(data)
251 1
        except (ValueError, KytosTagError) as exception:
252 1
            log.debug("create_circuit result %s %s", exception, 400)
253 1
            raise HTTPException(400, detail=str(exception)) from exception
254 1
        if evc.primary_path:
255 1
            try:
256 1
                evc.primary_path.is_valid(
257 1
                    evc.uni_a.interface.switch,
258 1
                    evc.uni_z.interface.switch,
259 1
                    bool(evc.circuit_scheduler),
260
                )
261
            except InvalidPath as exception:
262
                raise HTTPException(
263
                    400,
264 1
                    detail=f"primary_path is not valid: {exception}"
265 1
                ) from exception
266 1
        if evc.backup_path:
267
            try:
268
                evc.backup_path.is_valid(
269
                    evc.uni_a.interface.switch,
270
                    evc.uni_z.interface.switch,
271 1
                    bool(evc.circuit_scheduler),
272 1
                )
273
            except InvalidPath as exception:
274
                raise HTTPException(
275
                    400,
276 1
                    detail=f"backup_path is not valid: {exception}"
277 1
                ) from exception
278 1
279
        if not evc._tag_lists_equal():
280
            detail = "UNI_A and UNI_Z tag lists should be the same."
281
            raise HTTPException(400, detail=detail)
282
283 1
        try:
284 1
            evc._validate_has_primary_or_dynamic()
285
        except ValueError as exception:
286
            raise HTTPException(400, detail=str(exception)) from exception
287
288
        try:
289 1
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
290 1
        except DuplicatedNoTagUNI as exception:
291 1
            log.debug("create_circuit result %s %s", exception, 409)
292
            raise HTTPException(409, detail=str(exception)) from exception
293 1
294 1
        try:
295 1
            self._use_uni_tags(evc)
296 1
        except KytosTagError as exception:
297
            raise HTTPException(400, detail=str(exception)) from exception
298 1
299 1
        # save circuit
300
        try:
301
            evc.sync()
302
        except ValidationError as exception:
303
            raise HTTPException(400, detail=str(exception)) from exception
304 1
305 1
        # store circuit in dictionary
306 1
        self.circuits[evc.id] = evc
307 1
308
        # Schedule the circuit deploy
309
        self.sched.add(evc)
310 1
311 1
        # Circuit has no schedule, deploy now
312
        deployed = False
313
        if not evc.circuit_scheduler:
314
            with evc.lock:
315
                deployed = evc.deploy()
316 1
317
        # Notify users
318
        result = {"circuit_id": evc.id, "deployed": deployed}
319 1
        status = 201
320
        log.debug("create_circuit result %s %s", result, status)
321
        emit_event(self.controller, name="created",
322 1
                   content=map_evc_event_content(evc))
323 1
        return JSONResponse(result, status_code=status)
324 1
325 1
    @staticmethod
326
    def _use_uni_tags(evc):
327
        uni_a = evc.uni_a
328 1
        evc._use_uni_vlan(uni_a)
329 1
        try:
330 1
            uni_z = evc.uni_z
331 1
            evc._use_uni_vlan(uni_z)
332
        except KytosTagError as err:
333 1
            evc.make_uni_vlan_available(uni_a)
334
            raise err
335 1
336 1
    @listen_to('kytos/flow_manager.flow.removed')
337 1
    def on_flow_delete(self, event):
338 1
        """Capture delete messages to keep track when flows got removed."""
339 1
        self.handle_flow_delete(event)
340 1
341 1
    def handle_flow_delete(self, event):
342 1
        """Keep track when the EVC got flows removed by deriving its cookie."""
343 1
        flow = event.content["flow"]
344 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
345
        if evc:
346 1
            log.debug("Flow removed in EVC %s", evc.id)
347 1
            evc.set_flow_removed_at()
348
349
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
350
    @validate_openapi(spec)
351 1
    def update(self, request: Request) -> JSONResponse:
352
        """Update a circuit based on payload.
353 1
354 1
        The EVC attributes (creation_time, active, current_path,
355 1
        failover_path, _id, archived) can't be updated.
356 1
        """
357 1
        data = get_json_or_400(request, self.controller.loop)
358
        circuit_id = request.path_params["circuit_id"]
359 1
        log.debug("update /v2/evc/%s", circuit_id)
360 1
        try:
361 1
            evc = self.circuits[circuit_id]
362
        except KeyError:
363
            result = f"circuit_id {circuit_id} not found"
364
            log.debug("update result %s %s", result, 404)
365
            raise HTTPException(404, detail=result) from KeyError
366
367 1
        try:
368 1
            updated_data = self._evc_dict_with_instances(data)
369 1
            self._check_no_tag_duplication(
370 1
                circuit_id, updated_data.get("uni_a"),
371 1
                updated_data.get("uni_z")
372 1
            )
373 1
            enable, redeploy = evc.update(**updated_data)
374 1
        except (ValueError, KytosTagError, ValidationError) as exception:
375 1
            log.debug("update result %s %s", exception, 400)
376
            raise HTTPException(400, detail=str(exception)) from exception
377 1
        except DuplicatedNoTagUNI as exception:
378 1
            log.debug("update result %s %s", exception, 409)
379 1
            raise HTTPException(409, detail=str(exception)) from exception
380
        except DisabledSwitch as exception:
381
            log.debug("update result %s %s", exception, 409)
382
            raise HTTPException(
383 1
                    409,
384 1
                    detail=f"Path is not valid: {exception}"
385 1
                ) from exception
386 1
        redeployed = False
387 1
        if evc.is_active():
388
            if enable is False:  # disable if active
389
                with evc.lock:
390 1
                    evc.remove()
391 1
            elif redeploy is not None:  # redeploy if active
392 1
                with evc.lock:
393
                    evc.remove()
394
                    redeployed = evc.deploy()
395
        else:
396 1
            if enable is True:  # enable if inactive
397 1
                with evc.lock:
398
                    redeployed = evc.deploy()
399
            elif evc.is_enabled() and redeploy:
400
                with evc.lock:
401
                    evc.remove()
402
                    redeployed = evc.deploy()
403
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
404
        status = 200
405
406 1
        log.debug("update result %s %s", result, status)
407 1
        emit_event(self.controller, "updated",
408 1
                   content=map_evc_event_content(evc, **data))
409 1
        return JSONResponse(result, status_code=status)
410 1
411 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
412 1
    def delete_circuit(self, request: Request) -> JSONResponse:
413 1
        """Remove a circuit.
414 1
415
        First, the flows are removed from the switches, and then the EVC is
416 1
        disabled.
417 1
        """
418
        circuit_id = request.path_params["circuit_id"]
419 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
420
        try:
421 1
            evc = self.circuits.pop(circuit_id)
422 1
        except KeyError:
423
            result = f"circuit_id {circuit_id} not found"
424
            log.debug("delete_circuit result %s %s", result, 404)
425
            raise HTTPException(404, detail=result) from KeyError
426
        log.info("Removing %s", evc)
427
428 1
        with evc.lock:
429 1
            if not evc.archived:
430 1
                evc.deactivate()
431 1
                evc.disable()
432 1
                self.sched.remove(evc)
433 1
                evc.remove_current_flows(sync=False)
434 1
                evc.remove_failover_flows(sync=False)
435 1
                evc.archive()
436 1
                evc.remove_uni_tags()
437
                evc.sync()
438 1
                emit_event(
439 1
                    self.controller, "deleted",
440 1
                    content=map_evc_event_content(evc)
441 1
                )
442 1
443 1
        log.info("EVC removed. %s", evc)
444 1
        result = {"response": f"Circuit {circuit_id} removed"}
445 1
        status = 200
446 1
        log.debug("delete_circuit result %s %s", result, status)
447 1
448 1
        return JSONResponse(result, status_code=status)
449
450
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
451
    def get_metadata(self, request: Request) -> JSONResponse:
452
        """Get metadata from an EVC."""
453 1
        circuit_id = request.path_params["circuit_id"]
454 1
        try:
455 1
            return (
456 1
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
457
            )
458 1
        except KeyError as error:
459
            raise HTTPException(
460 1
                404,
461 1
                detail=f"circuit_id {circuit_id} not found."
462
            ) from error
463 1
464 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
465 1
    @validate_openapi(spec)
466
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
467
        """Add metadata to a bulk of EVCs."""
468
        data = get_json_or_400(request, self.controller.loop)
469
        circuit_ids = data.pop("circuit_ids")
470
471
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
472
473
        fail_evcs = []
474 1
        for _id in circuit_ids:
475 1
            try:
476 1
                evc = self.circuits[_id]
477
                evc.extend_metadata(data)
478 1
            except KeyError:
479 1
                fail_evcs.append(_id)
480
481 1
        if fail_evcs:
482
            raise HTTPException(404, detail=fail_evcs)
483 1
        return JSONResponse("Operation successful", status_code=201)
484 1
485 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
486 1
    @validate_openapi(spec)
487 1
    def add_metadata(self, request: Request) -> JSONResponse:
488 1
        """Add metadata to an EVC."""
489 1
        circuit_id = request.path_params["circuit_id"]
490
        metadata = get_json_or_400(request, self.controller.loop)
491 1
        if not isinstance(metadata, dict):
492 1
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
493 1
        try:
494
            evc = self.circuits[circuit_id]
495 1
        except KeyError as error:
496 1
            raise HTTPException(
497 1
                404,
498
                detail=f"circuit_id {circuit_id} not found."
499 1
            ) from error
500 1
501 1
        evc.extend_metadata(metadata)
502
        evc.sync()
503 1
        return JSONResponse("Operation successful", status_code=201)
504 1
505 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
506 1
    @validate_openapi(spec)
507
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
508
        """Delete metada from a bulk of EVCs"""
509
        data = get_json_or_400(request, self.controller.loop)
510
        key = request.path_params["key"]
511 1
        circuit_ids = data.pop("circuit_ids")
512 1
        self.mongo_controller.update_evcs_metadata(
513 1
            circuit_ids, {key: ""}, "del"
514
        )
515 1
516 1
        fail_evcs = []
517 1
        for _id in circuit_ids:
518
            try:
519 1
                evc = self.circuits[_id]
520 1
                evc.remove_metadata(key)
521 1
            except KeyError:
522 1
                fail_evcs.append(_id)
523
524
        if fail_evcs:
525
            raise HTTPException(404, detail=fail_evcs)
526 1
        return JSONResponse("Operation successful")
527 1
528 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
529 1
    def delete_metadata(self, request: Request) -> JSONResponse:
530 1
        """Delete metadata from an EVC."""
531 1
        circuit_id = request.path_params["circuit_id"]
532 1
        key = request.path_params["key"]
533
        try:
534 1
            evc = self.circuits[circuit_id]
535 1
        except KeyError as error:
536 1
            raise HTTPException(
537
                404,
538 1
                detail=f"circuit_id {circuit_id} not found."
539 1
            ) from error
540
541 1
        evc.remove_metadata(key)
542 1
        evc.sync()
543 1
        return JSONResponse("Operation successful")
544 1
545 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
546 1
    def redeploy(self, request: Request) -> JSONResponse:
547
        """Endpoint to force the redeployment of an EVC."""
548
        circuit_id = request.path_params["circuit_id"]
549
        try_avoid_same_s_vlan = request.query_params.get(
550
            "try_avoid_same_s_vlan", "true"
551 1
        )
552 1
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
553 1
        if try_avoid_same_s_vlan not in {"true", "false"}:
554
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
555 1
            raise HTTPException(400, detail=msg)
556 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
557
        try:
558 1
            evc = self.circuits[circuit_id]
559 1
        except KeyError:
560
            raise HTTPException(
561
                404,
562 1
                detail=f"circuit_id {circuit_id} not found"
563 1
            ) from KeyError
564
        deployed = False
565
        if evc.is_enabled():
566 1
            with evc.lock:
567 1
                path_dict = evc.remove_current_flows(
568 1
                    sync=False,
569 1
                    return_path=try_avoid_same_s_vlan == "true"
570 1
                )
571
                evc.remove_failover_flows(sync=True)
572
                deployed = evc.deploy(path_dict)
573
        if deployed:
574 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
575 1
            status = 202
576 1
        else:
577 1
            result = {
578
                "response": f"Circuit {circuit_id} is disabled."
579
            }
580
            status = 409
581 1
582 1
        return JSONResponse(result, status_code=status)
583 1
584 1
    @rest("/v2/evc/schedule/", methods=["POST"])
585 1
    @validate_openapi(spec)
586
    def create_schedule(self, request: Request) -> JSONResponse:
587 1
        """
588
        Create a new schedule for a given circuit.
589
590 1
        This service do no check if there are conflicts with another schedule.
591
        Payload example:
592 1
            {
593
              "circuit_id":"aa:bb:cc",
594 1
              "schedule": {
595 1
                "date": "2019-08-07T14:52:10.967Z",
596 1
                "interval": "string",
597
                "frequency": "1 * * * *",
598
                "action": "create"
599
              }
600
            }
601
        """
602
        log.debug("create_schedule /v2/evc/schedule/")
603
        data = get_json_or_400(request, self.controller.loop)
604
        circuit_id = data["circuit_id"]
605
        schedule_data = data["schedule"]
606
607
        # Get EVC from circuits buffer
608
        circuits = self._get_circuits_buffer()
609
610
        # get the circuit
611
        evc = circuits.get(circuit_id)
612 1
613 1
        # get the circuit
614 1
        if not evc:
615 1
            result = f"circuit_id {circuit_id} not found"
616
            log.debug("create_schedule result %s %s", result, 404)
617
            raise HTTPException(404, detail=result)
618 1
619
        # new schedule from dict
620
        new_schedule = CircuitSchedule.from_dict(schedule_data)
621 1
622
        # If there is no schedule, create the list
623
        if not evc.circuit_scheduler:
624 1
            evc.circuit_scheduler = []
625 1
626 1
        # Add the new schedule
627 1
        evc.circuit_scheduler.append(new_schedule)
628
629
        # Add schedule job
630 1
        self.sched.add_circuit_job(evc, new_schedule)
631
632
        # save circuit to mongodb
633 1
        evc.sync()
634 1
635
        result = new_schedule.as_dict()
636
        status = 201
637 1
638
        log.debug("create_schedule result %s %s", result, status)
639
        return JSONResponse(result, status_code=status)
640 1
641
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
642
    @validate_openapi(spec)
643 1
    def update_schedule(self, request: Request) -> JSONResponse:
644
        """Update a schedule.
645 1
646 1
        Change all attributes from the given schedule from a EVC circuit.
647
        The schedule ID is preserved as default.
648 1
        Payload example:
649 1
            {
650
              "date": "2019-08-07T14:52:10.967Z",
651 1
              "interval": "string",
652 1
              "frequency": "1 * * *",
653 1
              "action": "create"
654
            }
655
        """
656
        data = get_json_or_400(request, self.controller.loop)
657
        schedule_id = request.path_params["schedule_id"]
658
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
659
660
        # Try to find a circuit schedule
661
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
662
663
        # Can not modify circuits deleted and archived
664
        if not found_schedule:
665
            result = f"schedule_id {schedule_id} not found"
666 1
            log.debug("update_schedule result %s %s", result, 404)
667 1
            raise HTTPException(404, detail=result)
668 1
669
        new_schedule = CircuitSchedule.from_dict(data)
670
        new_schedule.id = found_schedule.id
671 1
        # Remove the old schedule
672
        evc.circuit_scheduler.remove(found_schedule)
673
        # Append the modified schedule
674 1
        evc.circuit_scheduler.append(new_schedule)
675 1
676 1
        # Cancel all schedule jobs
677 1
        self.sched.cancel_job(found_schedule.id)
678
        # Add the new circuit schedule
679 1
        self.sched.add_circuit_job(evc, new_schedule)
680 1
        # Save EVC to mongodb
681
        evc.sync()
682 1
683
        result = new_schedule.as_dict()
684 1
        status = 200
685
686
        log.debug("update_schedule result %s %s", result, status)
687 1
        return JSONResponse(result, status_code=status)
688
689 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
690
    def delete_schedule(self, request: Request) -> JSONResponse:
691 1
        """Remove a circuit schedule.
692
693 1
        Remove the Schedule from EVC.
694 1
        Remove the Schedule from cron job.
695
        Save the EVC to the Storehouse.
696 1
        """
697 1
        schedule_id = request.path_params["schedule_id"]
698
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
699 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
700 1
701
        # Can not modify circuits deleted and archived
702
        if not found_schedule:
703
            result = f"schedule_id {schedule_id} not found"
704
            log.debug("delete_schedule result %s %s", result, 404)
705
            raise HTTPException(404, detail=result)
706
707 1
        # Remove the old schedule
708 1
        evc.circuit_scheduler.remove(found_schedule)
709 1
710
        # Cancel all schedule jobs
711
        self.sched.cancel_job(found_schedule.id)
712 1
        # Save EVC to mongodb
713 1
        evc.sync()
714 1
715 1
        result = "Schedule removed"
716
        status = 200
717
718 1
        log.debug("delete_schedule result %s %s", result, status)
719
        return JSONResponse(result, status_code=status)
720
721 1
    def _check_no_tag_duplication(
722
        self,
723 1
        evc_id: str,
724
        uni_a: Optional[UNI] = None,
725 1
        uni_z: Optional[UNI] = None
726 1
    ):
727
        """Check if the given EVC has UNIs with no tag and if these are
728 1
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
729 1
        Args:
730
            evc (dict): EVC to be analyzed.
731 1
        """
732
733
        # No UNIs
734
        if not (uni_a or uni_z):
735
            return
736
737
        if (not (uni_a and not uni_a.user_tag) and
738
                not (uni_z and not uni_z.user_tag)):
739
            return
740
        for circuit in self.circuits.copy().values():
741
            if (not circuit.archived and circuit._id != evc_id):
742
                if uni_a and uni_a.user_tag is None:
743
                    circuit.check_no_tag_duplicate(uni_a)
744 1
                if uni_z and uni_z.user_tag is None:
745 1
                    circuit.check_no_tag_duplicate(uni_z)
746
747 1
    @listen_to("kytos/topology.link_up")
748
    def on_link_up(self, event):
749 1
        """Change circuit when link is up or end_maintenance."""
750 1
        self.handle_link_up(event)
751 1
752 1
    def handle_link_up(self, event):
753 1
        """Change circuit when link is up or end_maintenance."""
754 1
        log.info("Event handle_link_up %s", event.content["link"])
755 1
        for evc in self.get_evcs_by_svc_level():
756
            if evc.is_enabled() and not evc.archived:
757 1
                with evc.lock:
758 1
                    evc.handle_link_up(event.content["link"])
759
760
    # Possibly replace this with interruptions?
761
    @listen_to(
762 1
        '.*.switch.interface.(link_up|link_down|created|deleted)'
763
    )
764 1
    def on_interface_link_change(self, event: KytosEvent):
765 1
        """
766 1
        Handler for interface link_up and link_down events.
767 1
        """
768 1
        self.handle_on_interface_link_change(event)
769
770
    def handle_on_interface_link_change(self, event: KytosEvent):
771 1
        """
772
        Handler to sort interface events {link_(up, down), create, deleted}
773
774 1
        To avoid multiple database updated (link flap):
775
        Every interface is identfied and processed in parallel.
776
        Once an interface event is received a time is started.
777
        While time is running self._intf_events will be updated.
778
        After time has passed last received event will be processed.
779
        """
780 1
        iface = event.content.get("interface")
781
        with self._lock_interfaces[iface.id]:
782
            _now = event.timestamp
783
            # Return out of order events
784
            if (
785
                iface.id in self._intf_events
786
                and self._intf_events[iface.id]["event"].timestamp > _now
787
            ):
788
                return
789
            self._intf_events[iface.id].update({"event": event})
790 1
            if "last_acquired" in self._intf_events[iface.id]:
791 1
                return
792 1
            self._intf_events[iface.id].update({"last_acquired": now()})
793
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
794 1
        with self._lock_interfaces[iface.id]:
795
            event = self._intf_events[iface.id]["event"]
796
            self._intf_events[iface.id].pop('last_acquired', None)
797
            _, _, event_type = event.name.rpartition('.')
798 1
            if event_type in ('link_up', 'created'):
799 1
                self.handle_interface_link_up(iface)
800 1
            elif event_type in ('link_down', 'deleted'):
801 1
                self.handle_interface_link_down(iface)
802 1
803 1
    def handle_interface_link_up(self, interface):
804 1
        """
805 1
        Handler for interface link_up events
806 1
        """
807 1
        log.info("Event handle_interface_link_up %s", interface)
808 1
        for evc in self.get_evcs_by_svc_level():
809 1
            if _does_uni_affect_evc(evc, interface, "up"):
810 1
                with evc.lock:
811 1
                    evc.handle_interface_link_up(
812
                        interface
813 1
                    )
814
815
    def handle_interface_link_down(self, interface):
816
        """
817
        Handler for interface link_down events
818
        """
819
        log.info("Event handle_interface_link_down %s", interface)
820
        for evc in self.get_evcs_by_svc_level():
821
            if _does_uni_affect_evc(evc, interface, "down"):
822
                with evc.lock:
823
                    evc.handle_interface_link_down(
824
                        interface
825 1
                    )
826
827
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
828
    def on_link_down(self, event):
829
        """Change circuit when link is down or under_mantenance."""
830
        self.handle_link_down(event)
831
832
    def prepare_swap_to_failover_flow(self, evc: EVC):
833
        """Prepare an evc for switching to failover."""
834
        install_flows = {}
835
        try:
836
            install_flows = evc.get_failover_flows()
837 1
        # pylint: disable=broad-except
838 1
        except Exception:
839
            err = traceback.format_exc()
840
            log.error(
841
                "Ignore Failover path for "
842 1
                f"{evc} due to error: {err}"
843
            )
844
        return install_flows
845
846
    def prepare_swap_to_failover_event(self, evc: EVC, install_flow):
847
        """Prepare event contents for swap to failover."""
848
        return map_evc_event_content(
849
            evc,
850
            flows=deepcopy(install_flow)
851
        )
852
853
    def execute_swap_to_failover(
854
        self,
855
        evcs: list[EVC]
856 1
    ) -> tuple[list[EVC], list[EVC]]:
857
        """Process changes needed to commit a swap to failover."""
858
        event_contents = {}
859
        install_flows = {}
860
        swapped_evcs = list[EVC]()
861
        not_swapped_evcs = list[EVC]()
862
863 1
        for evc in evcs:
864
            install_flow = self.prepare_swap_to_failover_flow(evc)
865
            if install_flow:
866
                install_flows = merge_flow_dicts(install_flows, install_flow)
867
                event_contents[evc.id] =\
868 1
                    self.prepare_swap_to_failover_event(evc, install_flow)
869 1
                swapped_evcs.append(evc)
870 1
            else:
871 1
                not_swapped_evcs.append(evc)
872
873 1
        try:
874 1
            send_flow_mods_http(
875 1
                install_flows,
876 1
                "install"
877 1
            )
878
            for evc in swapped_evcs:
879 1
                temp_path = evc.current_path
880
                evc.current_path = evc.failover_path
881 1
                evc.failover_path = temp_path
882
            emit_event(
883 1
                self.controller, "failover_link_down",
884 1
                content=deepcopy(event_contents)
885
            )
886
            return swapped_evcs, not_swapped_evcs
887
        except FlowModException as exc:
888 1
            log.error(f"Fail to install failover flows for {evcs}: {exc}")
889 1
            return [], [*swapped_evcs, *not_swapped_evcs]
890 1
891 1
    def prepare_clear_failover_flow(self, evc: EVC):
892 1
        """Prepare an evc for clearing the old path."""
893
        del_flows = {}
894
        try:
895
            del_flows = prepare_delete_flow(
896 1
                merge_flow_dicts(
897 1
                    evc._prepare_uni_flows(evc.failover_path, skip_in=True),
898 1
                    evc._prepare_nni_flows(evc.failover_path)
899 1
                )
900
            )
901 1
        # pylint: disable=broad-except
902
        except Exception:
903
            err = traceback.format_exc()
904
            log.error(f"Fail to remove {evc} old_path: {err}")
905
        return del_flows
906
907
    def prepare_clear_failover_event(self, evc: EVC, delete_flow):
908
        """Prepare event contents for clearing failover."""
909
        return map_evc_event_content(
910
            evc,
911
            current_path=evc.current_path.as_dict(),
912
            removed_flows=deepcopy(delete_flow)
913
        )
914
915
    def execute_clear_failover(
916
        self,
917 1
        evcs: list[EVC]
918
    ) -> tuple[list[EVC], list[EVC]]:
919
        """Process changes needed to commit clearing the failover path"""
920
        event_contents = {}
921
        delete_flows = {}
922
        cleared_evcs = list[EVC]()
923
        not_cleared_evcs = list[EVC]()
924
925 1
        for evc in evcs:
926
            delete_flow = self.prepare_clear_failover_flow(evc)
927
            if delete_flow:
928
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
929
                event_contents[evc.id] =\
930 1
                    self.prepare_clear_failover_event(evc, delete_flow)
931 1
                cleared_evcs.append(evc)
932 1
            else:
933 1
                not_cleared_evcs.append(evc)
934
935 1
        try:
936 1
            send_flow_mods_http(
937 1
                delete_flows,
938 1
                'delete'
939 1
            )
940
            for evc in cleared_evcs:
941 1
                evc.failover_path.make_vlans_available(self.controller)
942
                evc.failover_path = Path([])
943 1
            emit_event(
944
                self.controller,
945 1
                "failover_old_path",
946 1
                content=event_contents
947
            )
948
            return cleared_evcs, not_cleared_evcs
949
        except FlowModException as exc:
950 1
            log.error(f"Failed to delete failover flows for {evcs}: {exc}")
951 1
            return [], [*cleared_evcs, *not_cleared_evcs]
952 1
953 1
    def prepare_undeploy_flow(self, evc: EVC):
954
        """Prepare an evc for undeploying."""
955
        del_flows = {}
956
        try:
957
            del_flows = prepare_delete_flow(
958 1
                merge_flow_dicts(
959 1
                    evc._prepare_uni_flows(evc.current_path, skip_in=True),
960 1
                    evc._prepare_nni_flows(evc.current_path),
961 1
                    evc._prepare_nni_flows(evc.failover_path)
962
                )
963 1
            )
964
        # pylint: disable=broad-except
965
        except Exception:
966
            err = traceback.format_exc()
967
            log.error(f"Fail to undeploy {evc}: {err}")
968
        return del_flows
969
970
    def execute_undeploy(self, evcs: list[EVC]):
971
        """Process changes needed to commit an undeploy"""
972
        delete_flows = {}
973
        undeploy_evcs = list[EVC]()
974
        not_undeploy_evcs = list[EVC]()
975
976
        for evc in evcs:
977
            delete_flow = self.prepare_undeploy_flow(evc)
978
            if delete_flow:
979
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
980 1
                undeploy_evcs.append(evc)
981
            else:
982 1
                not_undeploy_evcs.append(evc)
983 1
984 1
        try:
985
            send_flow_mods_http(
986 1
                delete_flows,
987 1
                'delete'
988 1
            )
989 1
990 1
            for evc in undeploy_evcs:
991
                evc.current_path.make_vlans_available(self.controller)
992 1
                evc.failover_path.make_vlans_available(self.controller)
993
                evc.current_path = Path([])
994 1
                evc.failover_path = Path([])
995 1
                evc.deactivate()
996
                emit_event(
997
                    self.controller,
998
                    "need_redeploy",
999
                    content={"evc_id": evc.id}
1000 1
                )
1001 1
                log.info(f"{evc} scheduled for redeploy")
1002 1
            return undeploy_evcs, not_undeploy_evcs
1003 1
        except FlowModException as exc:
1004 1
            log.error(
1005 1
                f"Failed to delete flows before redeploy for {evcs}: {exc}"
1006 1
            )
1007
            return [], [*undeploy_evcs, *not_undeploy_evcs]
1008
1009
    def handle_link_down(self, event):
1010
        """Change circuit when link is down or under_mantenance."""
1011 1
        link = event.content["link"]
1012 1
        log.info("Event handle_link_down %s", link)
1013 1
1014 1
        with ExitStack() as exit_stack:
1015
            exit_stack.enter_context(self.multi_evc_lock)
1016
            swap_to_failover = list[EVC]()
1017 1
            undeploy = list[EVC]()
1018
            clear_failover = list[EVC]()
1019 1
            evcs_to_update = dict[str, EVC]()
1020
1021 1
            for evc in self.get_evcs_by_svc_level():
1022 1
                with ExitStack() as sub_stack:
1023
                    sub_stack.enter_context(evc.lock)
1024 1
                    if all((
1025 1
                        evc.is_affected_by_link(link),
1026 1
                        evc.failover_path,
1027 1
                        not evc.is_failover_path_affected_by_link(link)
1028 1
                    )):
1029 1
                        swap_to_failover.append(evc)
1030
                    elif all((
1031 1
                        evc.is_affected_by_link(link),
1032 1
                        not evc.failover_path or
1033 1
                        evc.is_failover_path_affected_by_link(link)
1034 1
                    )):
1035
                        undeploy.append(evc)
1036
                    elif all((
1037
                        not evc.is_affected_by_link(link),
1038
                        evc.failover_path,
1039 1
                        evc.is_failover_path_affected_by_link(link)
1040 1
                    )):
1041
                        clear_failover.append(evc)
1042
                    else:
1043
                        continue
1044
1045 1
                    exit_stack.push(sub_stack.pop_all())
1046 1
1047
            # Swap from current path to failover path
1048
1049
            if swap_to_failover:
1050
                success, failure = self.execute_swap_to_failover(
1051 1
                    swap_to_failover
1052
                )
1053
1054
                clear_failover.extend(success)
1055 1
1056
                evcs_to_update.update((evc.id, evc) for evc in success)
1057
1058
                undeploy.extend(failure)
1059 1
1060 1
            # Clear out failover path
1061
1062
            if clear_failover:
1063
                success, failure = self.execute_clear_failover(clear_failover)
1064 1
1065
                evcs_to_update.update((evc.id, evc) for evc in success)
1066 1
1067
                undeploy.extend(failure)
1068 1
1069
            # Undeploy the evc, schedule a redeploy
1070
1071
            if undeploy:
1072 1
                success, failure = self.execute_undeploy(undeploy)
1073 1
1074
                evcs_to_update.update((evc.id, evc) for evc in success)
1075 1
1076
                if failure:
1077 1
                    log.error(f"Failed to handle_link_down for {failure}")
1078
1079
            # Push update to DB
1080
1081 1
            if evcs_to_update:
1082 1
                self.mongo_controller.update_evcs(
1083
                    [evc.as_dict() for evc in evcs_to_update.values()]
1084 1
                )
1085
1086 1
    @listen_to("kytos/mef_eline.need_redeploy")
1087 1
    def on_evc_need_redeploy(self, event):
1088
        """Redeploy evcs that need to be redeployed."""
1089
        self.handle_evc_need_redeploy(event)
1090
1091 1
    def handle_evc_need_redeploy(self, event):
1092 1
        """Redeploy evcs that need to be redeployed."""
1093
        evc = self.circuits.get(event.content["evc_id"])
1094
        if evc is None:
1095
            return
1096 1
        with evc.lock:
1097 1
            if not evc.is_enabled() or evc.is_active():
1098
                return
1099
            result = evc.deploy()
1100
        event_name = "error_redeploy_link_down"
1101 1
        if result:
1102
            log.info(f"{evc} redeployed")
1103
            event_name = "redeployed_link_down"
1104
        emit_event(self.controller, event_name,
1105
                   content=map_evc_event_content(evc))
1106
1107
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
1108
    def on_evc_affected_by_link_down(self, event):
1109
        """Change circuit when link is down or under_mantenance."""
1110
        self.handle_evc_affected_by_link_down(event)
1111
1112
    def handle_evc_affected_by_link_down(self, event):
1113
        """Change circuit when link is down or under_mantenance."""
1114
        evc = self.circuits.get(event.content["evc_id"])
1115
        link = event.content['link']
1116
        if not evc:
1117 1
            return
1118 1
        with evc.lock:
1119
            if not evc.is_affected_by_link(link):
1120
                return
1121
            result = evc.handle_link_down()
1122 1
        event_name = "error_redeploy_link_down"
1123
        if result:
1124 1
            log.info(f"{evc} redeployed due to link down {link.id}")
1125 1
            event_name = "redeployed_link_down"
1126 1
        emit_event(self.controller, event_name,
1127 1
                   content=map_evc_event_content(evc))
1128 1
1129 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
1130
    def on_evc_deployed(self, event):
1131 1
        """Handle EVC deployed|redeployed_link_down."""
1132 1
        self.handle_evc_deployed(event)
1133 1
1134 1
    def handle_evc_deployed(self, event):
1135 1
        """Setup failover path on evc deployed."""
1136 1
        evc = self.circuits.get(event.content["evc_id"])
1137
        if not evc:
1138
            return
1139 1
        evc.try_setup_failover_path()
1140 1
1141
    @listen_to("kytos/topology.topology_loaded")
1142
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
1143
        """Load EVCs once the topology is available."""
1144 1
        self.load_all_evcs()
1145
1146
    def load_all_evcs(self):
1147
        """Try to load all EVCs on startup."""
1148
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
1149
        for circuit_id, circuit in circuits:
1150
            if circuit_id not in self.circuits:
1151 1
                self._load_evc(circuit)
1152 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
1153
                   timeout=1)
1154
1155
    def _load_evc(self, circuit_dict):
1156 1
        """Load one EVC from mongodb to memory."""
1157
        try:
1158 1
            evc = self._evc_from_dict(circuit_dict)
1159 1
        except (ValueError, KytosTagError) as exception:
1160 1
            log.error(
1161 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1162 1
            )
1163
            return None
1164
        if evc.archived:
1165 1
            return None
1166
1167 1
        self.circuits.setdefault(evc.id, evc)
1168 1
        self.sched.add(evc)
1169 1
        return evc
1170 1
1171
    @listen_to("kytos/flow_manager.flow.error")
1172
    def on_flow_mod_error(self, event):
1173 1
        """Handle flow mod errors related to an EVC."""
1174 1
        self.handle_flow_mod_error(event)
1175 1
1176
    def handle_flow_mod_error(self, event):
1177 1
        """Handle flow mod errors related to an EVC."""
1178 1
        flow = event.content["flow"]
1179 1
        command = event.content.get("error_command")
1180
        if command != "add":
1181 1
            return
1182 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1183
        if not evc or evc.archived or not evc.is_enabled():
1184
            return
1185
        with evc.lock:
1186 1
            evc.remove_current_flows(sync=False)
1187
            evc.remove_failover_flows(sync=True)
1188 1
1189 1
    def _evc_dict_with_instances(self, evc_dict):
1190 1
        """Convert some dict values to instance of EVC classes.
1191 1
1192 1
        This method will convert: [UNI, Link]
1193 1
        """
1194 1
        data = evc_dict.copy()  # Do not modify the original dict
1195 1
        for attribute, value in data.items():
1196 1
            # Get multiple attributes.
1197 1
            # Ex: uni_a, uni_z
1198
            if "uni" in attribute:
1199 1
                try:
1200
                    data[attribute] = self._uni_from_dict(value)
1201
                except ValueError as exception:
1202
                    result = "Error creating UNI: Invalid value"
1203
                    raise ValueError(result) from exception
1204 1
1205 1
            if attribute == "circuit_scheduler":
1206
                data[attribute] = []
1207
                for schedule in value:
1208 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1209 1
1210 1
            # Get multiple attributes.
1211 1
            # Ex: primary_links,
1212 1
            #     backup_links,
1213 1
            #     current_links_cache,
1214
            #     primary_links_cache,
1215 1
            #     backup_links_cache
1216 1
            if "links" in attribute:
1217 1
                data[attribute] = [
1218 1
                    self._link_from_dict(link, attribute) for link in value
1219
                ]
1220
1221
            # Ex: current_path,
1222
            #     primary_path,
1223
            #     backup_path
1224
            if (attribute.endswith("path") and
1225
                    attribute != "dynamic_backup_path"):
1226 1
                data[attribute] = Path(
1227 1
                    [self._link_from_dict(link, attribute) for link in value]
1228
                )
1229
1230
        return data
1231
1232
    def _evc_from_dict(self, evc_dict):
1233
        data = self._evc_dict_with_instances(evc_dict)
1234 1
        data["table_group"] = self.table_group
1235
        return EVC(self.controller, **data)
1236 1
1237
    def _uni_from_dict(self, uni_dict):
1238
        """Return a UNI object from python dict."""
1239
        if uni_dict is None:
1240 1
            return False
1241
1242 1
        interface_id = uni_dict.get("interface_id")
1243 1
        interface = self.controller.get_interface_by_id(interface_id)
1244 1
        if interface is None:
1245 1
            result = (
1246
                "Error creating UNI:"
1247 1
                + f"Could not instantiate interface {interface_id}"
1248
            )
1249 1
            raise ValueError(result) from ValueError
1250 1
        tag_convert = {1: "vlan"}
1251
        tag_dict = uni_dict.get("tag", None)
1252 1
        if tag_dict:
1253 1
            tag_type = tag_dict.get("tag_type")
1254 1
            tag_type = tag_convert.get(tag_type, tag_type)
1255 1
            tag_value = tag_dict.get("value")
1256
            if isinstance(tag_value, list):
1257
                tag_value = get_tag_ranges(tag_value)
1258
                mask_list = get_vlan_tags_and_masks(tag_value)
1259 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1260 1
            else:
1261 1
                tag = TAG(tag_type, tag_value)
1262 1
        else:
1263 1
            tag = None
1264 1
        uni = UNI(interface, tag)
1265 1
        return uni
1266 1
1267 1
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
1268 1
        """Return a Link object from python dict."""
1269 1
        id_a = link_dict.get("endpoint_a").get("id")
1270
        id_b = link_dict.get("endpoint_b").get("id")
1271 1
1272
        endpoint_a = self.controller.get_interface_by_id(id_a)
1273 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1274 1
        if not endpoint_a:
1275 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1276
            raise ValueError(error_msg)
1277 1
        if not endpoint_b:
1278
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1279 1
            raise ValueError(error_msg)
1280 1
1281
        link = Link(endpoint_a, endpoint_b)
1282 1
        allowed_paths = {"current_path", "failover_path"}
1283 1
        if "metadata" in link_dict and attribute in allowed_paths:
1284 1
            link.extend_metadata(link_dict.get("metadata"))
1285 1
1286 1
        s_vlan = link.get_metadata("s_vlan")
1287 1
        if s_vlan:
1288
            tag = TAG.from_dict(s_vlan)
1289
            if tag is False:
1290
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1291 1
                raise ValueError(error_msg)
1292 1
            link.update_metadata("s_vlan", tag)
1293 1
        return link
1294 1
1295
    def _find_evc_by_schedule_id(self, schedule_id):
1296 1
        """
1297 1
        Find an EVC and CircuitSchedule based on schedule_id.
1298 1
1299 1
        :param schedule_id: Schedule ID
1300
        :return: EVC and Schedule
1301
        """
1302 1
        circuits = self._get_circuits_buffer()
1303 1
        found_schedule = None
1304
        evc = None
1305 1
1306
        # pylint: disable=unused-variable
1307
        for c_id, circuit in circuits.items():
1308
            for schedule in circuit.circuit_scheduler:
1309
                if schedule.id == schedule_id:
1310
                    found_schedule = schedule
1311
                    evc = circuit
1312 1
                    break
1313 1
            if found_schedule:
1314 1
                break
1315
        return evc, found_schedule
1316
1317 1
    def _get_circuits_buffer(self):
1318 1
        """
1319 1
        Return the circuit buffer.
1320 1
1321 1
        If the buffer is empty, try to load data from mongodb.
1322 1
        """
1323 1
        if not self.circuits:
1324 1
            # Load circuits from mongodb to buffer
1325 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1326
            for c_id, circuit in circuits.items():
1327 1
                evc = self._evc_from_dict(circuit)
1328
                self.circuits[c_id] = evc
1329
        return self.circuits
1330
1331
    # pylint: disable=attribute-defined-outside-init
1332
    @alisten_to("kytos/of_multi_table.enable_table")
1333 1
    async def on_table_enabled(self, event):
1334
        """Handle a recently table enabled."""
1335 1
        table_group = event.content.get("mef_eline", None)
1336 1
        if not table_group:
1337 1
            return
1338 1
        for group in table_group:
1339 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1340
                log.error(f'The table group "{group}" is not allowed for '
1341
                          f'mef_eline. Allowed table groups are '
1342 1
                          f'{settings.TABLE_GROUP_ALLOWED}')
1343 1
                return
1344
        self.table_group.update(table_group)
1345 1
        content = {"group_table": self.table_group}
1346 1
        name = "kytos/mef_eline.enable_table"
1347
        await aemit_event(self.controller, name, content)
1348