Test Failed
Pull Request — master (#583)
by
unknown
05:22
created

build.main.Main.handle_link_down()   F

Complexity

Conditions 22

Size

Total Lines 173
Code Lines 128

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 98
CRAP Score 22.7771

Importance

Changes 0
Metric Value
cc 22
eloc 128
nop 2
dl 0
loc 173
ccs 98
cts 111
cp 0.8829
crap 22.7771
rs 0
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.main.Main.handle_link_down() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13
from typing import Optional
14 1
15
from pydantic import ValidationError
16 1
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21 1
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25 1
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29 1
                                              DuplicatedNoTagUNI, InvalidPath)
30
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
31 1
                                          Path)
32 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
33
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
34
                                         emit_event, get_vlan_tags_and_masks,
35
                                         map_evc_event_content,
36
                                         merge_flow_dicts, prepare_delete_flow,
37
                                         send_flow_mods_event)
38
39
40 1
# pylint: disable=too-many-public-methods
41
class Main(KytosNApp):
42
    """Main class of amlight/mef_eline NApp.
43
44
    This class is the entry point for this napp.
45
    """
46 1
47
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
48 1
49
    def setup(self):
50
        """Replace the '__init__' method for the KytosNApp subclass.
51
52
        The setup method is automatically called by the controller when your
53
        application is loaded.
54
55
        So, if you have any setup routine, insert it here.
56
        """
57 1
        # object used to scheduler circuit events
58
        self.sched = Scheduler()
59
60 1
        # object to save and load circuits
61 1
        self.mongo_controller = self.get_eline_controller()
62
        self.mongo_controller.bootstrap_indexes()
63
64 1
        # set the controller that will manager the dynamic paths
65
        DynamicPathManager.set_controller(self.controller)
66
67
        # dictionary of EVCs created. It acts as a circuit buffer.
68 1
        # Every create/update/delete must be synced to mongodb.
69
        self.circuits = {}
70 1
71 1
        self._intf_events = defaultdict(dict)
72 1
        self._lock_interfaces = defaultdict(Lock)
73 1
        self.table_group = {"epl": 0, "evpl": 0}
74 1
        self._lock = Lock()
75
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
76 1
77 1
        self.load_all_evcs()
78
        self._topology_updated_at = None
79 1
80
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
81
        """Get circuits sorted by desc service level and asc creation_time.
82
83
        In the future, as more ops are offloaded it should be get from the DB.
84 1
        """
85 1
        if enable_filter:
86
            return sorted(
87
                          [circuit for circuit in self.circuits.values()
88
                           if circuit.is_enabled()],
89
                          key=lambda x: (-x.service_level, x.creation_time),
90 1
            )
91
        return sorted(self.circuits.values(),
92
                      key=lambda x: (-x.service_level, x.creation_time))
93 1
94 1
    @staticmethod
95
    def get_eline_controller():
96
        """Return the ELineController instance."""
97
        return controllers.ELineController()
98 1
99
    def execute(self):
100 1
        """Execute once when the napp is running."""
101 1
        if self._lock.locked():
102 1
            return
103 1
        log.debug("Starting consistency routine")
104 1
        with self._lock:
105 1
            self.execute_consistency()
106
        log.debug("Finished consistency routine")
107 1
108
    def should_be_checked(self, circuit):
109
        "Verify if the circuit meets the necessary conditions to be checked"
110 1
        # pylint: disable=too-many-boolean-expressions
111
        if (
112
                circuit.is_enabled()
113
                and not circuit.is_active()
114
                and not circuit.lock.locked()
115
                and not circuit.has_recent_removed_flow()
116
                and not circuit.is_recent_updated()
117
                and circuit.are_unis_active(self.controller.switches)
118
                # if a inter-switch EVC does not have current_path, it does not
119
                # make sense to run sdntrace on it
120
                and (circuit.is_intra_switch() or circuit.current_path)
121 1
                ):
122
            return True
123
        return False
124 1
125
    def execute_consistency(self):
126 1
        """Execute consistency routine."""
127 1
        circuits_to_check = []
128 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
129 1
            if self.should_be_checked(circuit):
130 1
                circuits_to_check.append(circuit)
131 1
            circuit.try_setup_failover_path()
132 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
133 1
        for circuit in circuits_to_check:
134 1
            is_checked = circuits_checked.get(circuit.id)
135 1
            if is_checked:
136 1
                circuit.execution_rounds = 0
137 1
                log.info(f"{circuit} enabled but inactive - activating")
138 1
                with circuit.lock:
139 1
                    circuit.activate()
140
                    circuit.sync()
141 1
            else:
142 1
                circuit.execution_rounds += 1
143 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
144 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
145 1
                    with circuit.lock:
146
                        circuit.deploy()
147 1
148
    def shutdown(self):
149
        """Execute when your napp is unloaded.
150
151
        If you have some cleanup procedure, insert it here.
152
        """
153 1
154 1
    @rest("/v2/evc/", methods=["GET"])
155
    def list_circuits(self, request: Request) -> JSONResponse:
156
        """Endpoint to return circuits stored.
157
158
        archive query arg if defined (not null) will be filtered
159
        accordingly, by default only non archived evcs will be listed
160 1
        """
161 1
        log.debug("list_circuits /v2/evc")
162 1
        args = request.query_params
163 1
        archived = args.get("archived", "false").lower()
164 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
165
        circuits = self.mongo_controller.get_circuits(archived=archived,
166 1
                                                      metadata=args)
167 1
        circuits = circuits['circuits']
168
        return JSONResponse(circuits)
169 1
170 1
    @rest("/v2/evc/schedule", methods=["GET"])
171
    def list_schedules(self, _request: Request) -> JSONResponse:
172
        """Endpoint to return all schedules stored for all circuits.
173
174
        Return a JSON with the following template:
175
        [{"schedule_id": <schedule_id>,
176
         "circuit_id": <circuit_id>,
177
         "schedule": <schedule object>}]
178 1
        """
179 1
        log.debug("list_schedules /v2/evc/schedule")
180 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
181 1
        if not circuits:
182 1
            result = {}
183 1
            status = 200
184
            return JSONResponse(result, status_code=status)
