Test Failed
Pull Request — master (#583)
by
unknown
04:26
created

build.main.Main.handle_flow_mod_error()   A

Complexity

Conditions 4

Size

Total Lines 11
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 10
nop 2
dl 0
loc 11
ccs 8
cts 8
cp 1
crap 4
rs 9.9
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13
from typing import Optional
14 1
15
from pydantic import ValidationError
16 1
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21 1
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25 1
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29 1
                                              DuplicatedNoTagUNI,
30
                                              FlowModException, InvalidPath)
31 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
32 1
                                          Path)
33
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
34
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
35
                                         emit_event, get_vlan_tags_and_masks,
36
                                         map_evc_event_content,
37
                                         merge_flow_dicts, prepare_delete_flow,
38
                                         send_flow_mods_http)
39
40 1
41
# pylint: disable=too-many-public-methods
42
class Main(KytosNApp):
43
    """Main class of amlight/mef_eline NApp.
44
45
    This class is the entry point for this napp.
46 1
    """
47
48 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
49
50
    def setup(self):
51
        """Replace the '__init__' method for the KytosNApp subclass.
52
53
        The setup method is automatically called by the controller when your
54
        application is loaded.
55
56
        So, if you have any setup routine, insert it here.
57 1
        """
58
        # object used to scheduler circuit events
59
        self.sched = Scheduler()
60 1
61 1
        # object to save and load circuits
62
        self.mongo_controller = self.get_eline_controller()
63
        self.mongo_controller.bootstrap_indexes()
64 1
65
        # set the controller that will manager the dynamic paths
66
        DynamicPathManager.set_controller(self.controller)
67
68 1
        # dictionary of EVCs created. It acts as a circuit buffer.
69
        # Every create/update/delete must be synced to mongodb.
70 1
        self.circuits = dict[str, EVC]()
71 1
72 1
        self._intf_events = defaultdict(dict)
73 1
        self._lock_interfaces = defaultdict(Lock)
74 1
        self.table_group = {"epl": 0, "evpl": 0}
75
        self._lock = Lock()
76 1
        self.multi_evc_lock = Lock()
77 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
78
79 1
        self.load_all_evcs()
80
        self._topology_updated_at = None
81
82
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
83
        """Get circuits sorted by desc service level and asc creation_time.
84 1
85 1
        In the future, as more ops are offloaded it should be get from the DB.
86
        """
87
        if enable_filter:
88
            return sorted(
89
                          [circuit for circuit in self.circuits.values()
90 1
                           if circuit.is_enabled()],
91
                          key=lambda x: (-x.service_level, x.creation_time),
92
            )
93 1
        return sorted(self.circuits.values(),
94 1
                      key=lambda x: (-x.service_level, x.creation_time))
95
96
    @staticmethod
97
    def get_eline_controller():
98 1
        """Return the ELineController instance."""
99
        return controllers.ELineController()
100 1
101 1
    def execute(self):
102 1
        """Execute once when the napp is running."""
103 1
        if self._lock.locked():
104 1
            return
105 1
        log.debug("Starting consistency routine")
106
        with self._lock:
107 1
            self.execute_consistency()
108
        log.debug("Finished consistency routine")
109
110 1
    def should_be_checked(self, circuit):
111
        "Verify if the circuit meets the necessary conditions to be checked"
112
        # pylint: disable=too-many-boolean-expressions
113
        if (
114
                circuit.is_enabled()
115
                and not circuit.is_active()
116
                and not circuit.lock.locked()
117
                and not circuit.has_recent_removed_flow()
118
                and not circuit.is_recent_updated()
119
                and circuit.are_unis_active(self.controller.switches)
120
                # if a inter-switch EVC does not have current_path, it does not
121 1
                # make sense to run sdntrace on it
122
                and (circuit.is_intra_switch() or circuit.current_path)
123
                ):
124 1
            return True
125
        return False
126 1
127 1
    def execute_consistency(self):
128 1
        """Execute consistency routine."""
129 1
        circuits_to_check = []
130 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
131 1
            if self.should_be_checked(circuit):
132 1
                circuits_to_check.append(circuit)
133 1
            circuit.try_setup_failover_path(warn_if_not_path=False)
134 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
135 1
        for circuit in circuits_to_check:
136 1
            is_checked = circuits_checked.get(circuit.id)
137 1
            if is_checked:
138 1
                circuit.execution_rounds = 0
139 1
                log.info(f"{circuit} enabled but inactive - activating")
140
                with circuit.lock:
141 1
                    circuit.activate()
142 1
                    circuit.sync()
143 1
            else:
144 1
                circuit.execution_rounds += 1
145 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
146
                    log.info(f"{circuit} enabled but inactive - redeploy")
147 1
                    with circuit.lock:
148
                        circuit.deploy()
149
150
    def shutdown(self):
151
        """Execute when your napp is unloaded.
152
153 1
        If you have some cleanup procedure, insert it here.
154 1
        """
155
156
    @rest("/v2/evc/", methods=["GET"])
157
    def list_circuits(self, request: Request) -> JSONResponse:
158
        """Endpoint to return circuits stored.
159
160 1
        archive query arg if defined (not null) will be filtered
161 1
        accordingly, by default only non archived evcs will be listed
162 1
        """
163 1
        log.debug("list_circuits /v2/evc")
164 1
        args = request.query_params
165
        archived = args.get("archived", "false").lower()
166 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
167 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
168
                                                      metadata=args)
169 1
        circuits = circuits['circuits']
170 1
        return JSONResponse(circuits)
171
172
    @rest("/v2/evc/schedule", methods=["GET"])
173
    def list_schedules(self, _request: Request) -> JSONResponse:
174
        """Endpoint to return all schedules stored for all circuits.
175
176
        Return a JSON with the following template:
177
        [{"schedule_id": <schedule_id>,
178 1
         "circuit_id": <circuit_id>,
179 1
         "schedule": <schedule object>}]
180 1
        """
181 1
        log.debug("list_schedules /v2/evc/schedule")
182 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
183 1
        if not circuits:
184
            result = {}
185 1
            status = 200
186 1
            return JSONResponse(result, status_code=status)
187 1
188 1
        result = []
189 1
        status = 200
190 1
        for circuit in circuits:
191 1
            circuit_scheduler = circuit.get("circuit_scheduler")
192
            if circuit_scheduler:
