Passed
Pull Request — master (#583)
by
unknown
07:27
created

build.main.Main.load_all_evcs()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 1
dl 0
loc 8
ccs 6
cts 6
cp 1
crap 3
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13 1
from typing import Optional
14
15 1
from pydantic import ValidationError
16
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29
                                              DuplicatedNoTagUNI, InvalidPath)
30 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
31
                                          Path)
32 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
33 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
34
                                         emit_event, get_vlan_tags_and_masks,
35
                                         map_evc_event_content,
36
                                         merge_flow_dicts, prepare_delete_flow,
37
                                         send_flow_mods_http)
38
39
40
# pylint: disable=too-many-public-methods
41 1
class Main(KytosNApp):
42
    """Main class of amlight/mef_eline NApp.
43
44
    This class is the entry point for this napp.
45
    """
46
47 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
48
49 1
    def setup(self):
        """Initialize the NApp state in place of ``__init__``.

        The controller calls this method automatically when the application
        is loaded, so every start-up routine belongs here.
        """
        # Object used to schedule circuit events.
        self.sched = Scheduler()

        # Object used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Set the controller that will manage the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        # Buffer of the EVCs created, keyed by circuit id.
        # Every create/update/delete must be synced to mongodb.
        self.circuits: dict[str, EVC] = {}

        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)
        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.multi_evc_lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
80
81 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
        """Return the buffered circuits ordered by priority.

        Circuits are sorted by descending service level and, within the same
        level, by ascending creation time.

        In the future, as more ops are offloaded it should be get from the DB.

        Args:
            enable_filter: When True (default) only the circuits whose
                ``is_enabled()`` is truthy are returned; otherwise every
                buffered circuit is returned.
        """
        candidates = self.circuits.values()
        if enable_filter:
            candidates = (evc for evc in candidates if evc.is_enabled())

        def priority(evc):
            return -evc.service_level, evc.creation_time

        return sorted(candidates, key=priority)
94
95 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``controllers.ELineController``."""
        eline_controller = controllers.ELineController()
        return eline_controller
99
100 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped when another one is still holding ``self._lock``.
        """
        # Acquire without blocking instead of testing ``locked()`` first: a
        # separate check followed by ``with self._lock`` lets a second caller
        # slip in between the two steps and then block waiting for the lock.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
108
109 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit qualifies for the consistency check.

        Returns:
            bool: True only for an enabled, inactive and unlocked circuit
                with no recently removed flow, no recent update, active UNIs
                and either an intra-switch topology or a current path.
        """
        blocked = (
            not circuit.is_enabled()
            or circuit.is_active()
            or circuit.lock.locked()
            or circuit.has_recent_removed_flow()
            or circuit.is_recent_updated()
            or not circuit.are_unis_active(self.controller.switches)
        )
        if blocked:
            return False
        # If an inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it.
        return bool(circuit.is_intra_switch() or circuit.current_path)
125
126 1
    def execute_consistency(self):
        """Activate or redeploy enabled circuits that are still inactive.

        Every buffered circuit gets a chance to set up its failover path.
        The ones accepted by ``should_be_checked`` are traced in one batch:
        a circuit confirmed by the trace check is activated and synced, while
        the others have ``execution_rounds`` incremented and are redeployed
        once it exceeds ``settings.WAIT_FOR_OLD_PATH``.
        """
        pending = []
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
            if self.should_be_checked(circuit):
                pending.append(circuit)
            circuit.try_setup_failover_path(warn_if_not_path=False)

        trace_results = EVCDeploy.check_list_traces(pending)
        for circuit in pending:
            if trace_results.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()
148
149 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        Nothing is cleaned up at the moment; any teardown procedure should
        be added here.
        """
154
155 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint that lists the stored circuits.

        The ``archived`` query argument (lower-cased, ``"false"`` when
        absent) is forwarded as the archived filter; every other query
        argument is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(
            archived=archived, metadata=metadata
        )
        return JSONResponse(found['circuits'])