185 1
186 1
        result = []
187 1
        status = 200
188 1
        for circuit in circuits:
189 1
            circuit_scheduler = circuit.get("circuit_scheduler")
190 1
            if circuit_scheduler:
191 1
                for scheduler in circuit_scheduler:
192
                    value = {
193
                        "schedule_id": scheduler.get("id"),
194
                        "circuit_id": circuit.get("id"),
195
                        "schedule": scheduler,
196 1
                    }
197
                    result.append(value)
198 1
199 1
        log.debug("list_schedules result %s %s", result, status)
200
        return JSONResponse(result, status_code=status)
201 1
202 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
203
    def get_circuit(self, request: Request) -> JSONResponse:
204 1
        """Endpoint to return a circuit based on id."""
205 1
        circuit_id = request.path_params["circuit_id"]
206 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
207 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
208 1
        if not circuit:
209 1
            result = f"circuit_id {circuit_id} not found"
210 1
            log.debug("get_circuit result %s %s", result, 404)
211 1
            raise HTTPException(404, detail=result)
212 1
        status = 200
213 1
        log.debug("get_circuit result %s %s", circuit, status)
214
        return JSONResponse(circuit, status_code=status)
215
216 1
    # pylint: disable=too-many-branches, too-many-statements
217 1
    @rest("/v2/evc/", methods=["POST"])
218 1
    @validate_openapi(spec)
219
    def create_circuit(self, request: Request) -> JSONResponse:
220
        """Try to create a new circuit.
221
222
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
223
        UNI_Z's requested C-VID are available from the interfaces' pools. This
224
        is checked when creating the UNI object.
225
226
        Then, E-Line NApp requests a primary and a backup path to the
227
        Pathfinder NApp using the attributes primary_links and backup_links
228
        submitted via REST
229
230
        # For each link composing paths in #3:
231
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
232
        #  - Using the S-VID obtained, generate abstract flow entries to be
233
        #    sent to FlowManager
234
235
        Push abstract flow entries to FlowManager and FlowManager pushes
236
        OpenFlow entries to datapaths
237
238
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
239
        creation
240
241
        Finnaly, notify user of the status of its request.
242
        """
243 1
        # Try to create the circuit object
244 1
        log.debug("create_circuit /v2/evc/")
245
        data = get_json_or_400(request, self.controller.loop)
246 1
247 1
        try:
248 1
            evc = self._evc_from_dict(data)
249 1
        except (ValueError, KytosTagError) as exception:
250 1
            log.debug("create_circuit result %s %s", exception, 400)
251 1
            raise HTTPException(400, detail=str(exception)) from exception
252 1
        try:
253 1
            check_disabled_component(evc.uni_a, evc.uni_z)
254 1
        except DisabledSwitch as exception:
255 1
            log.debug("create_circuit result %s %s", exception, 409)
256
            raise HTTPException(
257
                    409,
258
                    detail=f"Path is not valid: {exception}"
259
                ) from exception
260 1
261 1
        if evc.primary_path:
262 1
            try:
263
                evc.primary_path.is_valid(
264
                    evc.uni_a.interface.switch,
265
                    evc.uni_z.interface.switch,
266
                    bool(evc.circuit_scheduler),
267 1
                )
268 1
            except InvalidPath as exception:
269
                raise HTTPException(
270
                    400,
271
                    detail=f"primary_path is not valid: {exception}"
272 1
                ) from exception
273 1
        if evc.backup_path:
274 1
            try:
275
                evc.backup_path.is_valid(
276
                    evc.uni_a.interface.switch,
277
                    evc.uni_z.interface.switch,
278
                    bool(evc.circuit_scheduler),
279 1
                )
280 1
            except InvalidPath as exception:
281
                raise HTTPException(
282
                    400,
283
                    detail=f"backup_path is not valid: {exception}"
284
                ) from exception
285 1
286 1
        if not evc._tag_lists_equal():
287 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
288
            raise HTTPException(400, detail=detail)
289 1
290 1
        try:
291 1
            evc._validate_has_primary_or_dynamic()
292 1
        except ValueError as exception:
293
            raise HTTPException(400, detail=str(exception)) from exception
294 1
295 1
        try:
296
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
297
        except DuplicatedNoTagUNI as exception:
298
            log.debug("create_circuit result %s %s", exception, 409)
299
            raise HTTPException(409, detail=str(exception)) from exception
300 1
301 1
        try:
302 1
            self._use_uni_tags(evc)
303 1
        except KytosTagError as exception:
304
            raise HTTPException(400, detail=str(exception)) from exception
305
306 1
        # save circuit
307 1
        try:
308
            evc.sync()
309
        except ValidationError as exception:
310
            raise HTTPException(400, detail=str(exception)) from exception
311
312 1
        # store circuit in dictionary
313
        self.circuits[evc.id] = evc
314
315 1
        # Schedule the circuit deploy
316
        self.sched.add(evc)
317
318 1
        # Circuit has no schedule, deploy now
319 1
        deployed = False
320 1
        if not evc.circuit_scheduler:
321 1
            with evc.lock:
322
                deployed = evc.deploy()
323
324 1
        # Notify users
325 1
        result = {"circuit_id": evc.id, "deployed": deployed}
326 1
        status = 201
327 1
        log.debug("create_circuit result %s %s", result, status)
328
        emit_event(self.controller, name="created",
329 1
                   content=map_evc_event_content(evc))
330
        return JSONResponse(result, status_code=status)
331 1
332 1
    @staticmethod
333 1
    def _use_uni_tags(evc):
334 1
        uni_a = evc.uni_a
335 1
        evc._use_uni_vlan(uni_a)
336 1
        try:
337 1
            uni_z = evc.uni_z
338 1
            evc._use_uni_vlan(uni_z)
339 1
        except KytosTagError as err:
340 1
            evc.make_uni_vlan_available(uni_a)
341
            raise err
342 1
343 1
    @listen_to('kytos/flow_manager.flow.removed')
344
    def on_flow_delete(self, event):
345
        """Capture delete messages to keep track when flows got removed."""
346
        self.handle_flow_delete(event)
347 1
348
    def handle_flow_delete(self, event):
