build.main.Main.create_circuit()   D
last analyzed

Complexity

Conditions 13

Size

Total Lines 105
Code Lines 64

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 36
CRAP Score 17.299

Importance

Changes 0
Metric Value
cc 13
eloc 64
nop 2
dl 0
loc 105
ccs 36
cts 51
cp 0.7059
crap 17.299
rs 4.1181
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.main.Main.create_circuit() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13 1
from typing import Optional
14
15 1
from pydantic import ValidationError
16
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29
                                              DuplicatedNoTagUNI,
30
                                              FlowModException, InvalidPath)
31 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
32
                                          Path)
33 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
34 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc, aemit_event,
35
                                         emit_event, get_vlan_tags_and_masks,
36
                                         map_evc_event_content,
37
                                         merge_flow_dicts, prepare_delete_flow,
38
                                         send_flow_mods_http)
39
40
41
# pylint: disable=too-many-public-methods
42 1
class Main(KytosNApp):
43
    """Main class of amlight/mef_eline NApp.
44
45
    This class is the entry point for this napp.
46
    """
47
48 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
49
50 1
    def setup(self):
51
        """Replace the '__init__' method for the KytosNApp subclass.
52
53
        The setup method is automatically called by the controller when your
54
        application is loaded.
55
56
        So, if you have any setup routine, insert it here.
57
        """
58
        # object used to scheduler circuit events
59 1
        self.sched = Scheduler()
60
61
        # object to save and load circuits
62 1
        self.mongo_controller = self.get_eline_controller()
63 1
        self.mongo_controller.bootstrap_indexes()
64
65
        # set the controller that will manager the dynamic paths
66 1
        DynamicPathManager.set_controller(self.controller)
67
68
        # dictionary of EVCs created. It acts as a circuit buffer.
69
        # Every create/update/delete must be synced to mongodb.
70 1
        self.circuits = dict[str, EVC]()
71
72 1
        self._intf_events = defaultdict(dict)
73 1
        self._lock_interfaces = defaultdict(Lock)
74 1
        self.table_group = {"epl": 0, "evpl": 0}
75 1
        self._lock = Lock()
76 1
        self.multi_evc_lock = Lock()
77 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
78
79 1
        self.load_all_evcs()
80 1
        self._topology_updated_at = None
81
82 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
83
        """Get circuits sorted by desc service level and asc creation_time.
84
85
        In the future, as more ops are offloaded it should be get from the DB.
86
        """
87 1
        if enable_filter:
88 1
            return sorted(
89
                          [circuit for circuit in self.circuits.values()
90
                           if circuit.is_enabled()],
91
                          key=lambda x: (-x.service_level, x.creation_time),
92
            )
93 1
        return sorted(self.circuits.values(),
94
                      key=lambda x: (-x.service_level, x.creation_time))
95
96 1
    @staticmethod
97 1
    def get_eline_controller():
98
        """Return the ELineController instance."""
99
        return controllers.ELineController()
100
101 1
    def execute(self):
102
        """Execute once when the napp is running."""
103 1
        if self._lock.locked():
104 1
            return
105 1
        log.debug("Starting consistency routine")
106 1
        with self._lock:
107 1
            self.execute_consistency()
108 1
        log.debug("Finished consistency routine")
109
110 1
    def should_be_checked(self, circuit):
111
        "Verify if the circuit meets the necessary conditions to be checked"
112
        # pylint: disable=too-many-boolean-expressions
113 1
        if (
114
                circuit.is_enabled()
115
                and not circuit.is_active()
116
                and not circuit.lock.locked()
117
                and not circuit.has_recent_removed_flow()
118
                and not circuit.is_recent_updated()
119
                and circuit.are_unis_active()
120
                # if a inter-switch EVC does not have current_path, it does not
121
                # make sense to run sdntrace on it
122
                and (circuit.is_intra_switch() or circuit.current_path)
123
                ):
124 1
            return True
125
        return False
126
127 1
    def execute_consistency(self):
128
        """Execute consistency routine."""
129 1
        circuits_to_check = []
130 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
131 1
            if self.should_be_checked(circuit):
132 1
                circuits_to_check.append(circuit)
133 1
            circuit.try_setup_failover_path(warn_if_not_path=False)
134 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
135 1
        for circuit in circuits_to_check:
136 1
            is_checked = circuits_checked.get(circuit.id)
137 1
            if is_checked:
138 1
                circuit.execution_rounds = 0
139 1
                log.info(f"{circuit} enabled but inactive - activating")
140 1
                with circuit.lock:
141 1
                    circuit.activate()
142 1
                    circuit.sync()
143
            else:
144 1
                circuit.execution_rounds += 1
145 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
146 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
147 1
                    with circuit.lock:
148 1
                        circuit.deploy()
149
150 1
    def shutdown(self):
151
        """Execute when your napp is unloaded.
152
153
        If you have some cleanup procedure, insert it here.
154
        """
155
156 1
    @rest("/v2/evc/", methods=["GET"])
157 1
    def list_circuits(self, request: Request) -> JSONResponse:
158
        """Endpoint to return circuits stored.
159
160
        archive query arg if defined (not null) will be filtered
161
        accordingly, by default only non archived evcs will be listed
162
        """
163 1
        log.debug("list_circuits /v2/evc")
164 1
        args = request.query_params
165 1
        archived = args.get("archived", "false").lower()
166 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
167 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
168
                                                      metadata=args)
169 1
        circuits = circuits['circuits']
170 1
        return JSONResponse(circuits)
171
172 1
    @rest("/v2/evc/schedule", methods=["GET"])
173 1
    def list_schedules(self, _request: Request) -> JSONResponse:
174
        """Endpoint to return all schedules stored for all circuits.
175
176
        Return a JSON with the following template:
177
        [{"schedule_id": <schedule_id>,
178
         "circuit_id": <circuit_id>,
179
         "schedule": <schedule object>}]
180
        """
181 1
        log.debug("list_schedules /v2/evc/schedule")
182 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
183 1
        if not circuits:
184 1
            result = {}
185 1
            status = 200
186 1
            return JSONResponse(result, status_code=status)
187
188 1
        result = []
189 1
        status = 200
190 1
        for circuit in circuits:
191 1
            circuit_scheduler = circuit.get("circuit_scheduler")