170
171 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint that lists every schedule stored for every circuit.

        The JSON response follows this template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead.
        """
        log.debug("list_schedules /v2/evc/schedule")
        status = 200
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            return JSONResponse({}, status_code=status)

        result = [
            {
                "schedule_id": schedule.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": schedule,
            }
            for circuit in circuits
            for schedule in circuit.get("circuit_scheduler") or []
        ]
        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
202
203 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint that returns the circuit with the requested id.

        Raises:
            HTTPException: 404 when no circuit is found for the id.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise HTTPException(404, detail=message)
216
217
    # pylint: disable=too-many-branches, too-many-statements
218 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        The request payload goes through the following steps:

        1. The EVC is built from the payload.
        2. ``check_disabled_component`` is run on both UNIs.
        3. ``primary_path`` and ``backup_path``, when given, are validated
           against the switches of both UNIs.
        4. The tag lists of both UNIs must be equal, the EVC must pass
           ``_validate_has_primary_or_dynamic`` and must not duplicate a
           UNI without tag of another circuit.
        5. The UNI tags are taken and the EVC is saved, stored in the
           circuits buffer and handed to the scheduler.
        6. An EVC with no ``circuit_scheduler`` is deployed right away.
        7. A "created" event is emitted and the user is notified of the
           status of the request.

        Returns:
            JSONResponse: 201 with the circuit id and the deploy result.

        Raises:
            HTTPException: 400 for an invalid payload, path or tag;
                409 for a disabled component or a duplicated UNI without tag.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        # Both static paths go through the same validation; only the name
        # reported in the error message differs.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # Give back the tags taken just above; otherwise they would stay
            # in use by a circuit that was never stored.
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        deployed = False
        if not evc.circuit_scheduler:
            with evc.lock:
                deployed = evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id, "deployed": deployed}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
332
333 1
    @staticmethod
    def _use_uni_tags(evc):
        """Call ``_use_uni_vlan`` for UNI A and then for UNI Z of the EVC.

        When the UNI Z call raises KytosTagError, the UNI A vlan is made
        available again before the error is propagated.
        """
        first_uni = evc.uni_a
        evc._use_uni_vlan(first_uni)
        try:
            evc._use_uni_vlan(evc.uni_z)
        except KytosTagError:
            evc.make_uni_vlan_available(first_uni)
            raise
343
344 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removed events and hand them to the handler."""
        self.handle_flow_delete(event)
348
349 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows got removed.

        The EVC is looked up in the circuits buffer by the id derived from
        the cookie of the removed flow; ids not in the buffer are ignored.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
356
357 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        Returns:
            JSONResponse: 200 with the updated EVC, keyed by its id, and a
                ``redeployed`` entry holding the deploy result.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer; 400 for
                invalid values or tags; 409 for a duplicated UNI without tag
                or a disabled component.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain the caught exception itself: ``from KeyError`` would
            # attach a brand new, empty KeyError as the cause.
            raise HTTPException(404, detail=result) from exception

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        redeployed = False
        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    redeployed = evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
418
419 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        The EVC is taken out of the circuits buffer. Unless it is already
        archived, it is deactivated, disabled and removed from the scheduler,
        its current and failover flows are removed, it is archived, its UNI
        tags are removed, it is synced and a "deleted" event is emitted.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError as exception:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain the caught exception itself: ``from KeyError`` would
            # attach a brand new, empty KeyError as the cause.
            raise HTTPException(404, detail=result) from exception
        log.info("Removing %s", evc)

        with evc.lock:
            if not evc.archived:
                evc.deactivate()
                evc.disable()
                self.sched.remove(evc)
                evc.remove_current_flows(sync=False)
                evc.remove_failover_flows(sync=False)
                evc.archive()
                evc.remove_uni_tags()
                evc.sync()
                emit_event(
                    self.controller, "deleted",
                    content=map_evc_event_content(evc)
                )

        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200
        log.debug("delete_circuit result %s %s", result, status)

        return JSONResponse(result, status_code=status)
457
458 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Get metadata from an EVC.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        # Guard only the buffer lookup, so a KeyError raised while the
        # response is being built is not misreported as a missing circuit.
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error
        return JSONResponse({"metadata": evc.metadata})