349 1
        """Keep track when the EVC got flows removed by deriving its cookie."""
350 1
        flow = event.content["flow"]
351 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
352 1
        if evc:
353 1
            log.debug("Flow removed in EVC %s", evc.id)
354
            evc.set_flow_removed_at()
355 1
356 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
357 1
    @validate_openapi(spec)
358
    def update(self, request: Request) -> JSONResponse:
359
        """Update a circuit based on payload.
360
361
        The EVC attributes (creation_time, active, current_path,
362
        failover_path, _id, archived) can't be updated.
363 1
        """
364 1
        data = get_json_or_400(request, self.controller.loop)
365 1
        circuit_id = request.path_params["circuit_id"]
366 1
        log.debug("update /v2/evc/%s", circuit_id)
367 1
        try:
368 1
            evc = self.circuits[circuit_id]
369 1
        except KeyError:
370 1
            result = f"circuit_id {circuit_id} not found"
371 1
            log.debug("update result %s %s", result, 404)
372
            raise HTTPException(404, detail=result) from KeyError
373 1
374 1
        try:
375 1
            updated_data = self._evc_dict_with_instances(data)
376
            self._check_no_tag_duplication(
377
                circuit_id, updated_data.get("uni_a"),
378
                updated_data.get("uni_z")
379 1
            )
380 1
            enable, redeploy = evc.update(**updated_data)
381 1
        except (ValueError, KytosTagError, ValidationError) as exception:
382 1
            log.debug("update result %s %s", exception, 400)
383 1
            raise HTTPException(400, detail=str(exception)) from exception
384
        except DuplicatedNoTagUNI as exception:
385
            log.debug("update result %s %s", exception, 409)
386 1
            raise HTTPException(409, detail=str(exception)) from exception
387 1
        except DisabledSwitch as exception:
388 1
            log.debug("update result %s %s", exception, 409)
389
            raise HTTPException(
390
                    409,
391
                    detail=f"Path is not valid: {exception}"
392 1
                ) from exception
393 1
        redeployed = False
394
        if evc.is_active():
395
            if enable is False:  # disable if active
396
                with evc.lock:
397
                    evc.remove()
398
            elif redeploy is not None:  # redeploy if active
399
                with evc.lock:
400
                    evc.remove()
401
                    redeployed = evc.deploy()
402 1
        else:
403 1
            if enable is True:  # enable if inactive
404 1
                with evc.lock:
405 1
                    redeployed = evc.deploy()
406 1
            elif evc.is_enabled() and redeploy:
407 1
                with evc.lock:
408 1
                    evc.remove()
409 1
                    redeployed = evc.deploy()
410 1
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
411
        status = 200
412 1
413 1
        log.debug("update result %s %s", result, status)
414
        emit_event(self.controller, "updated",
415 1
                   content=map_evc_event_content(evc, **data))
416
        return JSONResponse(result, status_code=status)
417 1
418 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
419
    def delete_circuit(self, request: Request) -> JSONResponse:
420
        """Remove a circuit.
421
422
        First, the flows are removed from the switches, and then the EVC is
423
        disabled.
424 1
        """
425 1
        circuit_id = request.path_params["circuit_id"]
426 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
427 1
        try:
428 1
            evc = self.circuits.pop(circuit_id)
429 1
        except KeyError:
430 1
            result = f"circuit_id {circuit_id} not found"
431 1
            log.debug("delete_circuit result %s %s", result, 404)
432 1
            raise HTTPException(404, detail=result) from KeyError
433
        log.info("Removing %s", evc)
434 1
435 1
        with evc.lock:
436 1
            if not evc.archived:
437 1
                evc.deactivate()
438 1
                evc.disable()
439 1
                self.sched.remove(evc)
440 1
                evc.remove_current_flows(sync=False)
441 1
                evc.remove_failover_flows(sync=False)
442 1
                evc.archive()
443 1
                evc.remove_uni_tags()
444 1
                evc.sync()
445
                emit_event(
446
                    self.controller, "deleted",
447
                    content=map_evc_event_content(evc)
448
                )
449 1
450 1
        log.info("EVC removed. %s", evc)
451 1
        result = {"response": f"Circuit {circuit_id} removed"}
452 1
        status = 200
453
        log.debug("delete_circuit result %s %s", result, status)
454 1
455
        return JSONResponse(result, status_code=status)
456 1
457 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
458
    def get_metadata(self, request: Request) -> JSONResponse:
459 1
        """Get metadata from an EVC."""
460 1
        circuit_id = request.path_params["circuit_id"]
461 1
        try:
462
            return (
463
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
464
            )
465
        except KeyError as error:
466
            raise HTTPException(
467
                404,
468
                detail=f"circuit_id {circuit_id} not found."
469
            ) from error
470 1
471 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
472 1
    @validate_openapi(spec)
473
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
474 1
        """Add metadata to a bulk of EVCs."""
475 1
        data = get_json_or_400(request, self.controller.loop)
476
        circuit_ids = data.pop("circuit_ids")
477 1
478
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
479 1
480 1
        fail_evcs = []
481 1
        for _id in circuit_ids:
482 1
            try:
483 1
                evc = self.circuits[_id]
484 1
                evc.extend_metadata(data)
485 1
            except KeyError:
486
                fail_evcs.append(_id)
487 1
488 1
        if fail_evcs:
489 1
            raise HTTPException(404, detail=fail_evcs)
490
        return JSONResponse("Operation successful", status_code=201)
491 1
492 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
493 1
    @validate_openapi(spec)
494
    def add_metadata(self, request: Request) -> JSONResponse:
495 1
        """Add metadata to an EVC."""
496 1
        circuit_id = request.path_params["circuit_id"]
497 1
        metadata = get_json_or_400(request, self.controller.loop)
498
        if not isinstance(metadata, dict):
499 1
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
500 1
        try:
501 1
            evc = self.circuits[circuit_id]
502 1
        except KeyError as error:
503
            raise HTTPException(
504
                404,
505
                detail=f"circuit_id {circuit_id} not found."
506
            ) from error
507 1
508 1
        evc.extend_metadata(metadata)
509 1
        evc.sync()