193
                for scheduler in circuit_scheduler:
194
                    value = {
195
                        "schedule_id": scheduler.get("id"),
196 1
                        "circuit_id": circuit.get("id"),
197
                        "schedule": scheduler,
198 1
                    }
199 1
                    result.append(value)
200
201 1
        log.debug("list_schedules result %s %s", result, status)
202 1
        return JSONResponse(result, status_code=status)
203
204 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
205 1
    def get_circuit(self, request: Request) -> JSONResponse:
206 1
        """Endpoint to return a circuit based on id."""
207 1
        circuit_id = request.path_params["circuit_id"]
208 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
209 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
210 1
        if not circuit:
211 1
            result = f"circuit_id {circuit_id} not found"
212 1
            log.debug("get_circuit result %s %s", result, 404)
213 1
            raise HTTPException(404, detail=result)
214
        status = 200
215
        log.debug("get_circuit result %s %s", circuit, status)
216 1
        return JSONResponse(circuit, status_code=status)
217 1
218 1
    # pylint: disable=too-many-branches, too-many-statements
219
    @rest("/v2/evc/", methods=["POST"])
220
    @validate_openapi(spec)
221
    def create_circuit(self, request: Request) -> JSONResponse:
222
        """Try to create a new circuit.
223
224
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
225
        UNI_Z's requested C-VID are available from the interfaces' pools. This
226
        is checked when creating the UNI object.
227
228
        Then, E-Line NApp requests a primary and a backup path to the
229
        Pathfinder NApp using the attributes primary_links and backup_links
230
        submitted via REST
231
232
        # For each link composing paths in #3:
233
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
234
        #  - Using the S-VID obtained, generate abstract flow entries to be
235
        #    sent to FlowManager
236
237
        Push abstract flow entries to FlowManager and FlowManager pushes
238
        OpenFlow entries to datapaths
239
240
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
241
        creation
242
243 1
        Finnaly, notify user of the status of its request.
244 1
        """
245
        # Try to create the circuit object
246 1
        log.debug("create_circuit /v2/evc/")
247 1
        data = get_json_or_400(request, self.controller.loop)
248 1
249 1
        try:
250 1
            evc = self._evc_from_dict(data)
251 1
        except (ValueError, KytosTagError) as exception:
252 1
            log.debug("create_circuit result %s %s", exception, 400)
253 1
            raise HTTPException(400, detail=str(exception)) from exception
254 1
        try:
255 1
            check_disabled_component(evc.uni_a, evc.uni_z)
256
        except DisabledSwitch as exception:
257
            log.debug("create_circuit result %s %s", exception, 409)
258
            raise HTTPException(
259
                    409,
260 1
                    detail=f"Path is not valid: {exception}"
261 1
                ) from exception
262 1
263
        if evc.primary_path:
264
            try:
265
                evc.primary_path.is_valid(
266
                    evc.uni_a.interface.switch,
267 1
                    evc.uni_z.interface.switch,
268 1
                    bool(evc.circuit_scheduler),
269
                )
270
            except InvalidPath as exception:
271
                raise HTTPException(
272 1
                    400,
273 1
                    detail=f"primary_path is not valid: {exception}"
274 1
                ) from exception
275
        if evc.backup_path:
276
            try:
277
                evc.backup_path.is_valid(
278
                    evc.uni_a.interface.switch,
279 1
                    evc.uni_z.interface.switch,
280 1
                    bool(evc.circuit_scheduler),
281
                )
282
            except InvalidPath as exception:
283
                raise HTTPException(
284
                    400,
285 1
                    detail=f"backup_path is not valid: {exception}"
286 1
                ) from exception
287 1
288
        if not evc._tag_lists_equal():
289 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
290 1
            raise HTTPException(400, detail=detail)
291 1
292 1
        try:
293
            evc._validate_has_primary_or_dynamic()
294 1
        except ValueError as exception:
295 1
            raise HTTPException(400, detail=str(exception)) from exception
296
297
        try:
298
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
299
        except DuplicatedNoTagUNI as exception:
300 1
            log.debug("create_circuit result %s %s", exception, 409)
301 1
            raise HTTPException(409, detail=str(exception)) from exception
302 1
303 1
        try:
304
            self._use_uni_tags(evc)
305
        except KytosTagError as exception:
306 1
            raise HTTPException(400, detail=str(exception)) from exception
307 1
308
        # save circuit
309
        try:
310
            evc.sync()
311
        except ValidationError as exception:
312 1
            raise HTTPException(400, detail=str(exception)) from exception
313
314
        # store circuit in dictionary
315 1
        self.circuits[evc.id] = evc
316
317
        # Schedule the circuit deploy
318 1
        self.sched.add(evc)
319 1
320 1
        # Circuit has no schedule, deploy now
321 1
        deployed = False
322
        if not evc.circuit_scheduler:
323
            with evc.lock:
324 1
                deployed = evc.deploy()
325 1
326 1
        # Notify users
327 1
        result = {"circuit_id": evc.id, "deployed": deployed}
328
        status = 201
329 1
        log.debug("create_circuit result %s %s", result, status)
330
        emit_event(self.controller, name="created",
331 1
                   content=map_evc_event_content(evc))
332 1
        return JSONResponse(result, status_code=status)
333 1
334 1
    @staticmethod
335 1
    def _use_uni_tags(evc):
336 1
        uni_a = evc.uni_a
337 1
        evc._use_uni_vlan(uni_a)
338 1
        try:
339 1
            uni_z = evc.uni_z
340 1
            evc._use_uni_vlan(uni_z)
341
        except KytosTagError as err:
342 1
            evc.make_uni_vlan_available(uni_a)
343 1
            raise err
344
345
    @listen_to('kytos/flow_manager.flow.removed')
346
    def on_flow_delete(self, event):
347 1
        """Capture delete messages to keep track when flows got removed."""
348
        self.handle_flow_delete(event)
349 1
350 1
    def handle_flow_delete(self, event):
351 1
        """Keep track when the EVC got flows removed by deriving its cookie."""
352 1
        flow = event.content["flow"]
353 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
354
        if evc:
355 1
            log.debug("Flow removed in EVC %s", evc.id)
356 1
            evc.set_flow_removed_at()
357 1
358
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
359
    @validate_openapi(spec)
360
    def update(self, request: Request) -> JSONResponse:
361
        """Update a circuit based on payload.
362
363 1
        The EVC attributes (creation_time, active, current_path,
364 1
        failover_path, _id, archived) can't be updated.
365 1
        """
366 1
        data = get_json_or_400(request, self.controller.loop)
367 1
        circuit_id = request.path_params["circuit_id"]
368 1
        log.debug("update /v2/evc/%s", circuit_id)
369 1
        try:
370 1
            evc = self.circuits[circuit_id]
371 1
        except KeyError:
372
            result = f"circuit_id {circuit_id} not found"
373 1
            log.debug("update result %s %s", result, 404)
374 1
            raise HTTPException(404, detail=result) from KeyError
375 1
376
        try:
377
            updated_data = self._evc_dict_with_instances(data)
378
            self._check_no_tag_duplication(
379 1
                circuit_id, updated_data.get("uni_a"),
380 1
                updated_data.get("uni_z")
381 1
            )
382 1
            enable, redeploy = evc.update(**updated_data)
383 1
        except (ValueError, KytosTagError, ValidationError) as exception:
384
            log.debug("update result %s %s", exception, 400)
385
            raise HTTPException(400, detail=str(exception)) from exception
386 1
        except DuplicatedNoTagUNI as exception:
387 1
            log.debug("update result %s %s", exception, 409)
388 1
            raise HTTPException(409, detail=str(exception)) from exception
389
        except DisabledSwitch as exception:
390
            log.debug("update result %s %s", exception, 409)
391
            raise HTTPException(
392 1
                    409,
393 1
                    detail=f"Path is not valid: {exception}"
394
                ) from exception
395
        redeployed = False
396
        if evc.is_active():
397
            if enable is False:  # disable if active
398
                with evc.lock:
399
                    evc.remove()
400
            elif redeploy is not None:  # redeploy if active
401
                with evc.lock:
402 1
                    evc.remove()
403 1
                    redeployed = evc.deploy()
404 1
        else:
405 1
            if enable is True:  # enable if inactive
406 1
                with evc.lock:
407 1
                    redeployed = evc.deploy()
408 1
            elif evc.is_enabled() and redeploy:
409 1
                with evc.lock:
410 1
                    evc.remove()
411
                    redeployed = evc.deploy()
412 1
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
413 1
        status = 200
414
415 1
        log.debug("update result %s %s", result, status)
416
        emit_event(self.controller, "updated",
417 1
                   content=map_evc_event_content(evc, **data))
418 1
        return JSONResponse(result, status_code=status)
419
420
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
421
    def delete_circuit(self, request: Request) -> JSONResponse:
422
        """Remove a circuit.
423
424 1
        First, the flows are removed from the switches, and then the EVC is
425 1
        disabled.
426 1
        """
427 1
        circuit_id = request.path_params["circuit_id"]
428 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
429 1
        try:
430 1
            evc = self.circuits.pop(circuit_id)
431 1
        except KeyError:
432 1
            result = f"circuit_id {circuit_id} not found"
433
            log.debug("delete_circuit result %s %s", result, 404)
434 1
            raise HTTPException(404, detail=result) from KeyError
435 1
        log.info("Removing %s", evc)
436 1
437 1
        with evc.lock:
438 1
            if not evc.archived:
439 1
                evc.deactivate()
440 1
                evc.disable()
441 1
                self.sched.remove(evc)
442 1
                evc.remove_current_flows(sync=False)
443 1
                evc.remove_failover_flows(sync=False)
444 1
                evc.archive()
445
                evc.remove_uni_tags()
446
                evc.sync()
447
                emit_event(
448
                    self.controller, "deleted",
449 1
                    content=map_evc_event_content(evc)
450 1
                )
451 1
452 1
        log.info("EVC removed. %s", evc)
453
        result = {"response": f"Circuit {circuit_id} removed"}
454 1
        status = 200
455
        log.debug("delete_circuit result %s %s", result, status)
456 1
457 1
        return JSONResponse(result, status_code=status)
458
459 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
460 1
    def get_metadata(self, request: Request) -> JSONResponse:
461 1
        """Get metadata from an EVC."""
462
        circuit_id = request.path_params["circuit_id"]
463
        try:
464
            return (
465
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
466
            )
467
        except KeyError as error:
468
            raise HTTPException(
469
                404,
470 1
                detail=f"circuit_id {circuit_id} not found."
471 1
            ) from error
472 1
473 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
474 1
    @validate_openapi(spec)
475 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
476
        """Add metadata to a bulk of EVCs."""
477 1
        data = get_json_or_400(request, self.controller.loop)
478
        circuit_ids = data.pop("circuit_ids")
479 1
480 1
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
481 1
482 1
        fail_evcs = []
483 1
        for _id in circuit_ids:
484 1
            try:
485 1
                evc = self.circuits[_id]
486
                evc.extend_metadata(data)
487 1
            except KeyError:
488 1
                fail_evcs.append(_id)
489 1
490
        if fail_evcs:
491 1
            raise HTTPException(404, detail=fail_evcs)
492 1
        return JSONResponse("Operation successful", status_code=201)
493 1
494
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
495 1
    @validate_openapi(spec)
496 1
    def add_metadata(self, request: Request) -> JSONResponse:
497 1
        """Add metadata to an EVC."""
498
        circuit_id = request.path_params["circuit_id"]
499 1
        metadata = get_json_or_400(request, self.controller.loop)
500 1
        if not isinstance(metadata, dict):
501 1
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
502 1
        try:
503
            evc = self.circuits[circuit_id]
504
        except KeyError as error:
505
            raise HTTPException(
506
                404,
507 1
                detail=f"circuit_id {circuit_id} not found."
508 1
            ) from error
509 1
510
        evc.extend_metadata(metadata)
511 1
        evc.sync()
512 1
        return JSONResponse("Operation successful", status_code=201)
513 1
514 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
515 1
    @validate_openapi(spec)
516 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
517 1
        """Delete metada from a bulk of EVCs"""
518 1
        data = get_json_or_400(request, self.controller.loop)
519
        key = request.path_params["key"]
520
        circuit_ids = data.pop("circuit_ids")
521
        self.mongo_controller.update_evcs_metadata(
522 1
            circuit_ids, {key: ""}, "del"
523 1
        )
524 1
525 1
        fail_evcs = []
526 1
        for _id in circuit_ids:
527 1
            try:
528 1
                evc = self.circuits[_id]
529
                evc.remove_metadata(key)
530 1
            except KeyError:
531 1
                fail_evcs.append(_id)
532 1
533
        if fail_evcs:
534 1
            raise HTTPException(404, detail=fail_evcs)
535 1
        return JSONResponse("Operation successful")
536
537 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
538 1
    def delete_metadata(self, request: Request) -> JSONResponse:
539 1
        """Delete metadata from an EVC."""
540 1
        circuit_id = request.path_params["circuit_id"]
541 1
        key = request.path_params["key"]
542 1
        try:
543
            evc = self.circuits[circuit_id]
544
        except KeyError as error:
545
            raise HTTPException(
546
                404,
547 1
                detail=f"circuit_id {circuit_id} not found."
548 1
            ) from error
549 1
550
        evc.remove_metadata(key)
551 1
        evc.sync()
552 1
        return JSONResponse("Operation successful")
553
554 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
555 1
    def redeploy(self, request: Request) -> JSONResponse:
556
        """Endpoint to force the redeployment of an EVC."""
557
        circuit_id = request.path_params["circuit_id"]
558 1
        try_avoid_same_s_vlan = request.query_params.get(
559 1
            "try_avoid_same_s_vlan", "true"
560
        )
561
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
562 1
        if try_avoid_same_s_vlan not in {"true", "false"}:
563 1
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
564 1
            raise HTTPException(400, detail=msg)
565 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
566 1
        try:
567
            evc = self.circuits[circuit_id]
568
        except KeyError:
569
            raise HTTPException(
570 1
                404,
571 1
                detail=f"circuit_id {circuit_id} not found"
572 1
            ) from KeyError
573 1
        deployed = False
574
        if evc.is_enabled():
575
            with evc.lock:
576
                path_dict = evc.remove_current_flows(
577 1
                    sync=False,
578 1
                    return_path=try_avoid_same_s_vlan == "true"
579 1
                )
580 1
                evc.remove_failover_flows(sync=True)
581 1
                deployed = evc.deploy(path_dict)
582
        if deployed:
583 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
584
            status = 202
585
        else:
586 1
            result = {
587
                "response": f"Circuit {circuit_id} is disabled."
588 1
            }
589
            status = 409
590 1
591 1
        return JSONResponse(result, status_code=status)
592 1
593
    @rest("/v2/evc/schedule/", methods=["POST"])
594
    @validate_openapi(spec)
595
    def create_schedule(self, request: Request) -> JSONResponse:
596
        """
597
        Create a new schedule for a given circuit.
598
599
        This service do no check if there are conflicts with another schedule.
600
        Payload example:
601
            {
602
              "circuit_id":"aa:bb:cc",
603
              "schedule": {
604
                "date": "2019-08-07T14:52:10.967Z",
605
                "interval": "string",
606
                "frequency": "1 * * * *",
607
                "action": "create"
608 1
              }
609 1
            }
610 1
        """
611 1
        log.debug("create_schedule /v2/evc/schedule/")
612
        data = get_json_or_400(request, self.controller.loop)
613
        circuit_id = data["circuit_id"]
614 1
        schedule_data = data["schedule"]
615
616
        # Get EVC from circuits buffer
617 1
        circuits = self._get_circuits_buffer()
618
619
        # get the circuit
620 1
        evc = circuits.get(circuit_id)
621 1
622 1
        # get the circuit
623 1
        if not evc:
624
            result = f"circuit_id {circuit_id} not found"
625
            log.debug("create_schedule result %s %s", result, 404)
626 1
            raise HTTPException(404, detail=result)
627
628
        # new schedule from dict
629 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
630 1
631
        # If there is no schedule, create the list
632
        if not evc.circuit_scheduler:
633 1
            evc.circuit_scheduler = []
634
635
        # Add the new schedule
636 1
        evc.circuit_scheduler.append(new_schedule)
637
638
        # Add schedule job
639 1
        self.sched.add_circuit_job(evc, new_schedule)
640
641 1
        # save circuit to mongodb
642 1
        evc.sync()
643
644 1
        result = new_schedule.as_dict()
645 1
        status = 201
646
647 1
        log.debug("create_schedule result %s %s", result, status)
648 1
        return JSONResponse(result, status_code=status)
649 1
650
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
651
    @validate_openapi(spec)
652
    def update_schedule(self, request: Request) -> JSONResponse:
653
        """Update a schedule.
654
655
        Change all attributes from the given schedule from a EVC circuit.
656
        The schedule ID is preserved as default.
657
        Payload example:
658
            {
659
              "date": "2019-08-07T14:52:10.967Z",
660
              "interval": "string",
661
              "frequency": "1 * * *",
662 1
              "action": "create"
663 1
            }
664 1
        """
665
        data = get_json_or_400(request, self.controller.loop)
666
        schedule_id = request.path_params["schedule_id"]
667 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
668
669
        # Try to find a circuit schedule
670 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
671 1
672 1
        # Can not modify circuits deleted and archived
673 1
        if not found_schedule:
674
            result = f"schedule_id {schedule_id} not found"
675 1
            log.debug("update_schedule result %s %s", result, 404)
676 1
            raise HTTPException(404, detail=result)
677
678 1
        new_schedule = CircuitSchedule.from_dict(data)
679
        new_schedule.id = found_schedule.id
680 1
        # Remove the old schedule
681
        evc.circuit_scheduler.remove(found_schedule)
682
        # Append the modified schedule
683 1
        evc.circuit_scheduler.append(new_schedule)
684
685 1
        # Cancel all schedule jobs
686
        self.sched.cancel_job(found_schedule.id)
687 1
        # Add the new circuit schedule
688
        self.sched.add_circuit_job(evc, new_schedule)
689 1
        # Save EVC to mongodb
690 1
        evc.sync()
691
692 1
        result = new_schedule.as_dict()
693 1
        status = 200
694
695 1
        log.debug("update_schedule result %s %s", result, status)
696 1
        return JSONResponse(result, status_code=status)
697
698
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
699
    def delete_schedule(self, request: Request) -> JSONResponse:
700
        """Remove a circuit schedule.
701
702
        Remove the Schedule from EVC.
703 1
        Remove the Schedule from cron job.
704 1
        Save the EVC to the Storehouse.
705 1
        """
706
        schedule_id = request.path_params["schedule_id"]
