Passed
Push — master ( 71c496...12cb0f )
by Aldo
03:10 queued 15s
created

build.main   F

Complexity

Total Complexity 213

Size/Duplication

Total Lines 1199
Duplicated Lines 3.5 %

Test Coverage

Coverage 92.58%

Importance

Changes 0
Metric Value
eloc 805
dl 42
loc 1199
ccs 649
cts 701
cp 0.9258
rs 1.795
c 0
b 0
f 0
wmc 213
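
The headline figures above can be cross-checked from this table. The report does not define the abbreviations, so the sketch below assumes that ccs and cts are covered and total coverable statements, and that dl and loc are duplicated lines and total lines.

```python
# Assumed meanings (not defined by the report):
#   ccs = covered statements, cts = total coverable statements
#   dl  = duplicated lines,   loc = total lines of code
ccs, cts = 649, 701
dl, loc = 42, 1199

coverage = ccs / cts      # should agree with "cp" and the Coverage figure
duplication = dl / loc    # should agree with "Duplicated Lines"

print(f"coverage: {coverage:.4f}")
print(f"duplication: {duplication:.1%}")
```

Under those assumptions the ratios agree with the reported cp value and the duplicated-lines percentage.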

52 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.add_metadata() 0 19 3
A Main.get_metadata() 0 13 2
A Main.bulk_add_metadata() 20 20 4
A Main.on_link_down() 0 4 1
A Main.on_evc_deployed() 0 4 1
A Main._use_uni_tags() 0 10 2
A Main.on_flow_delete() 0 4 1
A Main.handle_flow_delete() 0 7 2
A Main.on_cleanup_evcs_old_path() 0 4 1
A Main.handle_evc_deployed() 0 6 2
A Main.on_evc_affected_by_link_down() 0 4 1
A Main.handle_evc_affected_by_link_down() 0 16 5
A Main.list_circuits() 0 15 1
A Main.get_circuit() 0 13 2
A Main.shutdown() 0 2 1
A Main.setup() 0 30 1
A Main.execute() 0 8 3
A Main.get_evcs_by_svc_level() 0 13 4
A Main.get_eline_controller() 0 4 1
F Main.create_circuit() 0 114 14
B Main.list_schedules() 0 31 5
C Main.should_be_checked() 0 16 9
B Main.execute_consistency() 0 22 8
A Main.delete_metadata() 0 16 2
F Main.update() 0 61 15
A Main.bulk_delete_metadata() 22 22 4
A Main.delete_circuit() 0 38 4
B Main.handle_cleanup_evcs_old_path() 0 38 7
C Main._evc_dict_with_instances() 0 41 9
A Main.update_schedule() 0 47 2
A Main._find_evc_by_schedule_id() 0 21 5
A Main._evc_from_dict() 0 4 1
A Main.load_all_evcs() 0 8 3
A Main.handle_interface_link_up() 0 9 3
A Main.handle_link_up() 0 7 5
A Main.on_link_up() 0 4 1
A Main.delete_schedule() 0 31 2
D Main.handle_link_down() 0 76 13
B Main._link_from_dict() 0 27 7
B Main._uni_from_dict() 0 29 5
B Main.redeploy() 0 38 6
A Main.handle_flow_mod_error() 0 11 4
A Main.handle_interface_link_down() 0 9 3
B Main.handle_on_interface_link_change() 0 32 8
A Main.on_flow_mod_error() 0 4 1
A Main.on_topology_loaded() 0 4 1
F Main._check_no_tag_duplication() 0 25 14
A Main._load_evc() 0 15 3
A Main.on_table_enabled() 0 16 4
A Main.create_schedule() 0 56 3
A Main._get_circuits_buffer() 0 13 3
A Main.on_interface_link_change() 0 8 1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
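
In this report, Main.bulk_add_metadata() and Main.bulk_delete_metadata() are the two methods flagged with duplicated lines: both loop over circuit_ids, apply one operation per EVC, and collect the ids that are missing. A minimal sketch of pulling that loop into one helper follows. apply_to_circuits and FakeEVC are hypothetical names used only for illustration and are not part of the NApp.

```python
from typing import Callable, Dict, Iterable, List


class FakeEVC:
    """Hypothetical stand-in for an EVC; only the metadata dict matters."""

    def __init__(self) -> None:
        self.metadata: dict = {}

    def extend_metadata(self, data: dict) -> None:
        self.metadata.update(data)

    def remove_metadata(self, key: str) -> None:
        self.metadata.pop(key, None)


def apply_to_circuits(
    circuits: Dict[str, FakeEVC],
    circuit_ids: Iterable[str],
    action: Callable[[FakeEVC], None],
) -> List[str]:
    """Run action on every known circuit and return the ids not found."""
    missing = []
    for circuit_id in circuit_ids:
        evc = circuits.get(circuit_id)
        if evc is None:
            missing.append(circuit_id)
            continue
        action(evc)
    return missing


# Both bulk endpoints could then share the same loop.
circuits = {"a": FakeEVC(), "b": FakeEVC()}
missing = apply_to_circuits(
    circuits, ["a", "x"], lambda evc: evc.extend_metadata({"owner": "noc"})
)
```

Each endpoint would keep its own request parsing and response code and pass only the per-EVC operation as the action. Note that this sketch looks circuits up with dict.get instead of catching KeyError, so an error raised inside the action itself is no longer treated as a missing circuit.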

Complexity

Tip: Before tackling complexity, eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common way to find such a component is to look for fields and methods that share the same prefixes or suffixes.
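
As a rough illustration of that heuristic, the sketch below groups a handful of the method names from the methods table by their last underscore-separated word. group_by_suffix is a hypothetical helper, and its plural handling is deliberately crude.

```python
from collections import defaultdict

# A subset of the method names listed in the report.
METHOD_NAMES = [
    "add_metadata", "get_metadata", "bulk_add_metadata", "delete_metadata",
    "bulk_delete_metadata", "create_schedule", "update_schedule",
    "delete_schedule", "list_schedules", "on_link_down", "handle_link_down",
    "on_link_up", "handle_link_up",
]


def group_by_suffix(names):
    """Group method names by their last word, ignoring one trailing 's'."""
    groups = defaultdict(list)
    for name in names:
        suffix = name.rsplit("_", 1)[-1]
        if suffix.endswith("s"):
            suffix = suffix[:-1]
        groups[suffix].append(name)
    return dict(groups)


groups = group_by_suffix(METHOD_NAMES)
for suffix, members in sorted(groups.items()):
    print(suffix, members)
```

Groups with several members, such as the metadata and schedule methods here, are candidates for a component of their own.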

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
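
A minimal sketch of Extract Class, assuming the metadata operations are the component being moved out. MetadataStore and SlimMain are hypothetical names, and circuits are modelled as plain dicts instead of EVC objects to keep the example self-contained.

```python
class MetadataStore:
    """Hypothetical extracted class that owns the metadata operations."""

    def __init__(self, circuits: dict) -> None:
        # Shares the circuit buffer with the owning class.
        self._circuits = circuits

    def get(self, circuit_id: str) -> dict:
        return self._circuits[circuit_id]["metadata"]

    def add(self, circuit_id: str, data: dict) -> None:
        self._circuits[circuit_id]["metadata"].update(data)

    def delete(self, circuit_id: str, key: str) -> None:
        self._circuits[circuit_id]["metadata"].pop(key, None)


class SlimMain:
    """Hypothetical slimmed-down main class that delegates metadata work."""

    def __init__(self) -> None:
        self.circuits = {}
        self.metadata = MetadataStore(self.circuits)


main = SlimMain()
main.circuits["evc1"] = {"metadata": {}}
main.metadata.add("evc1", {"owner": "noc"})
owner_after_add = main.metadata.get("evc1").get("owner")
main.metadata.delete("evc1", "owner")
```

The main class keeps the circuit buffer and hands a reference to the extracted class, so its endpoints shrink to parsing the request and delegating.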

# pylint: disable=protected-access, too-many-lines
"""Main module of kytos/mef_eline Kytos Network Application.

NApp to provision circuits from user request.
"""
import pathlib
import time
import traceback
from collections import defaultdict
from copy import deepcopy
from threading import Lock
from typing import Optional

from pydantic import ValidationError

from kytos.core import KytosNApp, log, rest
from kytos.core.events import KytosEvent
from kytos.core.exceptions import KytosTagError
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
                                validate_openapi)
from kytos.core.interface import TAG, UNI, TAGRange
from kytos.core.link import Link
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
                                 get_json_or_400)
from kytos.core.tag_ranges import get_tag_ranges
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
                                              DuplicatedNoTagUNI, InvalidPath)
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
                                          Path)
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
                                         emit_event, get_vlan_tags_and_masks,
                                         map_evc_event_content,
                                         merge_flow_dicts, prepare_delete_flow,
                                         send_flow_mods_event)


# pylint: disable=too-many-public-methods
class Main(KytosNApp):
    """Main class of amlight/mef_eline NApp.

    This class is the entry point for this napp.
    """

    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)
        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None

    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
        """Get circuits sorted by desc service level and asc creation_time.

        In the future, as more ops are offloaded, it should come from the DB.
        """
        if enable_filter:
            return sorted(
                          [circuit for circuit in self.circuits.values()
                           if circuit.is_enabled()],
                          key=lambda x: (-x.service_level, x.creation_time),
            )
        return sorted(self.circuits.values(),
                      key=lambda x: (-x.service_level, x.creation_time))

    @staticmethod
    def get_eline_controller():
        """Return the ELineController instance."""
        return controllers.ELineController()

    def execute(self):
        """Execute once when the napp is running."""
        if self._lock.locked():
            return
        log.debug("Starting consistency routine")
        with self._lock:
            self.execute_consistency()
        log.debug("Finished consistency routine")

    def should_be_checked(self, circuit):
        "Verify if the circuit meets the necessary conditions to be checked"
        # pylint: disable=too-many-boolean-expressions
        if (
                circuit.is_enabled()
                and not circuit.is_active()
                and not circuit.lock.locked()
                and not circuit.has_recent_removed_flow()
                and not circuit.is_recent_updated()
                and circuit.are_unis_active(self.controller.switches)
                # if an inter-switch EVC does not have current_path, it does
                # not make sense to run sdntrace on it
                and (circuit.is_intra_switch() or circuit.current_path)
                ):
            return True
        return False

    def execute_consistency(self):
        """Execute consistency routine."""
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)
            circuit.try_setup_failover_path()
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            is_checked = circuits_checked.get(circuit.id)
            if is_checked:
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
            else:
                circuit.execution_rounds += 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                    log.info(f"{circuit} enabled but inactive - redeploy")
                    with circuit.lock:
                        circuit.deploy()

    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return circuits stored.

        If the archived query arg is defined (not null), circuits are
        filtered accordingly. By default, only non-archived EVCs are listed.
        """
        log.debug("list_circuits /v2/evc")
        args = request.query_params
        archived = args.get("archived", "false").lower()
        args = {k: v for k, v in args.items() if k not in {"archived"}}
        circuits = self.mongo_controller.get_circuits(archived=archived,
                                                      metadata=args)
        circuits = circuits['circuits']
        return JSONResponse(circuits)

    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            result = {}
            status = 200
            return JSONResponse(result, status_code=status)

        result = []
        status = 200
        for circuit in circuits:
            circuit_scheduler = circuit.get("circuit_scheduler")
            if circuit_scheduler:
                for scheduler in circuit_scheduler:
                    value = {
                        "schedule_id": scheduler.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": scheduler,
                    }
                    result.append(value)

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a circuit based on id."""
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if not circuit:
            result = f"circuit_id {circuit_id} not found"
            log.debug("get_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        status = 200
        log.debug("get_circuit result %s %s", circuit, status)
        return JSONResponse(circuit, status_code=status)

    # pylint: disable=too-many-branches, too-many-statements
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify user of the status of its request.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"primary_path is not valid: {exception}"
                ) from exception
        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"backup_path is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        deployed = False
        if not evc.circuit_scheduler:
            with evc.lock:
                deployed = evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id, "deployed": deployed}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)

    @staticmethod
    def _use_uni_tags(evc):
        uni_a = evc.uni_a
        evc._use_uni_vlan(uni_a)
        try:
            uni_z = evc.uni_z
            evc._use_uni_vlan(uni_z)
        except KytosTagError as err:
            evc.make_uni_vlan_available(uni_a)
            raise err

    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Capture delete messages to keep track when flows got removed."""
        self.handle_flow_delete(event)

    def handle_flow_delete(self, event):
        """Keep track when the EVC got flows removed by deriving its cookie."""
        flow = event.content["flow"]
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            log.debug("Flow removed in EVC %s", evc.id)
            evc.set_flow_removed_at()

    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            raise HTTPException(404, detail=result) from KeyError

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception
        redeployed = False
        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    redeployed = evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result) from KeyError
        log.info("Removing %s", evc)

        with evc.lock:
            if not evc.archived:
                evc.deactivate()
                evc.disable()
                self.sched.remove(evc)
                evc.remove_current_flows(sync=False)
                evc.remove_failover_flows(sync=False)
                evc.archive()
                evc.remove_uni_tags()
                evc.sync()
                emit_event(
                    self.controller, "deleted",
                    content=map_evc_event_content(evc)
                )

        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200
        log.debug("delete_circuit result %s %s", result, status)

        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Get metadata from an EVC."""
        circuit_id = request.path_params["circuit_id"]
        try:
            return (
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
            )
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs."""
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")

        fail_evcs = []
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.extend_metadata(data)
            except KeyError:
                fail_evcs.append(_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful", status_code=201)

    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC."""
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)

    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from a bulk of EVCs."""
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs_metadata(
            circuit_ids, {key: ""}, "del"
        )

        fail_evcs = []
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.remove_metadata(key)
            except KeyError:
                fail_evcs.append(_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful")

    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from an EVC."""
        circuit_id = request.path_params["circuit_id"]
        key = request.path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")

    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC."""
        circuit_id = request.path_params["circuit_id"]
        try_avoid_same_s_vlan = request.query_params.get(
            "try_avoid_same_s_vlan", "true"
        )
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
        if try_avoid_same_s_vlan not in {"true", "false"}:
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
            raise HTTPException(400, detail=msg)
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from KeyError
        deployed = False
        if evc.is_enabled():
            with evc.lock:
                path_dict = evc.remove_current_flows(
                    sync=False,
                    return_path=try_avoid_same_s_vlan == "true"
                )
                evc.remove_failover_flows(sync=True)
                deployed = evc.deploy(path_dict)
        if deployed:
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {
                "response": f"Circuit {circuit_id} is disabled."
            }
            status = 409

        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # return 404 if the circuit does not exist
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Change all attributes of the given schedule of an EVC circuit.
        The schedule ID is preserved by default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Cannot modify deleted or archived circuits
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id
        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)
        # Append the modified schedule
        evc.circuit_scheduler.append(new_schedule)

        # Cancel all schedule jobs
        self.sched.cancel_job(found_schedule.id)
        # Add the new circuit schedule
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 200

        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the Schedule from EVC.
        Remove the Schedule from cron job.
        Save the EVC to MongoDB.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        # Schedule not found (e.g. the circuit was deleted or archived)
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Remove the old schedule
        evc.circuit_scheduler.remove(found_schedule)

        # Cancel the job of the removed schedule
        self.sched.cancel_job(found_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        status = 200

        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check whether the given UNIs have no tag and, if so, whether
        another EVC already uses them without a tag.

        Raise DuplicatedNoTagUNI if duplication is found.

        Args:
            evc_id (str): ID of the EVC being analyzed; it is skipped.
            uni_a (Optional[UNI]): first UNI to check.
            uni_z (Optional[UNI]): second UNI to check.
        """

        # No UNIs
        if not (uni_a or uni_z):
            return

        # Every given UNI has a tag: nothing to check
        if (not (uni_a and not uni_a.user_tag) and
                not (uni_z and not uni_z.user_tag)):
            return
        for circuit in self.circuits.copy().values():
            if (not circuit.archived and circuit._id != evc_id):
                if uni_a and uni_a.user_tag is None:
                    circuit.check_no_tag_duplicate(uni_a)
                if uni_z and uni_z.user_tag is None:
                    circuit.check_no_tag_duplicate(uni_z)

    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        self.handle_link_up(event)

    def handle_link_up(self, event):
        """Change circuit when link is up or end_maintenance."""
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if evc.is_enabled() and not evc.archived:
                with evc.lock:
                    evc.handle_link_up(event.content["link"])

    # Possibly replace this with interruptions?
    @listen_to(
        '.*.switch.interface.(link_up|link_down|created|deleted)'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """
        Handler for interface link_up, link_down, created and deleted events.
        """
        self.handle_on_interface_link_change(event)

    def handle_on_interface_link_change(self, event: KytosEvent):
        """
        Handler to sort interface events {link_(up, down), created, deleted}

        To avoid multiple database updates (link flap):
        Every interface is identified and processed in parallel.
        Once an interface event is received a timer is started.
        While the timer is running self._intf_events will be updated.
        After the delay has passed the last received event will be processed.
        """
        iface = event.content.get("interface")
        with self._lock_interfaces[iface.id]:
            _now = event.timestamp
            # Discard out of order events
            if (
                iface.id in self._intf_events
                and self._intf_events[iface.id]["event"].timestamp > _now
            ):
                return
            self._intf_events[iface.id].update({"event": event})
            if "last_acquired" in self._intf_events[iface.id]:
                return
            self._intf_events[iface.id].update({"last_acquired": now()})
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
        with self._lock_interfaces[iface.id]:
            event = self._intf_events[iface.id]["event"]
            self._intf_events[iface.id].pop('last_acquired', None)
            _, _, event_type = event.name.rpartition('.')
            if event_type in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)

    def handle_interface_link_up(self, interface):
        """
        Handler for interface link_up events
        """
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                evc.handle_interface_link_up(
                    interface
                )

    def handle_interface_link_down(self, interface):
        """
        Handler for interface link_down events
        """
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                evc.handle_interface_link_down(
                    interface
                )

    @listen_to("kytos/topology.link_down", pool="dynamic_single")
    def on_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_link_down(event)

    # pylint: disable=too-many-branches
    # pylint: disable=too-many-locals
    def handle_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        failover_event_contents = {}

        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                if evc.is_affected_by_link(link):
                    evc.affected_by_link_at = event.timestamp
                    # if there is no usable failover path, handle link down
                    # the traditional way
                    if (
                        not evc.failover_path or
                        evc.is_failover_path_affected_by_link(link)
                    ):
                        evcs_normal.append(evc)
                        continue
                    try:
                        dpid_flows = evc.get_failover_flows()
                        evc.old_path = evc.current_path
                        evc.current_path = evc.failover_path
                        evc.failover_path = Path([])
                    # pylint: disable=broad-except
                    except Exception:
                        err = traceback.format_exc().replace("\n", ", ")
                        log.error(
                            "Ignore Failover path for "
                            f"{evc} due to error: {err}"
                        )
                        evcs_normal.append(evc)
                        continue
                    for dpid, flows in dpid_flows.items():
                        switch_flows.setdefault(dpid, [])
                        switch_flows[dpid].extend(flows)
                    evcs_with_failover.append(evc)
                    failover_event_contents[evc.id] = map_evc_event_content(
                        evc,
                        flows={k: v.copy() for k, v in switch_flows.items()}
                    )
                elif evc.is_failover_path_affected_by_link(link):
                    evc.old_path = evc.failover_path
                    evc.failover_path = Path([])
                    check_failover.append(evc)

        if failover_event_contents:
            emit_event(self.controller, "failover_link_down",
                       content=deepcopy(failover_event_contents))
        send_flow_mods_event(self.controller, switch_flows, 'install')

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link": link} | map_evc_event_content(evc)
            )

        evcs_to_update = []
        for evc in evcs_with_failover:
            evcs_to_update.append(evc.as_dict())
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )
        for evc in check_failover:
            evcs_to_update.append(evc.as_dict())

        self.mongo_controller.update_evcs(evcs_to_update)

        emit_event(
            self.controller,
            "cleanup_evcs_old_path",
            content={"evcs": evcs_with_failover + check_failover}
        )

    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        self.handle_evc_affected_by_link_down(event)

    def handle_evc_affected_by_link_down(self, event):
        """Change circuit when link is down or under_maintenance."""
        evc = self.circuits.get(event.content["evc_id"])
        link = event.content['link']
        if not evc:
            return
        with evc.lock:
            if not evc.is_affected_by_link(link):
                return
            result = evc.handle_link_down()
        event_name = "error_redeploy_link_down"
        if result:
            log.info(f"{evc} redeployed due to link down {link.id}")
            event_name = "redeployed_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))

    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_(up|down)."""
        self.handle_evc_deployed(event)

    def handle_evc_deployed(self, event):
        """Set up failover path on evc deployed."""
        evc = self.circuits.get(event.content["evc_id"])
        if not evc:
            return
        evc.try_setup_failover_path()

    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
    def on_cleanup_evcs_old_path(self, event):
        """Handle cleanup evcs old path."""
        self.handle_cleanup_evcs_old_path(event)

    def handle_cleanup_evcs_old_path(self, event):
        """Handle cleanup evcs old path."""
        evcs = event.content.get("evcs", [])
        event_contents: dict[str, dict] = defaultdict(list)
        total_flows = {}
        for evc in evcs:
            if not evc.old_path:
                continue
            with evc.lock:
                removed_flows = {}
                try:
                    nni_flows = prepare_delete_flow(
                        evc._prepare_nni_flows(evc.old_path)
                    )
                    uni_flows = prepare_delete_flow(
                        evc._prepare_uni_flows(evc.old_path, skip_in=True)
                    )
                    removed_flows = merge_flow_dicts(
                        nni_flows, uni_flows
                    )
                # pylint: disable=broad-except
                except Exception:
                    err = traceback.format_exc().replace("\n", ", ")
                    log.error(f"Fail to remove {evc} old_path: {err}")
                    continue
                if removed_flows:
                    total_flows = merge_flow_dicts(total_flows, removed_flows)
                    content = map_evc_event_content(
                        evc,
                        removed_flows=deepcopy(removed_flows),
                        current_path=evc.current_path.as_dict(),
                    )
                    event_contents[evc.id] = content
                    evc.old_path = Path([])
        if event_contents:
            send_flow_mods_event(self.controller, total_flows, 'delete')
            emit_event(self.controller, "failover_old_path",
                       content=event_contents)

    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once the topology is available."""
        self.load_all_evcs()

    def load_all_evcs(self):
        """Try to load all EVCs on startup."""
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
        for circuit_id, circuit in circuits:
            if circuit_id not in self.circuits:
                self._load_evc(circuit)
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
                   timeout=1)

    def _load_evc(self, circuit_dict):
        """Load one EVC from mongodb to memory."""
        try:
            evc = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as exception:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None
        if evc.archived:
            return None

        self.circuits.setdefault(evc.id, evc)
        self.sched.add(evc)
        return evc

    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        self.handle_flow_mod_error(event)

    def handle_flow_mod_error(self, event):
        """Handle flow mod errors related to an EVC."""
        flow = event.content["flow"]
        command = event.content.get("error_command")
        if command != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            with evc.lock:
                evc.remove_current_flows(sync=False)
                evc.remove_failover_flows(sync=True)

    def _evc_dict_with_instances(self, evc_dict):
        """Convert some dict values to instances of EVC classes.

        This method will convert: [UNI, CircuitSchedule, Link, Path]
        """
        data = evc_dict.copy()  # Do not modify the original dict
        for attribute, value in data.items():
            # Get multiple attributes.
            # Ex: uni_a, uni_z
            if "uni" in attribute:
                try:
                    data[attribute] = self._uni_from_dict(value)
                except ValueError as exception:
                    result = "Error creating UNI: Invalid value"
                    raise ValueError(result) from exception

            if attribute == "circuit_scheduler":
                data[attribute] = []
                for schedule in value:
                    data[attribute].append(CircuitSchedule.from_dict(schedule))

            # Get multiple attributes.
            # Ex: primary_links,
            #     backup_links,
            #     current_links_cache,
            #     primary_links_cache,
            #     backup_links_cache
            if "links" in attribute:
                data[attribute] = [
                    self._link_from_dict(link, attribute) for link in value
                ]

            # Ex: current_path,
            #     primary_path,
            #     backup_path
            if "path" in attribute and attribute != "dynamic_backup_path":
                data[attribute] = Path(
                    [self._link_from_dict(link, attribute) for link in value]
                )

        return data

    def _evc_from_dict(self, evc_dict):
        """Return an EVC object from python dict."""
        data = self._evc_dict_with_instances(evc_dict)
        data["table_group"] = self.table_group
        return EVC(self.controller, **data)

    def _uni_from_dict(self, uni_dict):
        """Return a UNI object from python dict, or False if it is None."""
        if uni_dict is None:
            return False

        interface_id = uni_dict.get("interface_id")
        interface = self.controller.get_interface_by_id(interface_id)
        if interface is None:
            result = (
                "Error creating UNI: "
                + f"Could not instantiate interface {interface_id}"
            )
            raise ValueError(result) from ValueError
        tag_convert = {1: "vlan"}
        tag_dict = uni_dict.get("tag", None)
        if tag_dict:
            tag_type = tag_dict.get("tag_type")
            tag_type = tag_convert.get(tag_type, tag_type)
            tag_value = tag_dict.get("value")
            if isinstance(tag_value, list):
                tag_value = get_tag_ranges(tag_value)
                mask_list = get_vlan_tags_and_masks(tag_value)
                tag = TAGRange(tag_type, tag_value, mask_list)
            else:
                tag = TAG(tag_type, tag_value)
        else:
            tag = None
        uni = UNI(interface, tag)
        return uni

    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
        """Return a Link object from python dict."""
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)
        if not endpoint_a:
            error_msg = f"Could not get interface endpoint_a id {id_a}"
            raise ValueError(error_msg)
        if not endpoint_b:
            error_msg = f"Could not get interface endpoint_b id {id_b}"
            raise ValueError(error_msg)

        link = Link(endpoint_a, endpoint_b)
        allowed_paths = {"current_path", "failover_path"}
        if "metadata" in link_dict and attribute in allowed_paths:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if s_vlan:
            tag = TAG.from_dict(s_vlan)
            if tag is False:
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
                raise ValueError(error_msg)
            link.update_metadata("s_vlan", tag)
        return link

    def _find_evc_by_schedule_id(self, schedule_id):
        """
        Find an EVC and CircuitSchedule based on schedule_id.

        :param schedule_id: Schedule ID
        :return: EVC and Schedule
        """
        circuits = self._get_circuits_buffer()
        found_schedule = None
        evc = None

        # pylint: disable=unused-variable
        for c_id, circuit in circuits.items():
            for schedule in circuit.circuit_scheduler:
                if schedule.id == schedule_id:
                    found_schedule = schedule
                    evc = circuit
                    break
            if found_schedule:
                break
        return evc, found_schedule

    def _get_circuits_buffer(self):
        """
        Return the circuit buffer.

        If the buffer is empty, try to load data from mongodb.
        """
        if not self.circuits:
            # Load circuits from mongodb to buffer
            circuits = self.mongo_controller.get_circuits()['circuits']
            for c_id, circuit in circuits.items():
                evc = self._evc_from_dict(circuit)
                self.circuits[c_id] = evc
        return self.circuits

    # pylint: disable=attribute-defined-outside-init
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle a recently enabled table."""
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return
        for group in table_group:
            if group not in settings.TABLE_GROUP_ALLOWED:
                log.error(f'The table group "{group}" is not allowed for '
                          f'mef_eline. Allowed table groups are '
                          f'{settings.TABLE_GROUP_ALLOWED}')
                return
        self.table_group.update(table_group)
        content = {"group_table": self.table_group}
        name = "kytos/mef_eline.enable_table"
        await aemit_event(self.controller, name, content)
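
The docstring of handle_on_interface_link_change describes a per-interface debounce: the first event starts a delay, later events only overwrite the stored one, and the newest event is processed once the delay ends. The block below is a minimal standalone sketch of that idea, not the NApp's code. The class name EventDebouncer, its submit method, and the (timestamp, payload) tuples are hypothetical names chosen for illustration; only the standard library is used.

```python
import threading
import time
from collections import defaultdict


class EventDebouncer:
    """Hypothetical helper: coalesce bursts of per-key events.

    Only the newest event received during the delay is handled.
    """

    def __init__(self, delay, handler):
        self.delay = delay          # seconds to wait before handling
        self.handler = handler      # callable(key, payload)
        self._locks = defaultdict(threading.Lock)
        self._pending = defaultdict(dict)

    def submit(self, key, timestamp, payload):
        with self._locks[key]:
            state = self._pending[key]
            # Discard events older than the one already stored.
            if "event" in state and state["event"][0] > timestamp:
                return
            state["event"] = (timestamp, payload)
            # A delay is already running for this key; the caller that
            # started it will pick up the event stored above.
            if "timer_started" in state:
                return
            state["timer_started"] = time.monotonic()
        # Wait outside the lock so newer events can still be stored.
        time.sleep(self.delay)
        with self._locks[key]:
            state = self._pending[key]
            _, latest = state["event"]
            state.pop("timer_started", None)
            self.handler(key, latest)
```

As in the method above, the caller that starts the delay blocks for its whole duration, so this pattern assumes each event is delivered on its own worker thread.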
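
handle_link_down sorts every EVC into one of three groups before emitting events. The sketch below isolates just that branching so it can be read on its own. It is a simplified illustration under stated assumptions: FakeEVC and classify_on_link_down are hypothetical names, paths are modelled as plain sets of link IDs, and the fallback taken when get_failover_flows raises is left out.

```python
from dataclasses import dataclass, field


@dataclass
class FakeEVC:
    """Hypothetical stand-in for an EVC; paths are sets of link IDs."""
    name: str
    current_path: set = field(default_factory=set)
    failover_path: set = field(default_factory=set)


def classify_on_link_down(evcs, link_id):
    """Split EVC names into the three groups used on link down."""
    with_failover, normal, failover_cleared = [], [], []
    for evc in evcs:
        if link_id in evc.current_path:
            if not evc.failover_path or link_id in evc.failover_path:
                # No usable failover path: regular redeploy.
                normal.append(evc.name)
            else:
                # The failover path can replace the current path.
                with_failover.append(evc.name)
        elif link_id in evc.failover_path:
            # Only the standby path is affected: it is discarded.
            failover_cleared.append(evc.name)
    return with_failover, normal, failover_cleared
```

EVCs that use the link in neither path fall into no group, which matches the method above leaving them untouched.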