471
472 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add the payload metadata to every EVC listed in ``circuit_ids``.

        The database is updated for all the given ids first and then the
        buffered EVCs are extended. Ids that fail with KeyError are reported
        together in a 404 error.
        """
        payload = get_json_or_400(request, self.controller.loop)
        circuit_ids = payload.pop("circuit_ids")

        self.mongo_controller.update_evcs_metadata(
            circuit_ids, payload, "add"
        )

        not_found = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(payload)
            except KeyError:
                not_found.append(circuit_id)

        if not not_found:
            return JSONResponse("Operation successful", status_code=201)
        raise HTTPException(404, detail=not_found)
492
493 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Extend the metadata of one EVC with the payload and sync it.

        Raises:
            HTTPException: 400 when the payload is not a dict; 404 when the
                circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            raise HTTPException(400, f"Invalid metadata value: {metadata}")

        try:
            target = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error

        target.extend_metadata(metadata)
        target.sync()
        return JSONResponse("Operation successful", status_code=201)
512
513 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from every EVC listed in ``circuit_ids``.

        The database is updated for all the given ids first and then the key
        is removed from the buffered EVCs. Ids that fail with KeyError are
        reported together in a 404 error.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs_metadata(
            circuit_ids, {key: ""}, "del"
        )

        not_found = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                not_found.append(circuit_id)

        if not not_found:
            return JSONResponse("Operation successful")
        raise HTTPException(404, detail=not_found)
535
536 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Remove one metadata key from an EVC and sync it.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            target = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error

        target.remove_metadata(key)
        target.sync()
        return JSONResponse("Operation successful")
552
553 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        The ``try_avoid_same_s_vlan`` query argument (``"true"`` when absent,
        case-insensitive) is forwarded as ``return_path`` when the current
        flows are removed, and what that call returns is handed to
        ``deploy``.

        Returns:
            JSONResponse: 202 when ``deploy`` returns a truthy value; 409
                otherwise, which includes a circuit that is not enabled.

        Raises:
            HTTPException: 400 for an invalid ``try_avoid_same_s_vlan``;
                404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try_avoid_same_s_vlan = request.query_params.get(
            "try_avoid_same_s_vlan", "true"
        )
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
        if try_avoid_same_s_vlan not in {"true", "false"}:
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
            raise HTTPException(400, detail=msg)
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as exception:
            # Chain the caught exception itself: ``from KeyError`` would
            # attach a brand new, empty KeyError as the cause.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from exception
        deployed = False
        if evc.is_enabled():
            with evc.lock:
                path_dict = evc.remove_current_flows(
                    sync=False,
                    return_path=try_avoid_same_s_vlan == "true"
                )
                evc.remove_failover_flows(sync=True)
                deployed = evc.deploy(path_dict)
        if deployed:
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            # NOTE(review): this branch is also reached when an enabled
            # circuit fails to deploy, although the message only mentions
            # a disabled circuit.
            result = {
                "response": f"Circuit {circuit_id} is disabled."
            }
            status = 409

        return JSONResponse(result, status_code=status)
591
592 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """Add a new schedule to the given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id = payload["circuit_id"]
        schedule_data = payload["schedule"]

        # Look the EVC up in the circuits buffer
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            message = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        # New schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the EVC has none, then add to it
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Add the schedule job and save the circuit to mongodb
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, 201)
        return JSONResponse(result, status_code=201)
648
649 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Replace a stored schedule with the one from the payload.

        Every attribute of the schedule is taken from the payload, except
        the schedule id, which is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when no schedule is found for the id.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Locate the schedule together with the EVC that holds it
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        new_schedule = CircuitSchedule.from_dict(payload)
        new_schedule.id = old_schedule.id

        # Swap the old schedule for the modified one in the EVC
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(new_schedule)

        # Swap the schedule job as well
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)

        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
696
697 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its job is cancelled in the
        scheduler and the EVC is synced.

        Raises:
            HTTPException: 404 when no schedule is found for the id.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)

        # Locate the schedule together with the EVC that holds it
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            message = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", message, 404)
            raise HTTPException(404, detail=message)

        # Drop the schedule from the EVC and cancel its job
        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)

        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
728
729 1
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check that the given UNIs without tag are not duplicated.

        Every given UNI whose ``user_tag`` is None is checked against all
        the non-archived circuits of the buffer, except the one whose id is
        ``evc_id``. Raise DuplicatedNoTagUNI if duplication is found.

        Args:
            evc_id (str): Id of the EVC being analyzed.
            uni_a (UNI): First UNI to be analyzed, if any.
            uni_z (UNI): Second UNI to be analyzed, if any.
        """
        untagged_unis = [
            uni for uni in (uni_a, uni_z)
            if uni and uni.user_tag is None
        ]
        # Nothing to compare when no UNI was given or all of them have a tag
        if not untagged_unis:
            return

        for circuit in self.circuits.copy().values():
            if circuit.archived or circuit._id == evc_id:
                continue
            for uni in untagged_unis:
                circuit.check_no_tag_duplicate(uni)
754
755 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listener for topology link_up events.

        Forwards the event untouched to ``handle_link_up``.
        """
        self.handle_link_up(event)