510
        return JSONResponse("Operation successful", status_code=201)
511 1
512 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
513 1
    @validate_openapi(spec)
514
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
515 1
        """Delete metada from a bulk of EVCs"""
516 1
        data = get_json_or_400(request, self.controller.loop)
517 1
        key = request.path_params["key"]
518 1
        circuit_ids = data.pop("circuit_ids")
519
        self.mongo_controller.update_evcs_metadata(
520
            circuit_ids, {key: ""}, "del"
521
        )
522 1
523 1
        fail_evcs = []
524 1
        for _id in circuit_ids:
525 1
            try:
526 1
                evc = self.circuits[_id]
527 1
                evc.remove_metadata(key)
528 1
            except KeyError:
529
                fail_evcs.append(_id)
530 1
531 1
        if fail_evcs:
532 1
            raise HTTPException(404, detail=fail_evcs)
533
        return JSONResponse("Operation successful")
534 1
535 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
536
    def delete_metadata(self, request: Request) -> JSONResponse:
537 1
        """Delete metadata from an EVC."""
538 1
        circuit_id = request.path_params["circuit_id"]
539 1
        key = request.path_params["key"]
540 1
        try:
541 1
            evc = self.circuits[circuit_id]
542 1
        except KeyError as error:
543
            raise HTTPException(
544
                404,
545
                detail=f"circuit_id {circuit_id} not found."
546
            ) from error
547 1
548 1
        evc.remove_metadata(key)
549 1
        evc.sync()
550
        return JSONResponse("Operation successful")
551 1
552 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
553
    def redeploy(self, request: Request) -> JSONResponse:
554 1
        """Endpoint to force the redeployment of an EVC."""
555 1
        circuit_id = request.path_params["circuit_id"]
556
        try_avoid_same_s_vlan = request.query_params.get(
557
            "try_avoid_same_s_vlan", "true"
558 1
        )
559 1
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
560
        if try_avoid_same_s_vlan not in {"true", "false"}:
561
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
562 1
            raise HTTPException(400, detail=msg)
563 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
564 1
        try:
565 1
            evc = self.circuits[circuit_id]
566 1
        except KeyError:
567
            raise HTTPException(
568
                404,
569
                detail=f"circuit_id {circuit_id} not found"
570 1
            ) from KeyError
571 1
        deployed = False
572 1
        if evc.is_enabled():
573 1
            with evc.lock:
574
                path_dict = evc.remove_current_flows(
575
                    sync=False,
576
                    return_path=try_avoid_same_s_vlan == "true"
577 1
                )
578 1
                evc.remove_failover_flows(sync=True)
579 1
                deployed = evc.deploy(path_dict)
580 1
        if deployed:
581 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
582
            status = 202
583 1
        else:
584
            result = {
585
                "response": f"Circuit {circuit_id} is disabled."
586 1
            }
587
            status = 409
588 1
589
        return JSONResponse(result, status_code=status)
590 1
591 1
    @rest("/v2/evc/schedule/", methods=["POST"])
592 1
    @validate_openapi(spec)
593
    def create_schedule(self, request: Request) -> JSONResponse:
594
        """
595
        Create a new schedule for a given circuit.
596
597
        This service do no check if there are conflicts with another schedule.
598
        Payload example:
599
            {
600
              "circuit_id":"aa:bb:cc",
601
              "schedule": {
602
                "date": "2019-08-07T14:52:10.967Z",
603
                "interval": "string",
604
                "frequency": "1 * * * *",
605
                "action": "create"
606
              }
607
            }
608 1
        """
609 1
        log.debug("create_schedule /v2/evc/schedule/")
610 1
        data = get_json_or_400(request, self.controller.loop)
611 1
        circuit_id = data["circuit_id"]
612
        schedule_data = data["schedule"]
613
614 1
        # Get EVC from circuits buffer
615
        circuits = self._get_circuits_buffer()
616
617 1
        # get the circuit
618
        evc = circuits.get(circuit_id)
619
620 1
        # get the circuit
621 1
        if not evc:
622 1
            result = f"circuit_id {circuit_id} not found"
623 1
            log.debug("create_schedule result %s %s", result, 404)
624
            raise HTTPException(404, detail=result)
625
626 1
        # new schedule from dict
627
        new_schedule = CircuitSchedule.from_dict(schedule_data)
628
629 1
        # If there is no schedule, create the list
630 1
        if not evc.circuit_scheduler:
631
            evc.circuit_scheduler = []
632
633 1
        # Add the new schedule
634
        evc.circuit_scheduler.append(new_schedule)
635
636 1
        # Add schedule job
637
        self.sched.add_circuit_job(evc, new_schedule)
638
639 1
        # save circuit to mongodb
640
        evc.sync()
641 1
642 1
        result = new_schedule.as_dict()
643
        status = 201
644 1
645 1
        log.debug("create_schedule result %s %s", result, status)
646
        return JSONResponse(result, status_code=status)
647 1
648 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
649 1
    @validate_openapi(spec)
650
    def update_schedule(self, request: Request) -> JSONResponse:
651
        """Update a schedule.
652
653
        Change all attributes from the given schedule from a EVC circuit.
654
        The schedule ID is preserved as default.
655
        Payload example:
656
            {
657
              "date": "2019-08-07T14:52:10.967Z",
658
              "interval": "string",
659
              "frequency": "1 * * *",
660
              "action": "create"
661
            }
662 1
        """
663 1
        data = get_json_or_400(request, self.controller.loop)
664 1
        schedule_id = request.path_params["schedule_id"]
665
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
666
667 1
        # Try to find a circuit schedule
668
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
669
670 1
        # Can not modify circuits deleted and archived
671 1
        if not found_schedule:
672 1
            result = f"schedule_id {schedule_id} not found"
673 1
            log.debug("update_schedule result %s %s", result, 404)
674
            raise HTTPException(404, detail=result)
675 1
676 1
        new_schedule = CircuitSchedule.from_dict(data)
677
        new_schedule.id = found_schedule.id
678 1
        # Remove the old schedule
679
        evc.circuit_scheduler.remove(found_schedule)
680 1
        # Append the modified schedule
681
        evc.circuit_scheduler.append(new_schedule)