192 1
            if circuit_scheduler:
193 1
                for scheduler in circuit_scheduler:
194 1
                    value = {
195
                        "schedule_id": scheduler.get("id"),
196
                        "circuit_id": circuit.get("id"),
197
                        "schedule": scheduler,
198
                    }
199 1
                    result.append(value)
200
201 1
        log.debug("list_schedules result %s %s", result, status)
202 1
        return JSONResponse(result, status_code=status)
203
204 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
205 1
    def get_circuit(self, request: Request) -> JSONResponse:
206
        """Endpoint to return a circuit based on id."""
207 1
        circuit_id = request.path_params["circuit_id"]
208 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
209 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
210 1
        if not circuit:
211 1
            result = f"circuit_id {circuit_id} not found"
212 1
            log.debug("get_circuit result %s %s", result, 404)
213 1
            raise HTTPException(404, detail=result)
214 1
        status = 200
215 1
        log.debug("get_circuit result %s %s", circuit, status)
216 1
        return JSONResponse(circuit, status_code=status)
217
218
    # pylint: disable=too-many-branches, too-many-statements
219 1
    @rest("/v2/evc/", methods=["POST"])
220 1
    @validate_openapi(spec)
221 1
    def create_circuit(self, request: Request) -> JSONResponse:
222
        """Try to create a new circuit.
223
224
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
225
        UNI_Z's requested C-VID are available from the interfaces' pools. This
226
        is checked when creating the UNI object.
227
228
        Then, E-Line NApp requests a primary and a backup path to the
229
        Pathfinder NApp using the attributes primary_links and backup_links
230
        submitted via REST
231
232
        # For each link composing paths in #3:
233
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
234
        #  - Using the S-VID obtained, generate abstract flow entries to be
235
        #    sent to FlowManager
236
237
        Push abstract flow entries to FlowManager and FlowManager pushes
238
        OpenFlow entries to datapaths
239
240
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
241
        creation
242
243
        Finnaly, notify user of the status of its request.
244
        """
245
        # Try to create the circuit object
246 1
        log.debug("create_circuit /v2/evc/")
247 1
        data = get_json_or_400(request, self.controller.loop)
248
249 1
        try:
250 1
            evc = self._evc_from_dict(data)
251 1
        except (ValueError, KytosTagError) as exception:
252 1
            log.debug("create_circuit result %s %s", exception, 400)
253 1
            raise HTTPException(400, detail=str(exception)) from exception
254 1
        if evc.primary_path:
255
            try:
256
                evc.primary_path.is_valid(
257
                    evc.uni_a.interface.switch,
258
                    evc.uni_z.interface.switch,
259
                    bool(evc.circuit_scheduler),
260
                )
261
            except InvalidPath as exception:
262
                raise HTTPException(
263
                    400,
264
                    detail=f"primary_path is not valid: {exception}"
265
                ) from exception
266 1
        if evc.backup_path:
267
            try:
268
                evc.backup_path.is_valid(
269
                    evc.uni_a.interface.switch,
270
                    evc.uni_z.interface.switch,
271
                    bool(evc.circuit_scheduler),
272
                )
273
            except InvalidPath as exception:
274
                raise HTTPException(
275
                    400,
276
                    detail=f"backup_path is not valid: {exception}"
277
                ) from exception
278
279 1
        if not evc._tag_lists_equal():
280
            detail = "UNI_A and UNI_Z tag lists should be the same."
281
            raise HTTPException(400, detail=detail)
282
283 1
        try:
284 1
            evc._validate_has_primary_or_dynamic()
285 1
        except ValueError as exception:
286 1
            raise HTTPException(400, detail=str(exception)) from exception
287
288 1
        try:
289 1
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
290
        except DuplicatedNoTagUNI as exception:
291
            log.debug("create_circuit result %s %s", exception, 409)
292
            raise HTTPException(409, detail=str(exception)) from exception
293
294 1
        try:
295 1
            self._use_uni_tags(evc)
296 1
        except KytosTagError as exception:
297 1
            raise HTTPException(400, detail=str(exception)) from exception
298
299
        # save circuit
300 1
        try:
301 1
            evc.sync()
302
        except ValidationError as exception:
303
            raise HTTPException(400, detail=str(exception)) from exception
304
305
        # store circuit in dictionary
306 1
        self.circuits[evc.id] = evc
307
308
        # Schedule the circuit deploy
309 1
        self.sched.add(evc)
310
311
        # Circuit has no schedule, deploy now
312 1
        deployed = False
313 1
        if not evc.circuit_scheduler:
314 1
            with evc.lock:
315 1
                deployed = evc.deploy()
316
317
        # Notify users
318 1
        result = {"circuit_id": evc.id, "deployed": deployed}
319 1
        status = 201
320 1
        log.debug("create_circuit result %s %s", result, status)
321 1
        emit_event(self.controller, name="created",
322
                   content=map_evc_event_content(evc))
323 1
        return JSONResponse(result, status_code=status)
324
325 1
    @staticmethod
326 1
    def _use_uni_tags(evc):
327 1
        uni_a = evc.uni_a
328 1
        evc._use_uni_vlan(uni_a)
329 1
        try:
330 1
            uni_z = evc.uni_z
331 1
            evc._use_uni_vlan(uni_z)
332 1
        except KytosTagError as err:
333 1
            evc.make_uni_vlan_available(uni_a)
334 1
            raise err
335
336 1
    @listen_to('kytos/flow_manager.flow.removed')
337 1
    def on_flow_delete(self, event):
338
        """Capture delete messages to keep track when flows got removed."""
339
        self.handle_flow_delete(event)
340
341 1
    def handle_flow_delete(self, event):
342
        """Keep track when the EVC got flows removed by deriving its cookie."""
343 1
        flow = event.content["flow"]
344 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
345 1
        if evc:
346 1
            log.debug("Flow removed in EVC %s", evc.id)
347 1
            evc.set_flow_removed_at()
348
349 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
350 1
    @validate_openapi(spec)
351 1
    def update(self, request: Request) -> JSONResponse:
352
        """Update a circuit based on payload.
353
354
        The EVC attributes (creation_time, active, current_path,
355
        failover_path, _id, archived) can't be updated.
356
        """
357 1
        data = get_json_or_400(request, self.controller.loop)