759
760 1
    def handle_link_up(self, event):
        """Let each enabled, non-archived EVC react to a link coming up.

        EVCs are visited in the order returned by
        ``get_evcs_by_svc_level`` and each one is handled while holding
        its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
767
768
    # Possibly replace this with interruptions?
769 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """Listener for interface link_up, link_down, created and deleted.

        Forwards the event untouched to
        ``handle_on_interface_link_change``.
        """
        self.handle_on_interface_link_change(event)
777
778 1
    def handle_on_interface_link_change(self, event: KytosEvent):
        """Debounce interface events and process only the latest one.

        To avoid multiple database updates while a link flaps, events
        are tracked per interface id in ``self._intf_events``:

        - an event older than the one already stored is dropped;
        - the first event of a burst marks the interface with
          ``last_acquired`` and waits ``settings.UNI_STATE_CHANGE_DELAY``;
        - events arriving during that wait only replace the stored event
          and return;
        - after the wait, the stored (latest) event decides whether the
          interface is handled as up (link_up, created) or down
          (link_down, deleted).
        """
        iface = event.content.get("interface")
        iface_id = iface.id
        with self._lock_interfaces[iface_id]:
            received_at = event.timestamp
            # Drop events that arrive out of order.
            if iface_id in self._intf_events:
                stored = self._intf_events[iface_id]["event"]
                if stored.timestamp > received_at:
                    return
            state = self._intf_events[iface_id]
            state["event"] = event
            # Another call is already waiting for this interface; it will
            # pick up the event that was just stored.
            if "last_acquired" in state:
                return
            state["last_acquired"] = now()

        # Sleep outside of the lock so newer events can be recorded.
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)

        with self._lock_interfaces[iface_id]:
            state = self._intf_events[iface_id]
            latest = state["event"]
            state.pop('last_acquired', None)
            kind = latest.name.rpartition('.')[2]
            if kind in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif kind in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)
810
811 1
    def handle_interface_link_up(self, interface):
        """Notify every EVC, under its own lock, that an interface is up.

        EVCs are visited in the order returned by
        ``get_evcs_by_svc_level``.
        """
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                evc.handle_interface_link_up(interface)
821
822 1
    def handle_interface_link_down(self, interface):
        """Notify every EVC, under its own lock, that an interface is down.

        EVCs are visited in the order returned by
        ``get_evcs_by_svc_level``.
        """
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                evc.handle_interface_link_down(interface)
832
833 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
    def on_link_down(self, event):
        """Listener for topology link_down events.

        Forwards the event untouched to ``handle_link_down``.
        """
        self.handle_link_down(event)
837
838 1
    def prepare_swap_to_failover(self, evc: EVC):
        """Promote the failover path of an EVC and return its flows.

        The failover flows are fetched first; then the current path is
        kept as ``old_path``, the failover path becomes the current path
        and the failover path is emptied. No flow is sent from here.

        Returns:
            The flows from ``evc.get_failover_flows()``. An empty dict is
            returned when fetching them failed; any error is logged and
            swallowed.
        """
        flows = {}
        try:
            flows = evc.get_failover_flows()
            # Rotate the paths: current -> old, failover -> current.
            evc.old_path = evc.current_path
            evc.current_path = evc.failover_path
            evc.failover_path = Path([])
        # pylint: disable=broad-except
        except Exception:
            err = traceback.format_exc().replace("\n", ", ")
            log.error(f"Ignore Failover path for {evc} due to error: {err}")
        return flows
854
855 1
    def execute_swap_to_failover(self, event_contents, install_flows):
        """Commit a swap to failover.

        Emits ``failover_link_down`` with a deep copy of
        ``event_contents`` and then sends ``install_flows`` as an
        "install" request.
        """
        contents_copy = deepcopy(event_contents)
        emit_event(self.controller, "failover_link_down",
                   content=contents_copy)
        send_flow_mods_http(install_flows, "install")
865
866 1
    def prepare_clear_old_path(self, evc: EVC):
        """Build the flow deletions that clear ``evc.old_path``.

        Returns:
            The result of ``prepare_delete_flow`` over the merged UNI
            (with ``skip_in=True``) and NNI flows of the old path, or an
            empty dict when building them failed; the error is logged and
            swallowed.
        """
        try:
            uni_flows = evc._prepare_uni_flows(evc.old_path, skip_in=True)
            nni_flows = evc._prepare_nni_flows(evc.old_path)
            return prepare_delete_flow(merge_flow_dicts(uni_flows, nni_flows))
        # pylint: disable=broad-except
        except Exception:
            err = traceback.format_exc().replace("\n", ", ")
            log.error(f"Fail to remove {evc} old_path: {err}")
        return {}
881
882 1
    def execute_clear_old_path(
        self,
        evcs: list[EVC],
        event_contents,
        delete_flows
    ):
        """Commit the removal of old paths.

        Sends ``delete_flows`` as a 'delete' request, emits
        ``failover_old_path`` with ``event_contents`` and, for each EVC in
        ``evcs``, makes the vlans of its old path available again before
        emptying that path.
        """
        send_flow_mods_http(delete_flows, 'delete')
        emit_event(self.controller, "failover_old_path",
                   content=event_contents)
        for circuit in evcs:
            circuit.old_path.make_vlans_available(self.controller)
            circuit.old_path = Path([])
901
902 1
    def prepare_undeploy(self, evc: EVC):
        """Build the flow deletions needed to undeploy an EVC.

        Returns:
            The result of ``prepare_delete_flow`` over the merged UNI
            flows of the current path (with ``skip_in=True``) and NNI
            flows of both the current and the failover path, or an empty
            dict when building them failed; the error is logged and
            swallowed.
        """
        try:
            uni_flows = evc._prepare_uni_flows(evc.current_path, skip_in=True)
            current_nni = evc._prepare_nni_flows(evc.current_path)
            failover_nni = evc._prepare_nni_flows(evc.failover_path)
            return prepare_delete_flow(
                merge_flow_dicts(uni_flows, current_nni, failover_nni)
            )
        # pylint: disable=broad-except
        except Exception:
            err = traceback.format_exc().replace("\n", ", ")
            log.error(f"Fail to undeploy {evc}: {err}")
        return {}
919
920 1
    def execute_undeploy(self, evcs: list[EVC], remove_flows):
        """Commit an undeploy for a group of EVCs.

        Sends ``remove_flows`` as a 'delete' request. Then, for each EVC:
        the vlans of its current and failover paths are made available,
        both paths are emptied, the EVC is deactivated and a
        ``need_redeploy`` event carrying its id is emitted.
        """
        send_flow_mods_http(remove_flows, 'delete')

        for circuit in evcs:
            for path in (circuit.current_path, circuit.failover_path):
                path.make_vlans_available(self.controller)
            circuit.current_path = Path([])
            circuit.failover_path = Path([])
            circuit.deactivate()
            emit_event(self.controller, "need_redeploy",
                       content={"evc_id": circuit.id})
            log.info(f"{circuit} scheduled for redeploy")
939
940
    # pylint: disable=too-many-branches
941
    # pylint: disable=too-many-locals
942 1
    def handle_link_down(self, event):
        """React to a link going down (or entering maintenance).

        While holding ``multi_evc_lock`` plus the lock of every EVC that
        needs a change, EVCs are sorted into three groups:

        - current path affected, failover path present and unaffected:
          swap to the failover path;
        - current path affected without a usable failover path: undeploy;
        - only the failover path affected: drop the failover path.

        Flow changes of all EVCs are merged and sent in batches (install
        for the swaps, delete for old paths and for undeploys) and every
        changed EVC is written to the database in a single call.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)

        with ExitStack() as held_locks:
            held_locks.enter_context(self.multi_evc_lock)
            to_swap: list[EVC] = []
            to_undeploy: list[EVC] = []
            failover_only: list[EVC] = []
            to_clear: list[EVC] = []
            evcs_by_id: dict[str, EVC] = {}
            changed: list[EVC] = []

            # Both are keyed by evc id, then by the kind of change.
            flow_changes = defaultdict(dict)
            contents = defaultdict(dict)

            for evc in self.get_evcs_by_svc_level():
                evcs_by_id[evc.id] = evc
                with ExitStack() as evc_stack:
                    evc_stack.enter_context(evc.lock)
                    current_hit = evc.is_affected_by_link(link)
                    has_failover = evc.failover_path
                    failover_hit = evc.is_failover_path_affected_by_link(link)
                    if current_hit:
                        if has_failover and not failover_hit:
                            to_swap.append(evc)
                        else:
                            to_undeploy.append(evc)
                    elif has_failover and failover_hit:
                        failover_only.append(evc)
                    else:
                        # Untouched EVC: its lock is released right away.
                        continue

                    # Keep the EVC locked until the outer stack closes.
                    held_locks.push(evc_stack.pop_all())

            # Swap from current path to failover path
            for evc in to_swap:
                install = self.prepare_swap_to_failover(evc)
                if not install:
                    to_undeploy.append(evc)
                    continue
                flow_changes[evc.id]["swap_to_failover"] = install
                contents[evc.id]["swap_to_failover"] = map_evc_event_content(
                    evc,
                    flows=deepcopy(install)
                )
                to_clear.append(evc)

            # An affected failover path is treated as an old path to clear.
            for evc in failover_only:
                evc.old_path = evc.failover_path
                evc.failover_path = Path([])
                to_clear.append(evc)

            # Clear out old flows
            for evc in to_clear:
                removals = self.prepare_clear_old_path(evc)
                if removals:
                    flow_changes[evc.id]["clear_old_path"] = removals
                    contents[evc.id]["clear_old_path"] = map_evc_event_content(
                        evc,
                        current_path=evc.current_path.as_dict(),
                        removed_flows=deepcopy(removals)
                    )
                    continue

                # Could not build the deletions: fall back to undeploy
                # and revert any change recorded for this EVC.
                to_undeploy.append(evc)
                if evc.id not in flow_changes:
                    continue
                if "swap_to_failover" in flow_changes[evc.id]:
                    evc.failover_path = evc.current_path
                    evc.current_path = evc.old_path
                else:
                    evc.failover_path = evc.old_path
                evc.old_path = Path([])
                del flow_changes[evc.id]
                del contents[evc.id]

            swap_flows = {}
            swap_contents = {}
            clear_flows = {}
            clear_contents = {}
            clear_reservations: list[EVC] = []

            # Merge the per-EVC changes into one batch per kind.
            for evc_id, changes in flow_changes.items():
                evc = evcs_by_id[evc_id]
                if "swap_to_failover" in changes:
                    swap_flows = merge_flow_dicts(
                        swap_flows,
                        changes["swap_to_failover"]
                    )
                    swap_contents[evc_id] = \
                        contents[evc_id]["swap_to_failover"]
                if "clear_old_path" in changes:
                    clear_flows = merge_flow_dicts(
                        clear_flows,
                        changes["clear_old_path"]
                    )
                    clear_contents[evc_id] = \
                        contents[evc_id]["clear_old_path"]
                    clear_reservations.append(evc)
                changed.append(evc)

            if swap_flows:
                self.execute_swap_to_failover(swap_contents, swap_flows)

            if clear_flows:
                self.execute_clear_old_path(
                    clear_reservations,
                    clear_contents,
                    clear_flows
                )

            # Undeploy the evc, schedule a redeploy
            undeploy_flows = {}
            undeploy_ready: list[EVC] = []
            for evc in to_undeploy:
                removals = self.prepare_undeploy(evc)
                if removals:
                    undeploy_flows = merge_flow_dicts(undeploy_flows, removals)
                    undeploy_ready.append(evc)

            if undeploy_ready:
                self.execute_undeploy(undeploy_ready, undeploy_flows)
                changed.extend(undeploy_ready)

            # Push update to DB
            if changed:
                self.mongo_controller.update_evcs(
                    [evc.as_dict() for evc in changed]
                )