682
683 1
        # Cancel all schedule jobs
684
        self.sched.cancel_job(found_schedule.id)
685 1
        # Add the new circuit schedule
686
        self.sched.add_circuit_job(evc, new_schedule)
687 1
        # Save EVC to mongodb
688
        evc.sync()
689 1
690 1
        result = new_schedule.as_dict()
691
        status = 200
692 1
693 1
        log.debug("update_schedule result %s %s", result, status)
694
        return JSONResponse(result, status_code=status)
695 1
696 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
697
    def delete_schedule(self, request: Request) -> JSONResponse:
698
        """Remove a circuit schedule.
699
700
        Remove the Schedule from EVC.
701
        Remove the Schedule from cron job.
702
        Save the EVC to the Storehouse.
703 1
        """
704 1
        schedule_id = request.path_params["schedule_id"]
705 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
706
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
707
708 1
        # Can not modify circuits deleted and archived
709 1
        if not found_schedule:
710 1
            result = f"schedule_id {schedule_id} not found"
711 1
            log.debug("delete_schedule result %s %s", result, 404)
712
            raise HTTPException(404, detail=result)
713
714 1
        # Remove the old schedule
715
        evc.circuit_scheduler.remove(found_schedule)
716
717 1
        # Cancel all schedule jobs
718
        self.sched.cancel_job(found_schedule.id)
719 1
        # Save EVC to mongodb
720
        evc.sync()
721 1
722 1
        result = "Schedule removed"
723
        status = 200
724 1
725 1
        log.debug("delete_schedule result %s %s", result, status)
726
        return JSONResponse(result, status_code=status)
727 1
728
    def _check_no_tag_duplication(
729
        self,
730
        evc_id: str,
731
        uni_a: Optional[UNI] = None,
732
        uni_z: Optional[UNI] = None
733
    ):
734
        """Check if the given EVC has UNIs with no tag and if these are
735
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
736
        Args:
737
            evc (dict): EVC to be analyzed.
738
        """
739
740 1
        # No UNIs
741 1
        if not (uni_a or uni_z):
742
            return
743 1
744
        if (not (uni_a and not uni_a.user_tag) and
745 1
                not (uni_z and not uni_z.user_tag)):
746 1
            return
747 1
        for circuit in self.circuits.copy().values():
748 1
            if (not circuit.archived and circuit._id != evc_id):
749 1
                if uni_a and uni_a.user_tag is None:
750 1
                    circuit.check_no_tag_duplicate(uni_a)
751 1
                if uni_z and uni_z.user_tag is None:
752
                    circuit.check_no_tag_duplicate(uni_z)
753 1
754 1
    @listen_to("kytos/topology.link_up")
755
    def on_link_up(self, event):
756
        """Change circuit when link is up or end_maintenance."""
757
        self.handle_link_up(event)
758 1
759
    def handle_link_up(self, event):
760 1
        """Change circuit when link is up or end_maintenance."""
761 1
        log.info("Event handle_link_up %s", event.content["link"])
762 1
        for evc in self.get_evcs_by_svc_level():
763 1
            if evc.is_enabled() and not evc.archived:
764 1
                with evc.lock:
765
                    evc.handle_link_up(event.content["link"])
766
767 1
    # Possibly replace this with interruptions?
768
    @listen_to(
769
        '.*.switch.interface.(link_up|link_down|created|deleted)'
770 1
    )
771
    def on_interface_link_change(self, event: KytosEvent):
772
        """
773
        Handler for interface link_up and link_down events.
774
        """
775
        self.handle_on_interface_link_change(event)
776 1
777
    def handle_on_interface_link_change(self, event: KytosEvent):
778
        """
779
        Handler to sort interface events {link_(up, down), create, deleted}
780
781
        To avoid multiple database updated (link flap):
782
        Every interface is identfied and processed in parallel.
783
        Once an interface event is received a time is started.
784
        While time is running self._intf_events will be updated.
785
        After time has passed last received event will be processed.
786 1
        """
787 1
        iface = event.content.get("interface")
788 1
        with self._lock_interfaces[iface.id]:
789
            _now = event.timestamp
790 1
            # Return out of order events
791
            if (
792
                iface.id in self._intf_events
793
                and self._intf_events[iface.id]["event"].timestamp > _now
794 1
            ):
795 1
                return
796 1
            self._intf_events[iface.id].update({"event": event})
797 1
            if "last_acquired" in self._intf_events[iface.id]:
798 1
                return
799 1
            self._intf_events[iface.id].update({"last_acquired": now()})
800 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
801 1
        with self._lock_interfaces[iface.id]:
802 1
            event = self._intf_events[iface.id]["event"]
803 1
            self._intf_events[iface.id].pop('last_acquired', None)
804 1
            _, _, event_type = event.name.rpartition('.')
805 1
            if event_type in ('link_up', 'created'):
806 1
                self.handle_interface_link_up(iface)
807 1
            elif event_type in ('link_down', 'deleted'):
808
                self.handle_interface_link_down(iface)
809 1
810
    def handle_interface_link_up(self, interface):
811
        """
812
        Handler for interface link_up events
813
        """
814
        log.info("Event handle_interface_link_up %s", interface)
815
        for evc in self.get_evcs_by_svc_level():
816
            with evc.lock:
817
                evc.handle_interface_link_up(
818
                    interface
819
                )
820 1
821
    def handle_interface_link_down(self, interface):
822
        """
823
        Handler for interface link_down events
824
        """
825
        log.info("Event handle_interface_link_down %s", interface)
826
        for evc in self.get_evcs_by_svc_level():
827
            with evc.lock:
828
                evc.handle_interface_link_down(
829
                    interface
830
                )
831 1
832 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
833
    def on_link_down(self, event):
834
        """Change circuit when link is down or under_mantenance."""
835
        self.handle_link_down(event)
836
837
    # pylint: disable=too-many-branches
838 1
    # pylint: disable=too-many-locals
839
    def handle_link_down(self, event):
840 1
        """Change circuit when link is down or under_mantenance."""
841 1
        link = event.content["link"]
842 1
        log.info("Event handle_link_down %s", link)