358 1
        circuit_id = request.path_params["circuit_id"]
359 1
        log.debug("update /v2/evc/%s", circuit_id)
360 1
        try:
361 1
            evc = self.circuits[circuit_id]
362 1
        except KeyError:
363 1
            result = f"circuit_id {circuit_id} not found"
364 1
            log.debug("update result %s %s", result, 404)
365 1
            raise HTTPException(404, detail=result) from KeyError
366
367 1
        try:
368 1
            updated_data = self._evc_dict_with_instances(data)
369 1
            self._check_no_tag_duplication(
370
                circuit_id, updated_data.get("uni_a"),
371
                updated_data.get("uni_z")
372
            )
373 1
            enable, redeploy = evc.update(**updated_data)
374 1
        except (ValueError, KytosTagError, ValidationError) as exception:
375 1
            log.debug("update result %s %s", exception, 400)
376 1
            raise HTTPException(400, detail=str(exception)) from exception
377
        except DuplicatedNoTagUNI as exception:
378
            log.debug("update result %s %s", exception, 409)
379
            raise HTTPException(409, detail=str(exception)) from exception
380
        except DisabledSwitch as exception:
381
            log.debug("update result %s %s", exception, 409)
382
            raise HTTPException(
383
                    409,
384
                    detail=f"Path is not valid: {exception}"
385
                ) from exception
386 1
        redeployed = False
387 1
        if evc.is_active():
388
            if enable is False:  # disable if active
389
                with evc.lock:
390
                    evc.remove()
391
            elif redeploy is not None:  # redeploy if active
392
                with evc.lock:
393
                    evc.remove()
394
                    redeployed = evc.deploy()
395
        else:
396 1
            if enable is True:  # enable if inactive
397 1
                with evc.lock:
398 1
                    redeployed = evc.deploy()
399 1
            elif evc.is_enabled() and redeploy:
400 1
                with evc.lock:
401 1
                    evc.remove()
402 1
                    redeployed = evc.deploy()
403 1
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
404 1
        status = 200
405
406 1
        log.debug("update result %s %s", result, status)
407 1
        emit_event(self.controller, "updated",
408
                   content=map_evc_event_content(evc, **data))
409 1
        return JSONResponse(result, status_code=status)
410
411 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
412 1
    def delete_circuit(self, request: Request) -> JSONResponse:
413
        """Remove a circuit.
414
415
        First, the flows are removed from the switches, and then the EVC is
416
        disabled.
417
        """
418 1
        circuit_id = request.path_params["circuit_id"]
419 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
420 1
        try:
421 1
            evc = self.circuits.pop(circuit_id)
422 1
        except KeyError:
423 1
            result = f"circuit_id {circuit_id} not found"
424 1
            log.debug("delete_circuit result %s %s", result, 404)
425 1
            raise HTTPException(404, detail=result) from KeyError
426 1
        log.info("Removing %s", evc)
427
428 1
        with evc.lock:
429 1
            if not evc.archived:
430 1
                evc.deactivate()
431 1
                evc.disable()
432 1
                self.sched.remove(evc)
433 1
                evc.remove_current_flows(sync=False)
434 1
                evc.remove_failover_flows(sync=False)
435 1
                evc.archive()
436 1
                evc.remove_uni_tags()
437 1
                evc.sync()
438 1
                emit_event(
439
                    self.controller, "deleted",
440
                    content=map_evc_event_content(evc)
441
                )
442
443 1
        log.info("EVC removed. %s", evc)
444 1
        result = {"response": f"Circuit {circuit_id} removed"}
445 1
        status = 200
446 1
        log.debug("delete_circuit result %s %s", result, status)
447
448 1
        return JSONResponse(result, status_code=status)
449
450 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
451 1
    def get_metadata(self, request: Request) -> JSONResponse:
452
        """Get metadata from an EVC."""
453 1
        circuit_id = request.path_params["circuit_id"]
454 1
        try:
455 1
            return (
456
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
457
            )
458
        except KeyError as error:
459
            raise HTTPException(
460
                404,
461
                detail=f"circuit_id {circuit_id} not found."
462
            ) from error
463
464 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
465 1
    @validate_openapi(spec)
466 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
467
        """Add metadata to a bulk of EVCs."""
468 1
        data = get_json_or_400(request, self.controller.loop)
469 1
        circuit_ids = data.pop("circuit_ids")
470
471 1
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
472
473 1
        fail_evcs = []
474 1
        for _id in circuit_ids:
475 1
            try:
476 1
                evc = self.circuits[_id]
477 1
                evc.extend_metadata(data)
478 1
            except KeyError:
479 1
                fail_evcs.append(_id)
480
481 1
        if fail_evcs:
482 1
            raise HTTPException(404, detail=fail_evcs)
483 1
        return JSONResponse("Operation successful", status_code=201)
484
485 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
486 1
    @validate_openapi(spec)
487 1
    def add_metadata(self, request: Request) -> JSONResponse:
488
        """Add metadata to an EVC."""
489 1
        circuit_id = request.path_params["circuit_id"]
490 1
        metadata = get_json_or_400(request, self.controller.loop)
491 1
        if not isinstance(metadata, dict):
492
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
493 1
        try:
494 1
            evc = self.circuits[circuit_id]
495 1
        except KeyError as error:
496 1
            raise HTTPException(
497
                404,
498
                detail=f"circuit_id {circuit_id} not found."
499
            ) from error
500
501 1
        evc.extend_metadata(metadata)
502 1
        evc.sync()
503 1
        return JSONResponse("Operation successful", status_code=201)
504
505 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
506 1
    @validate_openapi(spec)
507 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
508
        """Delete metada from a bulk of EVCs"""
509 1
        data = get_json_or_400(request, self.controller.loop)
510 1
        key = request.path_params["key"]
511 1
        circuit_ids = data.pop("circuit_ids")
512 1
        self.mongo_controller.update_evcs_metadata(
513
            circuit_ids, {key: ""}, "del"
514
        )
515
516 1
        fail_evcs = []
517 1
        for _id in circuit_ids:
518 1
            try:
519 1
                evc = self.circuits[_id]
520 1
                evc.remove_metadata(key)
521 1
            except KeyError:
522 1
                fail_evcs.append(_id)
523
524 1
        if fail_evcs:
525 1
            raise HTTPException(404, detail=fail_evcs)
526 1
        return JSONResponse("Operation successful")
527
528 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
529 1
    def delete_metadata(self, request: Request) -> JSONResponse:
530
        """Delete metadata from an EVC."""
531 1
        circuit_id = request.path_params["circuit_id"]
532 1
        key = request.path_params["key"]
533 1
        try:
534 1
            evc = self.circuits[circuit_id]
535 1
        except KeyError as error:
536 1
            raise HTTPException(
537
                404,
538
                detail=f"circuit_id {circuit_id} not found."
539
            ) from error
540
541 1
        evc.remove_metadata(key)
542 1
        evc.sync()
543 1
        return JSONResponse("Operation successful")
544
545 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
546 1
    def redeploy(self, request: Request) -> JSONResponse:
547
        """Endpoint to force the redeployment of an EVC."""
548 1
        circuit_id = request.path_params["circuit_id"]
549 1
        try_avoid_same_s_vlan = request.query_params.get(
550
            "try_avoid_same_s_vlan", "true"
551
        )
552 1
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
553 1
        if try_avoid_same_s_vlan not in {"true", "false"}:
554
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
555
            raise HTTPException(400, detail=msg)
556 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
557 1
        try:
558 1
            evc = self.circuits[circuit_id]
559 1
        except KeyError:
560 1
            raise HTTPException(
561
                404,
562
                detail=f"circuit_id {circuit_id} not found"
563
            ) from KeyError
564 1
        deployed = False
565 1
        if evc.is_enabled():
566 1
            with evc.lock:
567 1
                path_dict = evc.remove_current_flows(
568
                    sync=False,
569
                    return_path=try_avoid_same_s_vlan == "true"
570
                )
571 1
                evc.remove_failover_flows(sync=True)
572 1
                deployed = evc.deploy(path_dict)
573 1
        if deployed:
574 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
575 1
            status = 202
576
        else:
577 1
            result = {
578
                "response": f"Circuit {circuit_id} is disabled."
579
            }
580 1
            status = 409
581
582 1
        return JSONResponse(result, status_code=status)
583
584 1
    @rest("/v2/evc/schedule/", methods=["POST"])
585 1
    @validate_openapi(spec)
586 1
    def create_schedule(self, request: Request) -> JSONResponse:
587
        """
588
        Create a new schedule for a given circuit.
589
590
        This service do no check if there are conflicts with another schedule.
591
        Payload example:
592
            {
593
              "circuit_id":"aa:bb:cc",
594
              "schedule": {
595
                "date": "2019-08-07T14:52:10.967Z",
596
                "interval": "string",
597
                "frequency": "1 * * * *",
598
                "action": "create"
599
              }
600
            }
601
        """
602 1
        log.debug("create_schedule /v2/evc/schedule/")
603 1
        data = get_json_or_400(request, self.controller.loop)
604 1
        circuit_id = data["circuit_id"]
605 1
        schedule_data = data["schedule"]
606
607
        # Get EVC from circuits buffer
608 1
        circuits = self._get_circuits_buffer()
609
610
        # get the circuit
611 1
        evc = circuits.get(circuit_id)
612
613
        # get the circuit
614 1
        if not evc:
615 1
            result = f"circuit_id {circuit_id} not found"
616 1
            log.debug("create_schedule result %s %s", result, 404)
617 1
            raise HTTPException(404, detail=result)
618
619
        # new schedule from dict
620 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
621
622
        # If there is no schedule, create the list
623 1
        if not evc.circuit_scheduler:
624 1
            evc.circuit_scheduler = []
625
626
        # Add the new schedule
627 1
        evc.circuit_scheduler.append(new_schedule)
628
629
        # Add schedule job
630 1
        self.sched.add_circuit_job(evc, new_schedule)
631
632
        # save circuit to mongodb
633 1
        evc.sync()
634
635 1
        result = new_schedule.as_dict()
636 1
        status = 201
637
638 1
        log.debug("create_schedule result %s %s", result, status)
639 1
        return JSONResponse(result, status_code=status)
640
641 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
642 1
    @validate_openapi(spec)
643 1
    def update_schedule(self, request: Request) -> JSONResponse:
644
        """Update a schedule.
645
646
        Change all attributes from the given schedule from a EVC circuit.
647
        The schedule ID is preserved as default.
648
        Payload example:
649
            {
650
              "date": "2019-08-07T14:52:10.967Z",
651
              "interval": "string",
652
              "frequency": "1 * * *",
653
              "action": "create"
654
            }
655
        """
656 1
        data = get_json_or_400(request, self.controller.loop)
657 1
        schedule_id = request.path_params["schedule_id"]
658 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
659
660
        # Try to find a circuit schedule
661 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
662
663
        # Can not modify circuits deleted and archived
664 1
        if not found_schedule:
665 1
            result = f"schedule_id {schedule_id} not found"
666 1
            log.debug("update_schedule result %s %s", result, 404)
667 1
            raise HTTPException(404, detail=result)
668
669 1
        new_schedule = CircuitSchedule.from_dict(data)
670 1
        new_schedule.id = found_schedule.id
671
        # Remove the old schedule
672 1
        evc.circuit_scheduler.remove(found_schedule)
673
        # Append the modified schedule
674 1
        evc.circuit_scheduler.append(new_schedule)
675
676
        # Cancel all schedule jobs
677 1
        self.sched.cancel_job(found_schedule.id)
678
        # Add the new circuit schedule
679 1
        self.sched.add_circuit_job(evc, new_schedule)
680
        # Save EVC to mongodb
681 1
        evc.sync()
682
683 1
        result = new_schedule.as_dict()
684 1
        status = 200
685
686 1
        log.debug("update_schedule result %s %s", result, status)
687 1
        return JSONResponse(result, status_code=status)
688
689 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
690 1
    def delete_schedule(self, request: Request) -> JSONResponse:
691
        """Remove a circuit schedule.
692
693
        Remove the Schedule from EVC.
694
        Remove the Schedule from cron job.
695
        Save the EVC to the Storehouse.
696
        """
697 1
        schedule_id = request.path_params["schedule_id"]
698 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
699 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
700
701
        # Can not modify circuits deleted and archived
702 1
        if not found_schedule:
703 1
            result = f"schedule_id {schedule_id} not found"