1095
1096 1
    @listen_to("kytos/mef_eline.need_redeploy")
    def on_evc_need_redeploy(self, event):
        """Listener for mef_eline need_redeploy events.

        Forwards the event untouched to ``handle_evc_need_redeploy``.
        """
        self.handle_evc_need_redeploy(event)
1100
1101 1
    def handle_evc_need_redeploy(self, event):
        """Redeploy the EVC named by ``event.content["evc_id"]``.

        Nothing is done when the EVC is unknown, disabled or already
        active. Otherwise ``deploy_to_path`` runs under the EVC lock and
        its outcome is published as ``redeployed_link_down`` (truthy
        result) or ``error_redeploy_link_down``.
        """
        circuit = self.circuits.get(event.content["evc_id"])
        if circuit is None:
            return

        with circuit.lock:
            needs_deploy = circuit.is_enabled() and not circuit.is_active()
            if not needs_deploy:
                return
            deployed = circuit.deploy_to_path()

        if deployed:
            log.info(f"{circuit} redeployed")
            outcome = "redeployed_link_down"
        else:
            outcome = "error_redeploy_link_down"
        emit_event(self.controller, outcome,
                   content=map_evc_event_content(circuit))
1116
1117 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listener for mef_eline evc_affected_by_link_down events.

        Forwards the event untouched to
        ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