843 1
844 1
        with ExitStack() as exit_stack:
845 1
            exit_stack.enter_context(self._lock)
846 1
            swap_to_failover = list[EVC]()
847
            redeploy = list[EVC]()
848 1
            clear_failover = list[EVC]()
849 1
            clear_old_path = list[EVC]()
850 1
            evc_dict = dict[str, EVC]()
851 1
            evcs_to_update = list[EVC]()
852
853
            flow_modifications = defaultdict[str, dict[str, dict[list]]](dict)
854 1
            event_contents = defaultdict[str, dict[str, dict]](dict)
855
856
            for evc in self.get_evcs_by_svc_level():
857
                evc_dict[evc.id] = evc
858 1
                with ExitStack() as sub_stack:
859 1
                    sub_stack.enter_context(evc.lock)
860 1
                    if all((
861 1
                        evc.is_affected_by_link(link),
862 1
                        evc.failover_path,
863 1
                        not evc.is_failover_path_affected_by_link(link)
864 1
                    )):
865
                        swap_to_failover.append(evc)
866 1
                    elif all((
867 1
                        evc.is_affected_by_link(link),
868 1
                        not evc.failover_path or
869
                        evc.is_failover_path_affected_by_link(link)
870
                    )):
871
                        redeploy.append(evc)
872 1
                    elif all((
873 1
                        not evc.is_affected_by_link(link),
874 1
                        evc.failover_path,
875 1
                        evc.is_failover_path_affected_by_link(link)
876 1
                    )):
877 1
                        clear_failover.append(evc)
878 1
                    else:
879
                        continue
880
881
                    exit_stack.push(sub_stack.pop_all())
882 1
883 1
            # Swap from current path to failover path
884 1
885 1
            for evc in swap_to_failover:
886
                try:
887 1
                    new_flows = evc.get_failover_flows()
888 1
                    evc.old_path = evc.current_path
889
                    evc.current_path = evc.failover_path
890 1
                    evc.failover_path = Path([])
891
                # pylint: disable=broad-except
892 1
                except Exception:
893 1
                    err = traceback.format_exc().replace("\n", ", ")
894
                    log.error(
895
                        "Ignore Failover path for "
896
                        f"{evc} due to error: {err}"
897
                    )
898
                    del flow_modifications[evc.id]
899 1
                    del event_contents[evc.id]
900 1
                    redeploy.append(evc)
901 1
                    continue
902 1
                flow_modifications[evc.id]["swap_to_failover"] = new_flows
903
                event_contents[evc.id]["swap_to_failover"] = map_evc_event_content(
904
                    evc,
905 1
                    flows=deepcopy(new_flows)
906 1
                )
907
                clear_old_path.append(evc)
908 1
909
            for evc in clear_failover:
910 1
                evc.old_path = evc.failover_path
911
                evc.failover_path = Path([])
912
                clear_old_path.append(evc)
913
914
            # Clear out old flows
915
916 1
            for evc in clear_old_path:
917 1
                try:
918
                    del_flows = prepare_delete_flow(
919
                        merge_flow_dicts(
920
                            evc._prepare_uni_flows(evc.old_path, skip_in=True),
921 1
                            evc._prepare_nni_flows(evc.old_path)
922
                        )
923 1
                    )
924 1
                    evc.old_path = Path([])
925 1
                # pylint: disable=broad-except
926 1
                except Exception:
927 1
                    err = traceback.format_exc().replace("\n", ", ")
928 1
                    log.error(f"Fail to remove {evc} old_path: {err}")
929
                    if "swap_to_failover" in flow_modifications[evc.id]:
930 1
                        evc.failover_path = evc.current_path
931 1
                        evc.current_path = evc.old_path
932 1
                    else:
933 1
                        evc.failover_path = evc.old_path
934 1
                    evc.old_path = Path([])
935 1
                    del flow_modifications[evc.id]
936
                    del event_contents[evc.id]
937
                    redeploy.append(evc)
938 1
                    continue
939 1
                flow_modifications[evc.id]["clear_old_path"] = new_flows
0 ignored issues
show
introduced by
The variable new_flows does not seem to be defined in case the for loop on line 885 is not entered. Are you sure this can never be the case?
Loading history...
940
                event_contents[evc.id]["clear_old_path"] = map_evc_event_content(
941
                    evc,
942
                    removed_flows=deepcopy(del_flows)
943 1
                )
944
945
            swap_to_failover_flows = {}
946
            swap_to_failover_event_contents = {}
947
948
            clear_old_path_flows = {}
949
            clear_old_path_event_contents = {}
950 1
951 1
            for modified_evc_id in flow_modifications:
952
                if "swap_to_failover" in flow_modifications[modified_evc_id]:
953
                    swap_to_failover_flows = merge_flow_dicts(
954
                        swap_to_failover_flows,
955 1
                        flow_modifications[modified_evc_id]["swap_to_failover"]
956
                    )
957 1
                    swap_to_failover_event_contents[modified_evc_id] =\
958 1
                        event_contents[modified_evc_id]["swap_to_failover"]
959 1
                if "clear_old_path" in flow_modifications[modified_evc_id]:
960 1
                    clear_old_path_flows = merge_flow_dicts(
961 1
                        clear_old_path_flows,
962 1
                        flow_modifications[modified_evc_id]["clear_old_path"]
963 1
                    )
964 1
                    clear_old_path_event_contents[modified_evc_id] =\
965 1
                        event_contents[modified_evc_id]["clear_old_path"]
966 1
                evcs_to_update.append(evc_dict[modified_evc_id])
967
968
            if swap_to_failover_flows:
969 1
                emit_event(
970
                    self.controller, "failover_link_down",
971
                    content=deepcopy(swap_to_failover_event_contents)
972 1
                )
973
                send_flow_mods_event(
974
                    self.controller,
975
                    swap_to_failover_flows,
976
                    'install'
977
                )
978
979
            if clear_old_path_flows:
980 1
                send_flow_mods_event(
981 1
                    self.controller,
982 1
                    clear_old_path_flows,
983
                    'delete'
984
                )
985
                emit_event(
986
                    self.controller,
987 1
                    "failover_old_path",
988 1
                    content=clear_old_path_event_contents
989 1
                )