704 1
            log.debug("delete_schedule result %s %s", result, 404)
705 1
            raise HTTPException(404, detail=result)
706
707
        # Remove the old schedule
708 1
        evc.circuit_scheduler.remove(found_schedule)
709
710
        # Cancel all schedule jobs
711 1
        self.sched.cancel_job(found_schedule.id)
712
        # Save EVC to mongodb
713 1
        evc.sync()
714
715 1
        result = "Schedule removed"
716 1
        status = 200
717
718 1
        log.debug("delete_schedule result %s %s", result, status)
719 1
        return JSONResponse(result, status_code=status)
720
721 1
    def _check_no_tag_duplication(
722
        self,
723
        evc_id: str,
724
        uni_a: Optional[UNI] = None,
725
        uni_z: Optional[UNI] = None
726
    ):
727
        """Check if the given EVC has UNIs with no tag and if these are
728
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
729
        Args:
730
            evc (dict): EVC to be analyzed.
731
        """
732
733
        # No UNIs
734 1
        if not (uni_a or uni_z):
735 1
            return
736
737 1
        if (not (uni_a and not uni_a.user_tag) and
738
                not (uni_z and not uni_z.user_tag)):
739 1
            return
740 1
        for circuit in self.circuits.copy().values():
741 1
            if (not circuit.archived and circuit._id != evc_id):
742 1
                if uni_a and uni_a.user_tag is None:
743 1
                    circuit.check_no_tag_duplicate(uni_a)
744 1
                if uni_z and uni_z.user_tag is None:
745 1
                    circuit.check_no_tag_duplicate(uni_z)
746
747 1
    @listen_to("kytos/topology.link_up")
748 1
    def on_link_up(self, event):
749
        """Change circuit when link is up or end_maintenance."""
750
        self.handle_link_up(event)
751
752 1
    def handle_link_up(self, event):
753
        """Change circuit when link is up or end_maintenance."""
754 1
        log.info("Event handle_link_up %s", event.content["link"])
755 1
        for evc in self.get_evcs_by_svc_level():
756 1
            if evc.is_enabled() and not evc.archived:
757 1
                with evc.lock:
758 1
                    evc.handle_link_up(event.content["link"])
759
760
    # Possibly replace this with interruptions?
761 1
    @listen_to(
762
        '.*.switch.interface.(link_up|link_down|created|deleted)',
763
        '.*.interface.(disabled|enabled|up|down)'
764
    )
765 1
    def on_interface_link_change(self, event: KytosEvent):
766
        """
767
        Handler for interface link_up and link_down events.
768
        """
769
        self.handle_on_interface_link_change(event)
770
771 1
    def handle_on_interface_link_change(self, event: KytosEvent):
772
        """
773
        Handler to sort interface events {link_(up, down), create, deleted}
774
775
        To avoid multiple database updated (link flap):
776
        Every interface is identfied and processed in parallel.
777
        Once an interface event is received a time is started.
778
        While time is running self._intf_events will be updated.
779
        After time has passed last received event will be processed.
780
        """
781 1
        iface = event.content.get("interface")
782 1
        with self._lock_interfaces[iface.id]:
783 1
            _now = event.timestamp
784
            # Return out of order events
785 1
            if (
786
                iface.id in self._intf_events
787
                and self._intf_events[iface.id]["event"].timestamp > _now
788
            ):
789 1
                return
790 1
            self._intf_events[iface.id].update({"event": event})
791 1
            if "last_acquired" in self._intf_events[iface.id]:
792 1
                return
793 1
            self._intf_events[iface.id].update({"last_acquired": now()})
794 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
795 1
        with self._lock_interfaces[iface.id]:
796 1
            event = self._intf_events[iface.id]["event"]
797 1
            self._intf_events[iface.id].pop('last_acquired', None)
798 1
            _, _, event_type = event.name.rpartition('.')
799 1
            if event_type in ('link_up', 'created', 'enabled', 'up'):
800 1
                self.handle_interface_link_up(iface)
801 1
            elif event_type in ('link_down', 'deleted', 'disabled', 'down'):
802 1
                self.handle_interface_link_down(iface)
803
804 1
    def handle_interface_link_up(self, interface):
805
        """
806
        Handler for interface link_up events
807
        """
808
        log.info("Event handle_interface_link_up %s", interface)
809
        for evc in self.get_evcs_by_svc_level():
810
            if _does_uni_affect_evc(evc, interface, "up"):
811
                with evc.lock:
812
                    evc.handle_interface_link_up(
813
                        interface
814
                    )
815
816 1
    def handle_interface_link_down(self, interface):
817
        """
818
        Handler for interface link_down events
819
        """
820
        log.info("Event handle_interface_link_down %s", interface)
821
        for evc in self.get_evcs_by_svc_level():
822
            if _does_uni_affect_evc(evc, interface, "down"):
823
                with evc.lock:
824
                    evc.handle_interface_link_down(
825
                        interface
826
                    )
827
828 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
829 1
    def on_link_down(self, event):
830
        """Change circuit when link is down or under_mantenance."""
831
        self.handle_link_down(event)
832
833 1
    def prepare_swap_to_failover_flow(self, evc: EVC):
834
        """Prepare an evc for switching to failover."""
835
        install_flows = {}
836
        try:
837
            install_flows = evc.get_failover_flows()
838
        # pylint: disable=broad-except
839
        except Exception:
840
            err = traceback.format_exc()
841
            log.error(
842
                "Ignore Failover path for "
843
                f"{evc} due to error: {err}"
844
            )
845
        return install_flows
846
847 1
    def prepare_swap_to_failover_event(self, evc: EVC, install_flow):
848
        """Prepare event contents for swap to failover."""
849
        return map_evc_event_content(
850
            evc,
851
            flows=deepcopy(install_flow)
852
        )
853
854 1
    def execute_swap_to_failover(
855
        self,
856
        evcs: list[EVC]
857
    ) -> tuple[list[EVC], list[EVC]]:
858
        """Process changes needed to commit a swap to failover."""
859 1
        event_contents = {}
860 1
        install_flows = {}
861 1
        swapped_evcs = list[EVC]()
862 1
        not_swapped_evcs = list[EVC]()
863
864 1
        for evc in evcs:
865 1
            install_flow = self.prepare_swap_to_failover_flow(evc)