1121
1122 1
    def handle_evc_affected_by_link_down(self, event):
        """Handle a link down for the single EVC named in the event.

        Nothing is done when the EVC is unknown or is not affected by
        ``event.content['link']``. Otherwise ``evc.handle_link_down`` runs
        under the EVC lock and its outcome is published as
        ``redeployed_link_down`` (truthy result) or
        ``error_redeploy_link_down``.
        """
        circuit = self.circuits.get(event.content["evc_id"])
        link = event.content['link']
        if not circuit:
            return

        with circuit.lock:
            if not circuit.is_affected_by_link(link):
                return
            redeployed = circuit.handle_link_down()

        if redeployed:
            log.info(f"{circuit} redeployed due to link down {link.id}")
            outcome = "redeployed_link_down"
        else:
            outcome = "error_redeploy_link_down"
        emit_event(self.controller, outcome,
                   content=map_evc_event_content(circuit))
1138
1139 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listener for EVC deployed and redeployed_link_(up|down) events.

        Forwards the event untouched to ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
1143
1144 1
    def handle_evc_deployed(self, event):
        """Try to set up the failover path of a just-deployed EVC.

        The EVC is looked up by ``event.content["evc_id"]``; unknown ids
        are ignored.
        """
        circuit = self.circuits.get(event.content["evc_id"])
        if circuit:
            circuit.try_setup_failover_path()