990 1
991 1
            # Handle redeploys
992
            # This uses a separate syncing mechanism here, so don't push to DB.
993
            # TODO: replace with bulk update mechanism
994 1
995 1
            for evc in redeploy:
996
                result = evc.handle_link_down()
997
                event_name = "error_redeploy_link_down"
998
                if result:
999 1
                    log.info(f"{evc} redeployed due to link down {link.id}")
1000
                    event_name = "redeployed_link_down"
1001 1
                emit_event(
1002 1
                    self.controller,
1003 1
                    event_name,
1004 1
                    content=map_evc_event_content(evc)
1005 1
                )
1006
1007
            # Push update to DB
1008 1
1009
            if evcs_to_update:
1010 1
                self.mongo_controller.update_evcs(
1011 1
                    [evc.as_dict() for evc in evcs_to_update]
1012 1
                )
1013 1
1014
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
1015
    def on_evc_affected_by_link_down(self, event):
1016 1
        """Change circuit when link is down or under_mantenance."""
1017 1
        self.handle_evc_affected_by_link_down(event)
1018 1
1019
    def handle_evc_affected_by_link_down(self, event):
1020 1
        """Change circuit when link is down or under_mantenance."""
1021 1
        evc = self.circuits.get(event.content["evc_id"])
1022 1
        link = event.content['link']
1023
        if not evc:
1024 1
            return
1025 1
        with evc.lock:
1026
            if not evc.is_affected_by_link(link):
1027
                return
1028
            result = evc.handle_link_down()
1029 1
        event_name = "error_redeploy_link_down"
1030
        if result:
1031 1
            log.info(f"{evc} redeployed due to link down {link.id}")
1032 1
            event_name = "redeployed_link_down"
1033 1
        emit_event(self.controller, event_name,
1034
                   content=map_evc_event_content(evc))
1035 1
1036 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
1037 1
    def on_evc_deployed(self, event):
1038 1
        """Handle EVC deployed|redeployed_link_down."""
1039 1
        self.handle_evc_deployed(event)
1040
1041 1
    def handle_evc_deployed(self, event):
1042
        """Setup failover path on evc deployed."""
1043
        evc = self.circuits.get(event.content["evc_id"])
1044
        if not evc:
1045
            return
1046 1
        evc.try_setup_failover_path()
1047 1
1048
    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
1049
    def on_cleanup_evcs_old_path(self, event):
1050 1
        """Handle cleanup evcs old path."""
1051 1
        self.handle_cleanup_evcs_old_path(event)
1052 1
1053 1
    def handle_cleanup_evcs_old_path(self, event):
1054 1
        """Handle cleanup evcs old path."""
1055 1
        evcs = event.content.get("evcs", [])
1056
        event_contents: dict[str, dict] = defaultdict(list)
1057 1
        total_flows = {}
1058 1
        for evc in evcs:
1059 1
            if not evc.old_path:
1060 1
                continue
1061
            with evc.lock:
1062
                removed_flows = {}
1063
                try:
1064
                    nni_flows = prepare_delete_flow(
1065
                        evc._prepare_nni_flows(evc.old_path)
1066
                    )
1067
                    uni_flows = prepare_delete_flow(
1068 1
                        evc._prepare_uni_flows(evc.old_path, skip_in=True)
1069 1
                    )
1070
                    removed_flows = merge_flow_dicts(
1071
                        nni_flows, uni_flows
1072
                    )
1073
                # pylint: disable=broad-except
1074
                except Exception:
1075
                    err = traceback.format_exc().replace("\n", ", ")
1076 1
                    log.error(f"Fail to remove {evc} old_path: {err}")
1077 1
                    continue
1078
                if removed_flows:
1079
                    total_flows = merge_flow_dicts(total_flows, removed_flows)
1080
                    content = map_evc_event_content(
1081 1
                        evc,
1082
                        removed_flows=deepcopy(removed_flows),
1083 1
                        current_path=evc.current_path.as_dict(),
1084 1
                    )
1085 1
                    event_contents[evc.id] = content
1086 1
                    evc.old_path = Path([])
1087
        if event_contents:
1088 1
            send_flow_mods_event(self.controller, total_flows, 'delete')
1089
            emit_event(self.controller, "failover_old_path",
1090 1
                       content=event_contents)
1091 1
1092
    @listen_to("kytos/topology.topology_loaded")
1093 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
1094 1
        """Load EVCs once the topology is available."""
1095 1
        self.load_all_evcs()
1096 1
1097
    def load_all_evcs(self):
1098
        """Try to load all EVCs on startup."""
1099
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
1100 1
        for circuit_id, circuit in circuits:
1101 1
            if circuit_id not in self.circuits:
1102 1
                self._load_evc(circuit)
1103 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
1104 1
                   timeout=1)
1105 1
1106 1
    def _load_evc(self, circuit_dict):
1107 1
        """Load one EVC from mongodb to memory."""
1108 1
        try:
1109 1
            evc = self._evc_from_dict(circuit_dict)
1110 1
        except (ValueError, KytosTagError) as exception:
1111
            log.error(
1112 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1113
            )
1114 1
            return None
1115 1
        if evc.archived:
1116 1
            return None
1117
1118 1
        self.circuits.setdefault(evc.id, evc)
1119
        self.sched.add(evc)
1120 1
        return evc
1121 1
1122
    @listen_to("kytos/flow_manager.flow.error")
1123 1
    def on_flow_mod_error(self, event):
1124 1
        """Handle flow mod errors related to an EVC."""
1125 1
        self.handle_flow_mod_error(event)
1126 1
1127 1
    def handle_flow_mod_error(self, event):
1128 1
        """Handle flow mod errors related to an EVC."""
1129
        flow = event.content["flow"]
1130
        command = event.content.get("error_command")
1131
        if command != "add":
1132 1
            return
1133 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1134 1
        if evc:
1135 1
            with evc.lock:
1136
                evc.remove_current_flows(sync=False)
1137 1
                evc.remove_failover_flows(sync=True)
1138 1
1139 1
    def _evc_dict_with_instances(self, evc_dict):