866 1
            if install_flow:
867 1
                install_flows = merge_flow_dicts(install_flows, install_flow)
868 1
                event_contents[evc.id] =\
869
                    self.prepare_swap_to_failover_event(evc, install_flow)
870 1
                swapped_evcs.append(evc)
871
            else:
872 1
                not_swapped_evcs.append(evc)
873
874 1
        try:
875 1
            send_flow_mods_http(
876
                install_flows,
877
                "install"
878
            )
879 1
            for evc in swapped_evcs:
880 1
                temp_path = evc.current_path
881 1
                evc.current_path = evc.failover_path
882 1
                evc.failover_path = temp_path
883 1
            emit_event(
884
                self.controller, "failover_link_down",
885
                content=deepcopy(event_contents)
886
            )
887 1
            return swapped_evcs, not_swapped_evcs
888 1
        except FlowModException as exc:
889 1
            log.error(f"Fail to install failover flows for {evcs}: {exc}")
890 1
            return [], [*swapped_evcs, *not_swapped_evcs]
891
892 1
    def prepare_clear_failover_flow(self, evc: EVC):
893
        """Prepare an evc for clearing the old path."""
894
        del_flows = {}
895
        try:
896
            del_flows = prepare_delete_flow(
897
                merge_flow_dicts(
898
                    evc._prepare_uni_flows(evc.failover_path, skip_in=True),
899
                    evc._prepare_nni_flows(evc.failover_path)
900
                )
901
            )
902
        # pylint: disable=broad-except
903
        except Exception:
904
            err = traceback.format_exc()
905
            log.error(f"Fail to remove {evc} old_path: {err}")
906
        return del_flows
907
908 1
    def prepare_clear_failover_event(self, evc: EVC, delete_flow):
909
        """Prepare event contents for clearing failover."""
910
        return map_evc_event_content(
911
            evc,
912
            current_path=evc.current_path.as_dict(),
913
            removed_flows=deepcopy(delete_flow)
914
        )
915
916 1
    def execute_clear_failover(
917
        self,
918
        evcs: list[EVC]
919
    ) -> tuple[list[EVC], list[EVC]]:
920
        """Process changes needed to commit clearing the failover path"""
921 1
        event_contents = {}
922 1
        delete_flows = {}
923 1
        cleared_evcs = list[EVC]()
924 1
        not_cleared_evcs = list[EVC]()
925
926 1
        for evc in evcs:
927 1
            delete_flow = self.prepare_clear_failover_flow(evc)
928 1
            if delete_flow:
929 1
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
930 1
                event_contents[evc.id] =\
931
                    self.prepare_clear_failover_event(evc, delete_flow)
932 1
                cleared_evcs.append(evc)
933
            else:
934 1
                not_cleared_evcs.append(evc)
935
936 1
        try:
937 1
            send_flow_mods_http(
938
                delete_flows,
939
                'delete'
940
            )
941 1
            for evc in cleared_evcs:
942 1
                evc.failover_path.make_vlans_available(self.controller)
943 1
                evc.failover_path = Path([])
944 1
            emit_event(
945
                self.controller,
946
                "failover_old_path",
947
                content=event_contents
948
            )
949 1
            return cleared_evcs, not_cleared_evcs
950 1
        except FlowModException as exc:
951 1
            log.error(f"Failed to delete failover flows for {evcs}: {exc}")
952 1
            return [], [*cleared_evcs, *not_cleared_evcs]
953
954 1
    def prepare_undeploy_flow(self, evc: EVC):
955
        """Prepare an evc for undeploying."""
956
        del_flows = {}
957
        try:
958
            del_flows = prepare_delete_flow(
959
                merge_flow_dicts(
960
                    evc._prepare_uni_flows(evc.current_path, skip_in=True),
961
                    evc._prepare_nni_flows(evc.current_path),
962
                    evc._prepare_nni_flows(evc.failover_path)
963
                )
964
            )
965
        # pylint: disable=broad-except
966
        except Exception:
967
            err = traceback.format_exc()
968
            log.error(f"Fail to undeploy {evc}: {err}")
969
        return del_flows
970
971 1
    def execute_undeploy(self, evcs: list[EVC]):
972
        """Process changes needed to commit an undeploy"""
973 1
        delete_flows = {}
974 1
        undeploy_evcs = list[EVC]()
975 1
        not_undeploy_evcs = list[EVC]()
976
977 1
        for evc in evcs:
978 1
            delete_flow = self.prepare_undeploy_flow(evc)
979 1
            if delete_flow:
980 1
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
981 1
                undeploy_evcs.append(evc)
982
            else:
983 1
                not_undeploy_evcs.append(evc)
984
985 1
        try:
986 1
            send_flow_mods_http(
987
                delete_flows,
988
                'delete'
989
            )
990
991 1
            for evc in undeploy_evcs:
992 1
                evc.current_path.make_vlans_available(self.controller)
993 1
                evc.failover_path.make_vlans_available(self.controller)
994 1
                evc.current_path = Path([])
995 1
                evc.failover_path = Path([])
996 1
                evc.deactivate()
997 1
                emit_event(
998
                    self.controller,
999
                    "need_redeploy",
1000
                    content={"evc_id": evc.id}
1001
                )
1002 1
                log.info(f"{evc} scheduled for redeploy")
1003 1
            return undeploy_evcs, not_undeploy_evcs
1004 1
        except FlowModException as exc:
1005 1
            log.error(
1006
                f"Failed to delete flows before redeploy for {evcs}: {exc}"
1007
            )
1008 1
            return [], [*undeploy_evcs, *not_undeploy_evcs]
1009
1010 1
    def handle_link_down(self, event):
1011
        """Change circuit when link is down or under_mantenance."""
1012 1
        link = event.content["link"]
1013 1
        log.info("Event handle_link_down %s", link)
1014
1015 1
        with ExitStack() as exit_stack:
1016 1
            exit_stack.enter_context(self.multi_evc_lock)
1017 1
            swap_to_failover = list[EVC]()
1018 1
            undeploy = list[EVC]()
1019 1
            clear_failover = list[EVC]()
1020 1
            evcs_to_update = dict[str, EVC]()
1021
1022 1
            for evc in self.get_evcs_by_svc_level():
1023 1
                with ExitStack() as sub_stack:
1024 1
                    sub_stack.enter_context(evc.lock)
1025 1
                    if all((
1026
                        evc.is_affected_by_link(link),
1027
                        evc.failover_path,
1028
                        not evc.is_failover_path_affected_by_link(link)
1029
                    )):
1030 1
                        swap_to_failover.append(evc)
1031 1
                    elif all((
1032
                        evc.is_affected_by_link(link),
1033
                        not evc.failover_path or
1034
                        evc.is_failover_path_affected_by_link(link)
1035
                    )):
1036 1
                        undeploy.append(evc)
1037 1
                    elif all((
1038
                        not evc.is_affected_by_link(link),
1039
                        evc.failover_path,
1040
                        evc.is_failover_path_affected_by_link(link)
1041
                    )):
1042 1
                        clear_failover.append(evc)
1043
                    else:
1044
                        continue
1045
1046 1
                    exit_stack.push(sub_stack.pop_all())
1047
1048
            # Swap from current path to failover path
1049
1050 1
            if swap_to_failover:
1051 1
                success, failure = self.execute_swap_to_failover(
1052
                    swap_to_failover
1053
                )
1054
1055 1
                clear_failover.extend(success)
1056
1057 1
                evcs_to_update.update((evc.id, evc) for evc in success)
1058
1059 1
                undeploy.extend(failure)
1060
1061
            # Clear out failover path
1062
1063 1
            if clear_failover:
1064 1
                success, failure = self.execute_clear_failover(clear_failover)
1065
1066 1
                evcs_to_update.update((evc.id, evc) for evc in success)
1067
1068 1
                undeploy.extend(failure)
1069
1070
            # Undeploy the evc, schedule a redeploy
1071
1072 1
            if undeploy:
1073 1
                success, failure = self.execute_undeploy(undeploy)
1074
1075 1
                evcs_to_update.update((evc.id, evc) for evc in success)
1076
1077 1
                if failure:
1078 1
                    log.error(f"Failed to handle_link_down for {failure}")
1079
1080
            # Push update to DB
1081
1082 1
            if evcs_to_update:
1083 1
                self.mongo_controller.update_evcs(
1084
                    [evc.as_dict() for evc in evcs_to_update.values()]
1085
                )
1086
1087 1
    @listen_to("kytos/mef_eline.need_redeploy")
1088 1
    def on_evc_need_redeploy(self, event):
1089
        """Redeploy evcs that need to be redeployed."""
1090
        self.handle_evc_need_redeploy(event)
1091
1092 1
    def handle_evc_need_redeploy(self, event):
1093
        """Redeploy evcs that need to be redeployed."""
1094
        evc = self.circuits.get(event.content["evc_id"])
1095
        if evc is None:
1096
            return
1097
        with evc.lock:
1098
            if not evc.is_enabled() or evc.is_active():
1099
                return
1100
            result = evc.deploy()
1101
        event_name = "error_redeploy_link_down"
1102
        if result:
1103
            log.info(f"{evc} redeployed")
1104
            event_name = "redeployed_link_down"
1105
        emit_event(self.controller, event_name,
1106
                   content=map_evc_event_content(evc))
1107
1108 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
1109 1
    def on_evc_affected_by_link_down(self, event):
1110
        """Change circuit when link is down or under_mantenance."""
1111
        self.handle_evc_affected_by_link_down(event)
1112
1113 1
    def handle_evc_affected_by_link_down(self, event):
1114
        """Change circuit when link is down or under_mantenance."""
1115 1
        evc = self.circuits.get(event.content["evc_id"])
1116 1
        link = event.content['link']
1117 1
        if not evc:
1118 1
            return
1119 1
        with evc.lock:
1120 1
            if not evc.is_affected_by_link(link):
1121
                return
1122 1
            result = evc.handle_link_down()
1123 1
        event_name = "error_redeploy_link_down"
1124 1
        if result:
1125 1
            log.info(f"{evc} redeployed due to link down {link.id}")
1126 1
            event_name = "redeployed_link_down"
1127 1
        emit_event(self.controller, event_name,
1128
                   content=map_evc_event_content(evc))
1129
1130 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
1131 1
    def on_evc_deployed(self, event):
1132
        """Handle EVC deployed|redeployed_link_down."""
1133
        self.handle_evc_deployed(event)
1134
1135 1
    def handle_evc_deployed(self, event):
1136
        """Setup failover path on evc deployed."""
1137
        evc = self.circuits.get(event.content["evc_id"])
1138
        if not evc:
1139
            return
1140
        evc.try_setup_failover_path()
1141
1142 1
    @listen_to("kytos/topology.topology_loaded")
1143 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
1144
        """Load EVCs once the topology is available."""
1145
        self.load_all_evcs()
1146
1147 1
    def load_all_evcs(self):
1148
        """Try to load all EVCs on startup."""
1149 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
1150 1
        for circuit_id, circuit in circuits:
1151 1
            if circuit_id not in self.circuits:
1152 1
                self._load_evc(circuit)
1153 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
1154
                   timeout=1)
1155
1156 1
    def _load_evc(self, circuit_dict):
1157
        """Load one EVC from mongodb to memory."""
1158 1
        try:
1159 1
            evc = self._evc_from_dict(circuit_dict)
1160 1
        except (ValueError, KytosTagError) as exception:
1161 1
            log.error(
1162
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1163
            )
1164 1
            return None
1165 1
        if evc.archived:
1166 1
            return None
1167
1168 1
        self.circuits.setdefault(evc.id, evc)
1169 1
        self.sched.add(evc)
1170 1
        return evc
1171
1172 1
    @listen_to("kytos/flow_manager.flow.error")
1173 1
    def on_flow_mod_error(self, event):
1174
        """Handle flow mod errors related to an EVC."""
1175
        self.handle_flow_mod_error(event)
1176
1177 1
    def handle_flow_mod_error(self, event):
1178
        """Handle flow mod errors related to an EVC."""
1179 1
        flow = event.content["flow"]
1180 1
        command = event.content.get("error_command")
1181 1
        if command != "add":
1182 1
            return
1183 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1184 1
        if not evc or evc.archived or not evc.is_enabled():
1185 1
            return
1186 1
        with evc.lock:
1187 1
            evc.remove_current_flows(sync=False)