1150
1151 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Listener for topology_loaded; triggers ``load_all_evcs``.

        The event itself is not used.
        """
        self.load_all_evcs()
1155
1156 1
    def load_all_evcs(self):
        """Load into memory every stored EVC that is not loaded yet.

        Circuits come from ``mongo_controller.get_circuits()``; ids
        already present in ``self.circuits`` are skipped. Afterwards an
        ``evcs_loaded`` event carrying all the stored circuits is emitted.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded",
                   content=dict(stored.items()), timeout=1)
1164
1165 1
    def _load_evc(self, circuit_dict):
1166
        """Load one EVC from mongodb to memory."""
1167 1
        try:
1168 1
            evc = self._evc_from_dict(circuit_dict)
1169 1
        except (ValueError, KytosTagError) as exception:
1170 1
            log.error(
1171
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1172
            )
1173 1
            return None
1174 1
        if evc.archived:
1175 1
            return None
1176
1177 1
        self.circuits.setdefault(evc.id, evc)
1178 1
        self.sched.add(evc)
1179 1
        return evc
1180
1181 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listener for flow_manager flow error events.

        Forwards the event untouched to ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
1185
1186 1
    def handle_flow_mod_error(self, event):
        """Remove the flows of the EVC whose flow failed to be added.

        Only errors with ``error_command`` equal to "add" are handled.
        The EVC is found through the flow cookie; when it is known, its
        current and failover flows are removed under the EVC lock.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return

        circuit = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not circuit:
            return
        with circuit.lock:
            circuit.remove_current_flows(sync=False)
            circuit.remove_failover_flows(sync=True)
1197
1198 1
    def _evc_dict_with_instances(self, evc_dict):
1199
        """Convert some dict values to instance of EVC classes.
1200
1201
        This method will convert: [UNI, Link]