1140 1
        """Convert some dict values to instance of EVC classes.
1141
1142
        This method will convert: [UNI, Link]
1143 1
        """
1144 1
        data = evc_dict.copy()  # Do not modify the original dict
1145
        for attribute, value in data.items():
1146 1
            # Get multiple attributes.
1147
            # Ex: uni_a, uni_z
1148
            if "uni" in attribute:
1149
                try:
1150
                    data[attribute] = self._uni_from_dict(value)
1151
                except ValueError as exception:
1152
                    result = "Error creating UNI: Invalid value"
1153 1
                    raise ValueError(result) from exception
1154 1
1155 1
            if attribute == "circuit_scheduler":
1156
                data[attribute] = []
1157
                for schedule in value:
1158 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1159 1
1160 1
            # Get multiple attributes.
1161 1
            # Ex: primary_links,
1162 1
            #     backup_links,
1163 1
            #     current_links_cache,
1164 1
            #     primary_links_cache,
1165 1
            #     backup_links_cache
1166 1
            if "links" in attribute:
1167
                data[attribute] = [
1168 1
                    self._link_from_dict(link, attribute) for link in value
1169
                ]
1170
1171
            # Ex: current_path,
1172
            #     primary_path,
1173
            #     backup_path
1174 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1175
                data[attribute] = Path(
1176 1
                    [self._link_from_dict(link, attribute) for link in value]
1177 1
                )
1178 1
1179 1
        return data
1180 1
1181
    def _evc_from_dict(self, evc_dict):
1182
        data = self._evc_dict_with_instances(evc_dict)
1183 1
        data["table_group"] = self.table_group
1184 1
        return EVC(self.controller, **data)
1185
1186 1
    def _uni_from_dict(self, uni_dict):
1187 1
        """Return a UNI object from python dict."""
1188 1
        if uni_dict is None:
1189 1
            return False
1190 1
1191 1
        interface_id = uni_dict.get("interface_id")
1192
        interface = self.controller.get_interface_by_id(interface_id)
1193
        if interface is None:
1194 1
            result = (
1195 1
                "Error creating UNI:"
1196 1
                + f"Could not instantiate interface {interface_id}"
1197 1
            )
1198 1
            raise ValueError(result) from ValueError
1199
        tag_convert = {1: "vlan"}
1200
        tag_dict = uni_dict.get("tag", None)
1201
        if tag_dict:
1202
            tag_type = tag_dict.get("tag_type")
1203
            tag_type = tag_convert.get(tag_type, tag_type)
1204
            tag_value = tag_dict.get("value")
1205
            if isinstance(tag_value, list):
1206
                tag_value = get_tag_ranges(tag_value)
1207
                mask_list = get_vlan_tags_and_masks(tag_value)
1208
                tag = TAGRange(tag_type, tag_value, mask_list)
1209
            else:
1210
                tag = TAG(tag_type, tag_value)
1211
        else:
1212
            tag = None
1213
        uni = UNI(interface, tag)
1214
        return uni
1215
1216
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
1217
        """Return a Link object from python dict."""
1218
        id_a = link_dict.get("endpoint_a").get("id")
1219
        id_b = link_dict.get("endpoint_b").get("id")
1220
1221
        endpoint_a = self.controller.get_interface_by_id(id_a)
1222
        endpoint_b = self.controller.get_interface_by_id(id_b)
1223
        if not endpoint_a:
1224
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1225
            raise ValueError(error_msg)
1226
        if not endpoint_b:
1227
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1228
            raise ValueError(error_msg)
1229
1230
        link = Link(endpoint_a, endpoint_b)
1231
        allowed_paths = {"current_path", "failover_path"}
1232
        if "metadata" in link_dict and attribute in allowed_paths:
1233
            link.extend_metadata(link_dict.get("metadata"))
1234
1235
        s_vlan = link.get_metadata("s_vlan")
1236
        if s_vlan:
1237
            tag = TAG.from_dict(s_vlan)
1238
            if tag is False:
1239
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1240
                raise ValueError(error_msg)
1241
            link.update_metadata("s_vlan", tag)
1242
        return link
1243
1244
    def _find_evc_by_schedule_id(self, schedule_id):
1245
        """
1246
        Find an EVC and CircuitSchedule based on schedule_id.
1247
1248
        :param schedule_id: Schedule ID
1249
        :return: EVC and Schedule
1250
        """
1251
        circuits = self._get_circuits_buffer()
1252
        found_schedule = None
1253
        evc = None
1254
1255
        # pylint: disable=unused-variable
1256
        for c_id, circuit in circuits.items():
1257
            for schedule in circuit.circuit_scheduler:
1258
                if schedule.id == schedule_id:
1259
                    found_schedule = schedule
1260
                    evc = circuit
1261
                    break
1262
            if found_schedule:
1263
                break
1264
        return evc, found_schedule
1265
1266
    def _get_circuits_buffer(self):
1267
        """
1268
        Return the circuit buffer.
1269
1270
        If the buffer is empty, try to load data from mongodb.
1271
        """
1272
        if not self.circuits:
1273
            # Load circuits from mongodb to buffer
1274
            circuits = self.mongo_controller.get_circuits()['circuits']
1275
            for c_id, circuit in circuits.items():
1276
                evc = self._evc_from_dict(circuit)
1277
                self.circuits[c_id] = evc
1278
        return self.circuits
1279
1280
    # pylint: disable=attribute-defined-outside-init
1281
    @alisten_to("kytos/of_multi_table.enable_table")
1282
    async def on_table_enabled(self, event):
1283
        """Handle a recently table enabled."""
1284
        table_group = event.content.get("mef_eline", None)
1285
        if not table_group:
1286
            return
1287
        for group in table_group:
1288
            if group not in settings.TABLE_GROUP_ALLOWED:
1289
                log.error(f'The table group "{group}" is not allowed for '
1290
                          f'mef_eline. Allowed table groups are '
1291
                          f'{settings.TABLE_GROUP_ALLOWED}')
1292
                return
1293
        self.table_group.update(table_group)
1294
        content = {"group_table": self.table_group}
1295
        name = "kytos/mef_eline.enable_table"
1296
        await aemit_event(self.controller, name, content)
1297