1188 1
            evc.remove_failover_flows(sync=True)
1189
1190 1
    def _evc_dict_with_instances(self, evc_dict):
1191
        """Convert some dict values to instance of EVC classes.
1192
1193
        This method will convert: [UNI, Link]
1194
        """
1195 1
        data = evc_dict.copy()  # Do not modify the original dict
1196 1
        for attribute, value in data.items():
1197
            # Get multiple attributes.
1198
            # Ex: uni_a, uni_z
1199 1
            if "uni" in attribute:
1200 1
                try:
1201 1
                    data[attribute] = self._uni_from_dict(value)
1202 1
                except ValueError as exception:
1203 1
                    result = "Error creating UNI: Invalid value"
1204 1
                    raise ValueError(result) from exception
1205
1206 1
            if attribute == "circuit_scheduler":
1207 1
                data[attribute] = []
1208 1
                for schedule in value:
1209 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1210
1211
            # Get multiple attributes.
1212
            # Ex: primary_links,
1213
            #     backup_links,
1214
            #     current_links_cache,
1215
            #     primary_links_cache,
1216
            #     backup_links_cache
1217 1
            if "links" in attribute:
1218 1
                data[attribute] = [
1219
                    self._link_from_dict(link, attribute) for link in value
1220
                ]
1221
1222
            # Ex: current_path,
1223
            #     primary_path,
1224
            #     backup_path
1225 1
            if (attribute.endswith("path") and
1226
                    attribute != "dynamic_backup_path"):
1227 1
                data[attribute] = Path(
1228
                    [self._link_from_dict(link, attribute) for link in value]
1229
                )
1230
1231 1
        return data
1232
1233 1
    def _evc_from_dict(self, evc_dict):
1234 1
        data = self._evc_dict_with_instances(evc_dict)
1235 1
        data["table_group"] = self.table_group
1236 1
        return EVC(self.controller, **data)
1237
1238 1
    def _uni_from_dict(self, uni_dict):
1239
        """Return a UNI object from python dict."""
1240 1
        if uni_dict is None:
1241 1
            return False
1242
1243 1
        interface_id = uni_dict.get("interface_id")
1244 1
        interface = self.controller.get_interface_by_id(interface_id)
1245 1
        if interface is None:
1246 1
            result = (
1247
                "Error creating UNI:"
1248
                + f"Could not instantiate interface {interface_id}"
1249
            )
1250 1
            raise ValueError(result) from ValueError
1251 1
        tag_convert = {1: "vlan"}
1252 1
        tag_dict = uni_dict.get("tag", None)
1253 1
        if tag_dict:
1254 1
            tag_type = tag_dict.get("tag_type")
1255 1
            tag_type = tag_convert.get(tag_type, tag_type)
1256 1
            tag_value = tag_dict.get("value")
1257 1
            if isinstance(tag_value, list):
1258 1
                tag_value = get_tag_ranges(tag_value)
1259 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1260 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1261
            else:
1262 1
                tag = TAG(tag_type, tag_value)
1263
        else:
1264 1
            tag = None
1265 1
        uni = UNI(interface, tag)
1266 1
        return uni
1267
1268 1
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
1269
        """Return a Link object from python dict."""
1270 1
        id_a = link_dict.get("endpoint_a").get("id")
1271 1
        id_b = link_dict.get("endpoint_b").get("id")
1272
1273 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1274 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1275 1
        if not endpoint_a:
1276 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1277 1
            raise ValueError(error_msg)
1278 1
        if not endpoint_b:
1279
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1280
            raise ValueError(error_msg)
1281
1282 1
        link = Link(endpoint_a, endpoint_b)
1283 1
        allowed_paths = {"current_path", "failover_path"}
1284 1
        if "metadata" in link_dict and attribute in allowed_paths:
1285 1
            link.extend_metadata(link_dict.get("metadata"))
1286
1287 1
        s_vlan = link.get_metadata("s_vlan")
1288 1
        if s_vlan:
1289 1
            tag = TAG.from_dict(s_vlan)
1290 1
            if tag is False:
1291
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1292
                raise ValueError(error_msg)
1293 1
            link.update_metadata("s_vlan", tag)
1294 1
        return link
1295
1296 1
    def _find_evc_by_schedule_id(self, schedule_id):
1297
        """
1298
        Find an EVC and CircuitSchedule based on schedule_id.
1299
1300
        :param schedule_id: Schedule ID
1301
        :return: EVC and Schedule
1302
        """
1303 1
        circuits = self._get_circuits_buffer()
1304 1
        found_schedule = None
1305 1
        evc = None
1306
1307
        # pylint: disable=unused-variable
1308 1
        for c_id, circuit in circuits.items():
1309 1
            for schedule in circuit.circuit_scheduler:
1310 1
                if schedule.id == schedule_id:
1311 1
                    found_schedule = schedule
1312 1
                    evc = circuit
1313 1
                    break
1314 1
            if found_schedule:
1315 1
                break
1316 1
        return evc, found_schedule
1317
1318 1
    def _get_circuits_buffer(self):
1319
        """
1320
        Return the circuit buffer.
1321
1322
        If the buffer is empty, try to load data from mongodb.
1323
        """
1324 1
        if not self.circuits:
1325
            # Load circuits from mongodb to buffer
1326 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1327 1
            for c_id, circuit in circuits.items():
1328 1
                evc = self._evc_from_dict(circuit)
1329 1
                self.circuits[c_id] = evc
1330 1
        return self.circuits
1331
1332
    # pylint: disable=attribute-defined-outside-init
1333 1
    @alisten_to("kytos/of_multi_table.enable_table")
1334 1
    async def on_table_enabled(self, event):
1335
        """Handle a recently table enabled."""
1336 1
        table_group = event.content.get("mef_eline", None)
1337 1
        if not table_group:
1338 1
            return
1339 1
        for group in table_group:
1340 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1341 1
                log.error(f'The table group "{group}" is not allowed for '
1342
                          f'mef_eline. Allowed table groups are '
1343
                          f'{settings.TABLE_GROUP_ALLOWED}')
1344 1
                return
1345 1
        self.table_group.update(table_group)
1346 1
        content = {"group_table": self.table_group}
1347 1
        name = "kytos/mef_eline.enable_table"
1348
        await aemit_event(self.controller, name, content)
1349