1202
        """
1203 1
        data = evc_dict.copy()  # Do not modify the original dict
1204 1
        for attribute, value in data.items():
1205
            # Get multiple attributes.
1206
            # Ex: uni_a, uni_z
1207 1
            if "uni" in attribute:
1208 1
                try:
1209 1
                    data[attribute] = self._uni_from_dict(value)
1210 1
                except ValueError as exception:
1211 1
                    result = "Error creating UNI: Invalid value"
1212 1
                    raise ValueError(result) from exception
1213
1214 1
            if attribute == "circuit_scheduler":
1215 1
                data[attribute] = []
1216 1
                for schedule in value:
1217 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1218
1219
            # Get multiple attributes.
1220
            # Ex: primary_links,
1221
            #     backup_links,
1222
            #     current_links_cache,
1223
            #     primary_links_cache,
1224
            #     backup_links_cache
1225 1
            if "links" in attribute:
1226 1
                data[attribute] = [
1227
                    self._link_from_dict(link, attribute) for link in value
1228
                ]
1229
1230
            # Ex: current_path,
1231
            #     primary_path,
1232
            #     backup_path
1233 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1234 1
                data[attribute] = Path(
1235
                    [self._link_from_dict(link, attribute) for link in value]
1236
                )
1237
1238 1
        return data
1239
1240 1
    def _evc_from_dict(self, evc_dict):
        """Instantiate an EVC from its dict representation.

        Values are first converted by ``_evc_dict_with_instances``; the
        ``table_group`` of this NApp is then passed along, replacing any
        ``table_group`` present in the dict.
        """
        kwargs = {
            **self._evc_dict_with_instances(evc_dict),
            "table_group": self.table_group,
        }
        return EVC(self.controller, **kwargs)
1244
1245 1
    def _uni_from_dict(self, uni_dict):
1246
        """Return a UNI object from python dict."""
1247 1
        if uni_dict is None:
1248 1
            return False
1249
1250 1
        interface_id = uni_dict.get("interface_id")
1251 1
        interface = self.controller.get_interface_by_id(interface_id)
1252 1
        if interface is None:
1253 1
            result = (
1254
                "Error creating UNI:"
1255
                + f"Could not instantiate interface {interface_id}"
1256
            )
1257 1
            raise ValueError(result) from ValueError
1258 1
        tag_convert = {1: "vlan"}
1259 1
        tag_dict = uni_dict.get("tag", None)
1260 1
        if tag_dict:
1261 1
            tag_type = tag_dict.get("tag_type")
1262 1
            tag_type = tag_convert.get(tag_type, tag_type)
1263 1
            tag_value = tag_dict.get("value")
1264 1
            if isinstance(tag_value, list):
1265 1
                tag_value = get_tag_ranges(tag_value)
1266 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1267 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1268
            else:
1269 1
                tag = TAG(tag_type, tag_value)
1270
        else:
1271 1
            tag = None
1272 1
        uni = UNI(interface, tag)
1273 1
        return uni
1274
1275 1
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
        """Return a Link object built from a python dict.

        Args:
            link_dict: dict with ``endpoint_a`` and ``endpoint_b`` (each
                holding an ``id``) and, optionally, ``metadata``.
            attribute: name of the EVC attribute the link belongs to.
                Metadata is only restored for "current_path" and
                "failover_path".

        Raises:
            ValueError: when an endpoint interface is unknown to the
                controller or the ``s_vlan`` metadata cannot be turned
                into a TAG.
        """
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)
        if not endpoint_a:
            raise ValueError(f"Could not get interface endpoint_a id {id_a}")
        if not endpoint_b:
            raise ValueError(f"Could not get interface endpoint_b id {id_b}")

        link = Link(endpoint_a, endpoint_b)
        restores_metadata = attribute in {"current_path", "failover_path"}
        if "metadata" in link_dict and restores_metadata:
            link.extend_metadata(link_dict.get("metadata"))

        # The stored s_vlan is a dict; replace it with a TAG instance.
        s_vlan = link.get_metadata("s_vlan")
        if s_vlan:
            tag = TAG.from_dict(s_vlan)
            if tag is False:
                raise ValueError(
                    f"Could not instantiate tag from dict {s_vlan}"
                )
            link.update_metadata("s_vlan", tag)
        return link
1302
1303 1
    def _find_evc_by_schedule_id(self, schedule_id):
1304
        """
1305
        Find an EVC and CircuitSchedule based on schedule_id.
1306
1307
        :param schedule_id: Schedule ID
1308
        :return: EVC and Schedule
1309
        """
1310 1
        circuits = self._get_circuits_buffer()
1311 1
        found_schedule = None
1312 1
        evc = None
1313
1314
        # pylint: disable=unused-variable
1315 1
        for c_id, circuit in circuits.items():
1316 1
            for schedule in circuit.circuit_scheduler:
1317 1
                if schedule.id == schedule_id:
1318 1
                    found_schedule = schedule
1319 1
                    evc = circuit
1320 1
                    break
1321 1
            if found_schedule:
1322 1
                break
1323 1
        return evc, found_schedule
1324
1325 1
    def _get_circuits_buffer(self):
1326
        """
1327
        Return the circuit buffer.
1328
1329
        If the buffer is empty, try to load data from mongodb.
1330
        """
1331 1
        if not self.circuits:
1332
            # Load circuits from mongodb to buffer
1333 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1334 1
            for c_id, circuit in circuits.items():
1335 1
                evc = self._evc_from_dict(circuit)
1336 1
                self.circuits[c_id] = evc
1337 1
        return self.circuits
1338
1339
    # pylint: disable=attribute-defined-outside-init
1340 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply the table groups enabled for mef_eline.

        The groups come from ``event.content["mef_eline"]``. If any of
        them is not in ``settings.TABLE_GROUP_ALLOWED`` an error is logged
        and nothing is changed. Otherwise ``self.table_group`` is updated
        and ``kytos/mef_eline.enable_table`` is emitted with the result.
        """
        requested = event.content.get("mef_eline", None)
        if not requested:
            return

        for group in requested:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return

        self.table_group.update(requested)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1356