707
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
708 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
709 1
710 1
        # Can not modify circuits deleted and archived
711 1
        if not found_schedule:
712
            result = f"schedule_id {schedule_id} not found"
713
            log.debug("delete_schedule result %s %s", result, 404)
714 1
            raise HTTPException(404, detail=result)
715
716
        # Remove the old schedule
717 1
        evc.circuit_scheduler.remove(found_schedule)
718
719 1
        # Cancel all schedule jobs
720
        self.sched.cancel_job(found_schedule.id)
721 1
        # Save EVC to mongodb
722 1
        evc.sync()
723
724 1
        result = "Schedule removed"
725 1
        status = 200
726
727 1
        log.debug("delete_schedule result %s %s", result, status)
728
        return JSONResponse(result, status_code=status)
729
730
    def _check_no_tag_duplication(
731
        self,
732
        evc_id: str,
733
        uni_a: Optional[UNI] = None,
734
        uni_z: Optional[UNI] = None
735
    ):
736
        """Check if the given EVC has UNIs with no tag and if these are
737
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
738
        Args:
739
            evc (dict): EVC to be analyzed.
740 1
        """
741 1
742
        # No UNIs
743 1
        if not (uni_a or uni_z):
744
            return
745 1
746 1
        if (not (uni_a and not uni_a.user_tag) and
747 1
                not (uni_z and not uni_z.user_tag)):
748 1
            return
749 1
        for circuit in self.circuits.copy().values():
750 1
            if (not circuit.archived and circuit._id != evc_id):
751 1
                if uni_a and uni_a.user_tag is None:
752
                    circuit.check_no_tag_duplicate(uni_a)
753 1
                if uni_z and uni_z.user_tag is None:
754 1
                    circuit.check_no_tag_duplicate(uni_z)
755
756
    @listen_to("kytos/topology.link_up")
757
    def on_link_up(self, event):
758 1
        """Change circuit when link is up or end_maintenance."""
759
        self.handle_link_up(event)
760 1
761 1
    def handle_link_up(self, event):
762 1
        """Change circuit when link is up or end_maintenance."""
763 1
        log.info("Event handle_link_up %s", event.content["link"])
764 1
        for evc in self.get_evcs_by_svc_level():
765
            if evc.is_enabled() and not evc.archived:
766
                with evc.lock:
767 1
                    evc.handle_link_up(event.content["link"])
768
769
    # Possibly replace this with interruptions?
770 1
    @listen_to(
771
        '.*.switch.interface.(link_up|link_down|created|deleted)'
772
    )
773
    def on_interface_link_change(self, event: KytosEvent):
774
        """
775
        Handler for interface link_up and link_down events.
776 1
        """
777
        self.handle_on_interface_link_change(event)
778
779
    def handle_on_interface_link_change(self, event: KytosEvent):
780
        """
781
        Handler to sort interface events {link_(up, down), create, deleted}
782
783
        To avoid multiple database updated (link flap):
784
        Every interface is identfied and processed in parallel.
785
        Once an interface event is received a time is started.
786 1
        While time is running self._intf_events will be updated.
787 1
        After time has passed last received event will be processed.
788 1
        """
789
        iface = event.content.get("interface")
790 1
        with self._lock_interfaces[iface.id]:
791
            _now = event.timestamp
792
            # Return out of order events
793
            if (
794 1
                iface.id in self._intf_events
795 1
                and self._intf_events[iface.id]["event"].timestamp > _now
796 1
            ):
797 1
                return
798 1
            self._intf_events[iface.id].update({"event": event})
799 1
            if "last_acquired" in self._intf_events[iface.id]:
800 1
                return
801 1
            self._intf_events[iface.id].update({"last_acquired": now()})
802 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
803 1
        with self._lock_interfaces[iface.id]:
804 1
            event = self._intf_events[iface.id]["event"]
805 1
            self._intf_events[iface.id].pop('last_acquired', None)
806 1
            _, _, event_type = event.name.rpartition('.')
807 1
            if event_type in ('link_up', 'created'):
808
                self.handle_interface_link_up(iface)
809 1
            elif event_type in ('link_down', 'deleted'):
810
                self.handle_interface_link_down(iface)
811
812
    def handle_interface_link_up(self, interface):
813
        """
814
        Handler for interface link_up events
815
        """
816
        log.info("Event handle_interface_link_up %s", interface)
817
        for evc in self.get_evcs_by_svc_level():
818
            with evc.lock:
819
                evc.handle_interface_link_up(
820 1
                    interface
821
                )
822
823
    def handle_interface_link_down(self, interface):
824
        """
825
        Handler for interface link_down events
826
        """
827
        log.info("Event handle_interface_link_down %s", interface)
828
        for evc in self.get_evcs_by_svc_level():
829
            with evc.lock:
830
                evc.handle_interface_link_down(
831 1
                    interface
832 1
                )
833
834
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
835
    def on_link_down(self, event):
836
        """Change circuit when link is down or under_mantenance."""
837
        self.handle_link_down(event)
838 1
839
    def prepare_swap_to_failover_flow(self, evc: EVC):
840 1
        """Prepare an evc for switching to failover."""
841 1
        install_flows = {}
842 1
        try:
843 1
            install_flows = evc.get_failover_flows()
844 1
        # pylint: disable=broad-except
845 1
        except Exception:
846 1
            err = traceback.format_exc().replace("\n", ", ")
847
            log.error(
848 1
                "Ignore Failover path for "
849 1
                f"{evc} due to error: {err}"
850 1
            )
851 1
        return install_flows
852
853
    def prepare_swap_to_failover_event(self, evc: EVC, install_flow):
854 1
        """Prepare event contents for swap to failover."""
855
        return map_evc_event_content(
856
            evc,
857
            flows=deepcopy(install_flow)
858 1
        )
859 1
860 1
    def execute_swap_to_failover(
861 1
        self,
862 1
        evcs: list[EVC]
863 1
    ) -> tuple[list[EVC], list[EVC]]:
864 1
        """Process changes needed to commit a swap to failover."""
865
        event_contents = {}
866 1
        install_flows = {}
867 1
        swapped_evcs = list[EVC]()
868 1
        not_swapped_evcs = list[EVC]()
869
870
        for evc in evcs:
871
            install_flow = self.prepare_swap_to_failover_flow(evc)
872 1
            if install_flow:
873 1
                install_flows = merge_flow_dicts(install_flows, install_flow)
874 1
                event_contents[evc.id] =\
875 1
                    self.prepare_swap_to_failover_event(evc, install_flow)
876 1
                swapped_evcs.append(evc)
877 1
            else:
878 1
                not_swapped_evcs.append(evc)
879
880
        try:
881
            send_flow_mods_http(
882 1
                install_flows,
883 1
                "install"
884 1
            )
885 1
            for evc in swapped_evcs:
886
                temp_path = evc.current_path
887 1
                evc.current_path = evc.failover_path
888 1
                evc.failover_path = temp_path
889
            emit_event(
890 1
                self.controller, "failover_link_down",
891
                content=deepcopy(event_contents)
892 1
            )
893 1
            return swapped_evcs, not_swapped_evcs
894
        except FlowModException as exc:
895
            log.error(f"Fail to install failover flows for {evcs}: {exc}")
896
            return [], [*swapped_evcs, *not_swapped_evcs]
897
898
    def prepare_clear_failover_flow(self, evc: EVC):
899 1
        """Prepare an evc for clearing the old path."""
900 1
        del_flows = {}
901 1
        try:
902 1
            del_flows = prepare_delete_flow(
903
                merge_flow_dicts(
904
                    evc._prepare_uni_flows(evc.failover_path, skip_in=True),
905 1
                    evc._prepare_nni_flows(evc.failover_path)
906 1
                )
907
            )
908 1
        # pylint: disable=broad-except
909
        except Exception:
910 1
            err = traceback.format_exc().replace("\n", ", ")
911
            log.error(f"Fail to remove {evc} old_path: {err}")
912
        return del_flows
913
914
    def prepare_clear_failover_event(self, evc: EVC, delete_flow):
915
        """Prepare event contents for clearing failover."""
916 1
        return map_evc_event_content(
917 1
            evc,
918
            current_path=evc.current_path.as_dict(),
919
            removed_flows=deepcopy(delete_flow)
920
        )
921 1
922
    def execute_clear_failover(
923 1
        self,
924 1
        evcs: list[EVC]
925 1
    ) -> tuple[list[EVC], list[EVC]]:
926 1
        """Process changes needed to commit clearing the failover path"""
927 1
        event_contents = {}
928 1
        delete_flows = {}
929
        cleared_evcs = list[EVC]()
930 1
        not_cleared_evcs = list[EVC]()
931 1
932 1
        for evc in evcs:
933 1
            delete_flow = self.prepare_clear_failover_flow(evc)
934 1
            if delete_flow:
935 1
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
936
                event_contents[evc.id] =\
937
                    self.prepare_clear_failover_event(evc, delete_flow)
938 1
                cleared_evcs.append(evc)
939 1
            else:
940
                not_cleared_evcs.append(evc)
941
942
        try:
943 1
            send_flow_mods_http(
944
                delete_flows,
945
                'delete'
946
            )
947
            for evc in cleared_evcs:
948
                evc.failover_path.make_vlans_available(self.controller)
949
                evc.failover_path = Path([])
950 1
            emit_event(
951 1
                self.controller,
952
                "failover_old_path",
953
                content=event_contents
954
            )
955 1
            return cleared_evcs, not_cleared_evcs
956
        except FlowModException as exc:
957 1
            log.error(f"Failed to delete failover flows for {evcs}: {exc}")
958 1
            return [], [*cleared_evcs, *not_cleared_evcs]
959 1
960 1
    def prepare_undeploy_flow(self, evc: EVC):
961 1
        """Prepare an evc for undeploying."""
962 1
        del_flows = {}
963 1
        try:
964 1
            del_flows = prepare_delete_flow(
965 1
                merge_flow_dicts(
966 1
                    evc._prepare_uni_flows(evc.current_path, skip_in=True),
967
                    evc._prepare_nni_flows(evc.current_path),
968
                    evc._prepare_nni_flows(evc.failover_path)
969 1
                )
970
            )
971
        # pylint: disable=broad-except
972 1
        except Exception:
973
            err = traceback.format_exc().replace("\n", ", ")
974
            log.error(f"Fail to undeploy {evc}: {err}")
975
        return del_flows
976
977
    def execute_undeploy(self, evcs: list[EVC]):
978
        """Process changes needed to commit an undeploy"""
979
        delete_flows = {}
980 1
        undeploy_evcs = list[EVC]()
981 1
        not_undeploy_evcs = list[EVC]()
982 1
983
        for evc in evcs:
984
            delete_flow = self.prepare_undeploy_flow(evc)
985
            if delete_flow:
986
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
987 1
                undeploy_evcs.append(evc)
988 1
            else:
989 1
                not_undeploy_evcs.append(evc)
990 1
991 1
        try:
992
            send_flow_mods_http(
993
                delete_flows,
994 1
                'delete'
995 1
            )
996
997
            for evc in undeploy_evcs:
998
                evc.current_path.make_vlans_available(self.controller)
999 1
                evc.failover_path.make_vlans_available(self.controller)
1000
                evc.current_path = Path([])
1001 1
                evc.failover_path = Path([])
1002 1
                evc.deactivate()
1003 1
                emit_event(
1004 1
                    self.controller,
1005 1
                    "need_redeploy",
1006
                    content={"evc_id": evc.id}
1007
                )
1008 1
                log.info(f"{evc} scheduled for redeploy")
1009
            return undeploy_evcs, not_undeploy_evcs
1010 1
        except FlowModException as exc:
1011 1
            log.error(
1012 1
                f"Failed to delete flows before redeploy for {evcs}: {exc}"
1013 1
            )
1014
            return [], [*undeploy_evcs, *not_undeploy_evcs]
1015
1016 1
    def handle_link_down(self, event):
1017 1
        """Change circuit when link is down or under_mantenance."""
1018 1
        link = event.content["link"]
1019
        log.info("Event handle_link_down %s", link)
1020 1
1021 1
        with ExitStack() as exit_stack:
1022 1
            exit_stack.enter_context(self.multi_evc_lock)
1023
            swap_to_failover = list[EVC]()
1024 1
            undeploy = list[EVC]()
1025 1
            clear_failover = list[EVC]()
1026
            evcs_to_update = dict[str, EVC]()
1027
1028
            for evc in self.get_evcs_by_svc_level():
1029 1
                with ExitStack() as sub_stack:
1030
                    sub_stack.enter_context(evc.lock)
1031 1
                    if all((
1032 1
                        evc.is_affected_by_link(link),
1033 1
                        evc.failover_path,
1034
                        not evc.is_failover_path_affected_by_link(link)
1035 1
                    )):
1036 1
                        swap_to_failover.append(evc)
1037 1
                    elif all((
1038 1
                        evc.is_affected_by_link(link),
1039 1
                        not evc.failover_path or
1040
                        evc.is_failover_path_affected_by_link(link)
1041 1
                    )):
1042
                        undeploy.append(evc)
1043
                    elif all((
1044
                        not evc.is_affected_by_link(link),
1045
                        evc.failover_path,
1046 1
                        evc.is_failover_path_affected_by_link(link)
1047 1
                    )):
1048
                        clear_failover.append(evc)
1049
                    else:
1050 1
                        continue
1051 1
1052 1
                    exit_stack.push(sub_stack.pop_all())
1053 1
1054 1
            # Swap from current path to failover path
1055 1
1056
            if swap_to_failover:
1057 1
                success, failure = self.execute_swap_to_failover(swap_to_failover)
1058 1
1059 1
                clear_failover.extend(success)
1060 1
1061
                evcs_to_update.update((evc.id, evc) for evc in success)
1062
1063
                undeploy.extend(failure)
1064
1065
            # Clear out failover path
1066
1067
            if clear_failover:
1068 1
                success, failure = self.execute_clear_failover(clear_failover)
1069 1
1070
                evcs_to_update.update((evc.id, evc) for evc in success)
1071
1072
                undeploy.extend(failure)
1073
1074
            # Undeploy the evc, schedule a redeploy
1075
1076 1
            if undeploy:
1077 1
                success, failure = self.execute_undeploy(undeploy)
1078
1079
                evcs_to_update.update((evc.id, evc) for evc in success)
1080
1081 1
            if failure:
0 ignored issues
show
introduced by
The variable failure does not seem to be defined in case swap_to_failover on line 1056 is False. Are you sure this can never be the case?
Loading history...
1082
                log.error(f"Failed to handle_link_down for {failure}")
1083 1
1084 1
            # Push update to DB
1085 1
1086 1
            if evcs_to_update:
1087
                self.mongo_controller.update_evcs(
1088 1
                    [evc.as_dict() for evc in evcs_to_update.values()]
1089
                )
1090 1
1091 1
    @listen_to("kytos/mef_eline.need_redeploy")
1092
    def on_evc_need_redeploy(self, event):
1093 1
        """Redeploy evcs that need to be redeployed."""
1094 1
        self.handle_evc_need_redeploy(event)
1095 1
1096 1
    def handle_evc_need_redeploy(self, event):
1097
        """Redeploy evcs that need to be redeployed."""
1098
        evc = self.circuits.get(event.content["evc_id"])
1099
        if evc is None:
1100 1
            return
1101 1
        with evc.lock:
1102 1
            if not evc.is_enabled() or evc.is_active():
1103 1
                return
1104 1
            result = evc.deploy_to_path()
1105 1
        event_name = "error_redeploy_link_down"
1106 1
        if result:
1107 1
            log.info(f"{evc} redeployed")
1108 1
            event_name = "redeployed_link_down"
1109 1
        emit_event(self.controller, event_name,
1110 1
                   content=map_evc_event_content(evc))
1111
1112 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
1113
    def on_evc_affected_by_link_down(self, event):
1114 1
        """Change circuit when link is down or under_mantenance."""
1115 1
        self.handle_evc_affected_by_link_down(event)
1116 1
1117
    def handle_evc_affected_by_link_down(self, event):
1118 1
        """Change circuit when link is down or under_mantenance."""
1119
        evc = self.circuits.get(event.content["evc_id"])
1120 1
        link = event.content['link']
1121 1
        if not evc:
1122
            return
1123 1
        with evc.lock:
1124 1
            if not evc.is_affected_by_link(link):
1125 1
                return
1126 1
            result = evc.handle_link_down()
1127 1
        event_name = "error_redeploy_link_down"
1128 1
        if result:
1129
            log.info(f"{evc} redeployed due to link down {link.id}")
1130
            event_name = "redeployed_link_down"
1131
        emit_event(self.controller, event_name,
1132 1
                   content=map_evc_event_content(evc))
1133 1
1134 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
1135 1
    def on_evc_deployed(self, event):
1136
        """Handle EVC deployed|redeployed_link_down."""
1137 1
        self.handle_evc_deployed(event)
1138 1
1139 1
    def handle_evc_deployed(self, event):
1140 1
        """Setup failover path on evc deployed."""
1141
        evc = self.circuits.get(event.content["evc_id"])
1142
        if not evc:
1143 1
            return
1144 1
        evc.try_setup_failover_path()
1145
1146 1
    @listen_to("kytos/topology.topology_loaded")
1147
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
1148
        """Load EVCs once the topology is available."""
1149
        self.load_all_evcs()
1150
1151
    def load_all_evcs(self):
1152
        """Try to load all EVCs on startup."""
1153 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
1154 1
        for circuit_id, circuit in circuits:
1155 1
            if circuit_id not in self.circuits:
1156
                self._load_evc(circuit)
1157
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
1158 1
                   timeout=1)
1159 1
1160 1
    def _load_evc(self, circuit_dict):
1161 1
        """Load one EVC from mongodb to memory."""
1162 1
        try:
1163 1
            evc = self._evc_from_dict(circuit_dict)
1164 1
        except (ValueError, KytosTagError) as exception:
1165 1
            log.error(
1166 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1167
            )
1168 1
            return None
1169
        if evc.archived:
1170
            return None
1171
1172
        self.circuits.setdefault(evc.id, evc)
1173
        self.sched.add(evc)
1174 1
        return evc
1175
1176 1
    @listen_to("kytos/flow_manager.flow.error")
1177 1
    def on_flow_mod_error(self, event):
1178 1
        """Handle flow mod errors related to an EVC."""
1179 1
        self.handle_flow_mod_error(event)
1180 1
1181
    def handle_flow_mod_error(self, event):
1182
        """Handle flow mod errors related to an EVC."""
1183 1
        flow = event.content["flow"]
1184 1
        command = event.content.get("error_command")
1185
        if command != "add":
1186 1
            return
1187 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1188 1
        if evc:
1189 1
            with evc.lock:
1190 1
                evc.remove_current_flows(sync=False)
1191 1
                evc.remove_failover_flows(sync=True)
1192
1193
    def _evc_dict_with_instances(self, evc_dict):
1194 1
        """Convert some dict values to instance of EVC classes.
1195 1
1196 1
        This method will convert: [UNI, Link]
1197 1
        """
1198 1
        data = evc_dict.copy()  # Do not modify the original dict
1199
        for attribute, value in data.items():
1200
            # Get multiple attributes.
1201
            # Ex: uni_a, uni_z
1202
            if "uni" in attribute:
1203
                try:
1204
                    data[attribute] = self._uni_from_dict(value)
1205
                except ValueError as exception:
1206
                    result = "Error creating UNI: Invalid value"
1207
                    raise ValueError(result) from exception
1208
1209
            if attribute == "circuit_scheduler":
1210
                data[attribute] = []
1211
                for schedule in value:
1212
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1213
1214
            # Get multiple attributes.
1215
            # Ex: primary_links,
1216
            #     backup_links,
1217
            #     current_links_cache,
1218
            #     primary_links_cache,
1219
            #     backup_links_cache
1220
            if "links" in attribute:
1221
                data[attribute] = [
1222
                    self._link_from_dict(link, attribute) for link in value
1223
                ]
1224
1225
            # Ex: current_path,
1226
            #     primary_path,
1227
            #     backup_path
1228
            if "path" in attribute and attribute != "dynamic_backup_path":
1229
                data[attribute] = Path(
1230
                    [self._link_from_dict(link, attribute) for link in value]
1231
                )
1232
1233
        return data
1234
1235
    def _evc_from_dict(self, evc_dict):
1236
        data = self._evc_dict_with_instances(evc_dict)
1237
        data["table_group"] = self.table_group
1238
        return EVC(self.controller, **data)
1239
1240
    def _uni_from_dict(self, uni_dict):
1241
        """Return a UNI object from python dict."""
1242
        if uni_dict is None:
1243
            return False
1244
1245
        interface_id = uni_dict.get("interface_id")
1246
        interface = self.controller.get_interface_by_id(interface_id)
1247
        if interface is None:
1248
            result = (
1249
                "Error creating UNI:"
1250
                + f"Could not instantiate interface {interface_id}"
1251
            )
1252
            raise ValueError(result) from ValueError
1253
        tag_convert = {1: "vlan"}
1254
        tag_dict = uni_dict.get("tag", None)
1255
        if tag_dict:
1256
            tag_type = tag_dict.get("tag_type")
1257
            tag_type = tag_convert.get(tag_type, tag_type)
1258
            tag_value = tag_dict.get("value")
1259
            if isinstance(tag_value, list):
1260
                tag_value = get_tag_ranges(tag_value)
1261
                mask_list = get_vlan_tags_and_masks(tag_value)
1262
                tag = TAGRange(tag_type, tag_value, mask_list)
1263
            else:
1264
                tag = TAG(tag_type, tag_value)
1265
        else:
1266
            tag = None
1267
        uni = UNI(interface, tag)
1268
        return uni
1269
1270
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
1271
        """Return a Link object from python dict."""
1272
        id_a = link_dict.get("endpoint_a").get("id")
1273
        id_b = link_dict.get("endpoint_b").get("id")
1274
1275
        endpoint_a = self.controller.get_interface_by_id(id_a)
1276
        endpoint_b = self.controller.get_interface_by_id(id_b)
1277
        if not endpoint_a:
1278
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1279
            raise ValueError(error_msg)
1280
        if not endpoint_b:
1281
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1282
            raise ValueError(error_msg)
1283
1284
        link = Link(endpoint_a, endpoint_b)
1285
        allowed_paths = {"current_path", "failover_path"}
1286
        if "metadata" in link_dict and attribute in allowed_paths:
1287
            link.extend_metadata(link_dict.get("metadata"))
1288
1289
        s_vlan = link.get_metadata("s_vlan")
1290
        if s_vlan:
1291
            tag = TAG.from_dict(s_vlan)
1292
            if tag is False:
1293
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1294
                raise ValueError(error_msg)
1295
            link.update_metadata("s_vlan", tag)
1296
        return link
1297
1298
    def _find_evc_by_schedule_id(self, schedule_id):
1299
        """
1300
        Find an EVC and CircuitSchedule based on schedule_id.
1301
1302
        :param schedule_id: Schedule ID
1303
        :return: EVC and Schedule
1304
        """
1305
        circuits = self._get_circuits_buffer()
1306
        found_schedule = None
1307
        evc = None
1308
1309
        # pylint: disable=unused-variable
1310
        for c_id, circuit in circuits.items():
1311
            for schedule in circuit.circuit_scheduler:
1312
                if schedule.id == schedule_id:
1313
                    found_schedule = schedule
1314
                    evc = circuit
1315
                    break
1316
            if found_schedule:
1317
                break
1318
        return evc, found_schedule
1319
1320
    def _get_circuits_buffer(self):
1321
        """
1322
        Return the circuit buffer.
1323
1324
        If the buffer is empty, try to load data from mongodb.
1325
        """
1326
        if not self.circuits:
1327
            # Load circuits from mongodb to buffer
1328
            circuits = self.mongo_controller.get_circuits()['circuits']
1329
            for c_id, circuit in circuits.items():
1330
                evc = self._evc_from_dict(circuit)
1331
                self.circuits[c_id] = evc
1332
        return self.circuits
1333
1334
    # pylint: disable=attribute-defined-outside-init
1335
    @alisten_to("kytos/of_multi_table.enable_table")
1336
    async def on_table_enabled(self, event):
1337
        """Handle a recently table enabled."""
1338
        table_group = event.content.get("mef_eline", None)
1339
        if not table_group:
1340
            return
1341
        for group in table_group:
1342
            if group not in settings.TABLE_GROUP_ALLOWED:
1343
                log.error(f'The table group "{group}" is not allowed for '
1344
                          f'mef_eline. Allowed table groups are '
1345
                          f'{settings.TABLE_GROUP_ALLOWED}')
1346
                return
1347
        self.table_group.update(table_group)
1348
        content = {"group_table": self.table_group}
1349
        name = "kytos/mef_eline.enable_table"
1350
        await aemit_event(self.controller, name, content)
1351