Passed
Pull Request — master (#571)
by Vinicius
04:22
created

build.main   F

Complexity

Total Complexity 213

Size/Duplication

Total Lines 1194
Duplicated Lines 3.52 %

Test Coverage

Coverage 92.57%

Importance

Changes 0
Metric Value
eloc 802
dl 42
loc 1194
ccs 648
cts 700
cp 0.9257
rs 1.798
c 0
b 0
f 0
wmc 213

52 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.add_metadata() 0 19 3
A Main.get_metadata() 0 13 2
A Main.bulk_add_metadata() 20 20 4
A Main.on_link_down() 0 4 1
A Main.on_evc_deployed() 0 4 1
A Main._use_uni_tags() 0 10 2
A Main.on_flow_delete() 0 4 1
A Main.handle_flow_delete() 0 7 2
A Main.list_circuits() 0 15 1
A Main.get_circuit() 0 13 2
A Main.shutdown() 0 2 1
A Main.setup() 0 30 1
A Main.execute() 0 8 3
A Main.get_evcs_by_svc_level() 0 13 4
A Main.get_eline_controller() 0 4 1
F Main.create_circuit() 0 114 14
B Main.list_schedules() 0 31 5
C Main.should_be_checked() 0 16 9
B Main.execute_consistency() 0 22 8
A Main.delete_metadata() 0 16 2
F Main.update() 0 61 15
A Main.bulk_delete_metadata() 22 22 4
A Main.delete_circuit() 0 38 4
A Main.on_cleanup_evcs_old_path() 0 4 1
B Main.handle_cleanup_evcs_old_path() 0 38 7
C Main._evc_dict_with_instances() 0 41 9
A Main.update_schedule() 0 47 2
A Main.handle_evc_deployed() 0 6 2
A Main._find_evc_by_schedule_id() 0 21 5
A Main._evc_from_dict() 0 4 1
A Main.load_all_evcs() 0 8 3
A Main.handle_interface_link_up() 0 9 3
A Main.handle_link_up() 0 7 5
A Main.on_link_up() 0 4 1
A Main.on_evc_affected_by_link_down() 0 4 1
A Main.delete_schedule() 0 31 2
A Main.handle_evc_affected_by_link_down() 0 16 5
D Main.handle_link_down() 0 76 13
B Main._link_from_dict() 0 27 7
B Main._uni_from_dict() 0 29 5
B Main.redeploy() 0 33 6
A Main.handle_flow_mod_error() 0 11 4
A Main.handle_interface_link_down() 0 9 3
B Main.handle_on_interface_link_change() 0 32 8
A Main.on_flow_mod_error() 0 4 1
A Main.on_topology_loaded() 0 4 1
F Main._check_no_tag_duplication() 0 25 14
A Main._load_evc() 0 15 3
A Main.on_table_enabled() 0 16 4
A Main.create_schedule() 0 56 3
A Main._get_circuits_buffer() 0 13 3
A Main.on_interface_link_change() 0 8 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from copy import deepcopy
11 1
from threading import Lock
12 1
from typing import Optional
13
14 1
from pydantic import ValidationError
15
16 1
from kytos.core import KytosNApp, log, rest
17 1
from kytos.core.events import KytosEvent
18 1
from kytos.core.exceptions import KytosTagError
19 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
20
                                validate_openapi)
21 1
from kytos.core.interface import TAG, UNI, TAGRange
22 1
from kytos.core.link import Link
23 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
24
                                 get_json_or_400)
25 1
from kytos.core.tag_ranges import get_tag_ranges
26 1
from napps.kytos.mef_eline import controllers, settings
27 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
28
                                              DuplicatedNoTagUNI, InvalidPath)
29 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
30
                                          Path)
31 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
32 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
33
                                         emit_event, get_vlan_tags_and_masks,
34
                                         map_evc_event_content,
35
                                         merge_flow_dicts, prepare_delete_flow,
36
                                         send_flow_mods_event)
37
38
39
# pylint: disable=too-many-public-methods
40 1
class Main(KytosNApp):
41
    """Main class of amlight/mef_eline NApp.
42
43
    This class is the entry point for this napp.
44
    """
45
46 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
47
48 1
    def setup(self):
49
        """Replace the '__init__' method for the KytosNApp subclass.
50
51
        The setup method is automatically called by the controller when your
52
        application is loaded.
53
54
        So, if you have any setup routine, insert it here.
55
        """
56
        # object used to scheduler circuit events
57 1
        self.sched = Scheduler()
58
59
        # object to save and load circuits
60 1
        self.mongo_controller = self.get_eline_controller()
61 1
        self.mongo_controller.bootstrap_indexes()
62
63
        # set the controller that will manager the dynamic paths
64 1
        DynamicPathManager.set_controller(self.controller)
65
66
        # dictionary of EVCs created. It acts as a circuit buffer.
67
        # Every create/update/delete must be synced to mongodb.
68 1
        self.circuits = {}
69
70 1
        self._intf_events = defaultdict(dict)
71 1
        self._lock_interfaces = defaultdict(Lock)
72 1
        self.table_group = {"epl": 0, "evpl": 0}
73 1
        self._lock = Lock()
74 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
75
76 1
        self.load_all_evcs()
77 1
        self._topology_updated_at = None
78
79 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
80
        """Get circuits sorted by desc service level and asc creation_time.
81
82
        In the future, as more ops are offloaded it should be get from the DB.
83
        """
84 1
        if enable_filter:
85 1
            return sorted(
86
                          [circuit for circuit in self.circuits.values()
87
                           if circuit.is_enabled()],
88
                          key=lambda x: (-x.service_level, x.creation_time),
89
            )
90 1
        return sorted(self.circuits.values(),
91
                      key=lambda x: (-x.service_level, x.creation_time))
92
93 1
    @staticmethod
94 1
    def get_eline_controller():
95
        """Return the ELineController instance."""
96
        return controllers.ELineController()
97
98 1
    def execute(self):
99
        """Execute once when the napp is running."""
100 1
        if self._lock.locked():
101 1
            return
102 1
        log.debug("Starting consistency routine")
103 1
        with self._lock:
104 1
            self.execute_consistency()
105 1
        log.debug("Finished consistency routine")
106
107 1
    def should_be_checked(self, circuit):
108
        "Verify if the circuit meets the necessary conditions to be checked"
109
        # pylint: disable=too-many-boolean-expressions
110 1
        if (
111
                circuit.is_enabled()
112
                and not circuit.is_active()
113
                and not circuit.lock.locked()
114
                and not circuit.has_recent_removed_flow()
115
                and not circuit.is_recent_updated()
116
                and circuit.are_unis_active(self.controller.switches)
117
                # if a inter-switch EVC does not have current_path, it does not
118
                # make sense to run sdntrace on it
119
                and (circuit.is_intra_switch() or circuit.current_path)
120
                ):
121 1
            return True
122
        return False
123
124 1
    def execute_consistency(self):
125
        """Execute consistency routine."""
126 1
        circuits_to_check = []
127 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
128 1
            if self.should_be_checked(circuit):
129 1
                circuits_to_check.append(circuit)
130 1
            circuit.try_setup_failover_path()
131 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
132 1
        for circuit in circuits_to_check:
133 1
            is_checked = circuits_checked.get(circuit.id)
134 1
            if is_checked:
135 1
                circuit.execution_rounds = 0
136 1
                log.info(f"{circuit} enabled but inactive - activating")
137 1
                with circuit.lock:
138 1
                    circuit.activate()
139 1
                    circuit.sync()
140
            else:
141 1
                circuit.execution_rounds += 1
142 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
143 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
144 1
                    with circuit.lock:
145 1
                        circuit.deploy()
146
147 1
    def shutdown(self):
148
        """Execute when your napp is unloaded.
149
150
        If you have some cleanup procedure, insert it here.
151
        """
152
153 1
    @rest("/v2/evc/", methods=["GET"])
154 1
    def list_circuits(self, request: Request) -> JSONResponse:
155
        """Endpoint to return circuits stored.
156
157
        archive query arg if defined (not null) will be filtered
158
        accordingly, by default only non archived evcs will be listed
159
        """
160 1
        log.debug("list_circuits /v2/evc")
161 1
        args = request.query_params
162 1
        archived = args.get("archived", "false").lower()
163 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
164 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
165
                                                      metadata=args)
166 1
        circuits = circuits['circuits']
167 1
        return JSONResponse(circuits)
168
169 1
    @rest("/v2/evc/schedule", methods=["GET"])
170 1
    def list_schedules(self, _request: Request) -> JSONResponse:
171
        """Endpoint to return all schedules stored for all circuits.
172
173
        Return a JSON with the following template:
174
        [{"schedule_id": <schedule_id>,
175
         "circuit_id": <circuit_id>,
176
         "schedule": <schedule object>}]
177
        """
178 1
        log.debug("list_schedules /v2/evc/schedule")
179 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
180 1
        if not circuits:
181 1
            result = {}
182 1
            status = 200
183 1
            return JSONResponse(result, status_code=status)
184
185 1
        result = []
186 1
        status = 200
187 1
        for circuit in circuits:
188 1
            circuit_scheduler = circuit.get("circuit_scheduler")
189 1
            if circuit_scheduler:
190 1
                for scheduler in circuit_scheduler:
191 1
                    value = {
192
                        "schedule_id": scheduler.get("id"),
193
                        "circuit_id": circuit.get("id"),
194
                        "schedule": scheduler,
195
                    }
196 1
                    result.append(value)
197
198 1
        log.debug("list_schedules result %s %s", result, status)
199 1
        return JSONResponse(result, status_code=status)
200
201 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
202 1
    def get_circuit(self, request: Request) -> JSONResponse:
203
        """Endpoint to return a circuit based on id."""
204 1
        circuit_id = request.path_params["circuit_id"]
205 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
206 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
207 1
        if not circuit:
208 1
            result = f"circuit_id {circuit_id} not found"
209 1
            log.debug("get_circuit result %s %s", result, 404)
210 1
            raise HTTPException(404, detail=result)
211 1
        status = 200
212 1
        log.debug("get_circuit result %s %s", circuit, status)
213 1
        return JSONResponse(circuit, status_code=status)
214
215
    # pylint: disable=too-many-branches, too-many-statements
216 1
    @rest("/v2/evc/", methods=["POST"])
217 1
    @validate_openapi(spec)
218 1
    def create_circuit(self, request: Request) -> JSONResponse:
219
        """Try to create a new circuit.
220
221
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
222
        UNI_Z's requested C-VID are available from the interfaces' pools. This
223
        is checked when creating the UNI object.
224
225
        Then, E-Line NApp requests a primary and a backup path to the
226
        Pathfinder NApp using the attributes primary_links and backup_links
227
        submitted via REST
228
229
        # For each link composing paths in #3:
230
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
231
        #  - Using the S-VID obtained, generate abstract flow entries to be
232
        #    sent to FlowManager
233
234
        Push abstract flow entries to FlowManager and FlowManager pushes
235
        OpenFlow entries to datapaths
236
237
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
238
        creation
239
240
        Finnaly, notify user of the status of its request.
241
        """
242
        # Try to create the circuit object
243 1
        log.debug("create_circuit /v2/evc/")
244 1
        data = get_json_or_400(request, self.controller.loop)
245
246 1
        try:
247 1
            evc = self._evc_from_dict(data)
248 1
        except (ValueError, KytosTagError) as exception:
249 1
            log.debug("create_circuit result %s %s", exception, 400)
250 1
            raise HTTPException(400, detail=str(exception)) from exception
251 1
        try:
252 1
            check_disabled_component(evc.uni_a, evc.uni_z)
253 1
        except DisabledSwitch as exception:
254 1
            log.debug("create_circuit result %s %s", exception, 409)
255 1
            raise HTTPException(
256
                    409,
257
                    detail=f"Path is not valid: {exception}"
258
                ) from exception
259
260 1
        if evc.primary_path:
261 1
            try:
262 1
                evc.primary_path.is_valid(
263
                    evc.uni_a.interface.switch,
264
                    evc.uni_z.interface.switch,
265
                    bool(evc.circuit_scheduler),
266
                )
267 1
            except InvalidPath as exception:
268 1
                raise HTTPException(
269
                    400,
270
                    detail=f"primary_path is not valid: {exception}"
271
                ) from exception
272 1
        if evc.backup_path:
273 1
            try:
274 1
                evc.backup_path.is_valid(
275
                    evc.uni_a.interface.switch,
276
                    evc.uni_z.interface.switch,
277
                    bool(evc.circuit_scheduler),
278
                )
279 1
            except InvalidPath as exception:
280 1
                raise HTTPException(
281
                    400,
282
                    detail=f"backup_path is not valid: {exception}"
283
                ) from exception
284
285 1
        if not evc._tag_lists_equal():
286 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
287 1
            raise HTTPException(400, detail=detail)
288
289 1
        try:
290 1
            evc._validate_has_primary_or_dynamic()
291 1
        except ValueError as exception:
292 1
            raise HTTPException(400, detail=str(exception)) from exception
293
294 1
        try:
295 1
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
296
        except DuplicatedNoTagUNI as exception:
297
            log.debug("create_circuit result %s %s", exception, 409)
298
            raise HTTPException(409, detail=str(exception)) from exception
299
300 1
        try:
301 1
            self._use_uni_tags(evc)
302 1
        except KytosTagError as exception:
303 1
            raise HTTPException(400, detail=str(exception)) from exception
304
305
        # save circuit
306 1
        try:
307 1
            evc.sync()
308
        except ValidationError as exception:
309
            raise HTTPException(400, detail=str(exception)) from exception
310
311
        # store circuit in dictionary
312 1
        self.circuits[evc.id] = evc
313
314
        # Schedule the circuit deploy
315 1
        self.sched.add(evc)
316
317
        # Circuit has no schedule, deploy now
318 1
        deployed = False
319 1
        if not evc.circuit_scheduler:
320 1
            with evc.lock:
321 1
                deployed = evc.deploy()
322
323
        # Notify users
324 1
        result = {"circuit_id": evc.id, "deployed": deployed}
325 1
        status = 201
326 1
        log.debug("create_circuit result %s %s", result, status)
327 1
        emit_event(self.controller, name="created",
328
                   content=map_evc_event_content(evc))
329 1
        return JSONResponse(result, status_code=status)
330
331 1
    @staticmethod
332 1
    def _use_uni_tags(evc):
333 1
        uni_a = evc.uni_a
334 1
        evc._use_uni_vlan(uni_a)
335 1
        try:
336 1
            uni_z = evc.uni_z
337 1
            evc._use_uni_vlan(uni_z)
338 1
        except KytosTagError as err:
339 1
            evc.make_uni_vlan_available(uni_a)
340 1
            raise err
341
342 1
    @listen_to('kytos/flow_manager.flow.removed')
343 1
    def on_flow_delete(self, event):
344
        """Capture delete messages to keep track when flows got removed."""
345
        self.handle_flow_delete(event)
346
347 1
    def handle_flow_delete(self, event):
348
        """Keep track when the EVC got flows removed by deriving its cookie."""
349 1
        flow = event.content["flow"]
350 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
351 1
        if evc:
352 1
            log.debug("Flow removed in EVC %s", evc.id)
353 1
            evc.set_flow_removed_at()
354
355 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
356 1
    @validate_openapi(spec)
357 1
    def update(self, request: Request) -> JSONResponse:
358
        """Update a circuit based on payload.
359
360
        The EVC attributes (creation_time, active, current_path,
361
        failover_path, _id, archived) can't be updated.
362
        """
363 1
        data = get_json_or_400(request, self.controller.loop)
364 1
        circuit_id = request.path_params["circuit_id"]
365 1
        log.debug("update /v2/evc/%s", circuit_id)
366 1
        try:
367 1
            evc = self.circuits[circuit_id]
368 1
        except KeyError:
369 1
            result = f"circuit_id {circuit_id} not found"
370 1
            log.debug("update result %s %s", result, 404)
371 1
            raise HTTPException(404, detail=result) from KeyError
372
373 1
        try:
374 1
            updated_data = self._evc_dict_with_instances(data)
375 1
            self._check_no_tag_duplication(
376
                circuit_id, updated_data.get("uni_a"),
377
                updated_data.get("uni_z")
378
            )
379 1
            enable, redeploy = evc.update(**updated_data)
380 1
        except (ValueError, KytosTagError, ValidationError) as exception:
381 1
            log.debug("update result %s %s", exception, 400)
382 1
            raise HTTPException(400, detail=str(exception)) from exception
383 1
        except DuplicatedNoTagUNI as exception:
384
            log.debug("update result %s %s", exception, 409)
385
            raise HTTPException(409, detail=str(exception)) from exception
386 1
        except DisabledSwitch as exception:
387 1
            log.debug("update result %s %s", exception, 409)
388 1
            raise HTTPException(
389
                    409,
390
                    detail=f"Path is not valid: {exception}"
391
                ) from exception
392 1
        redeployed = False
393 1
        if evc.is_active():
394
            if enable is False:  # disable if active
395
                with evc.lock:
396
                    evc.remove()
397
            elif redeploy is not None:  # redeploy if active
398
                with evc.lock:
399
                    evc.remove()
400
                    redeployed = evc.deploy()
401
        else:
402 1
            if enable is True:  # enable if inactive
403 1
                with evc.lock:
404 1
                    redeployed = evc.deploy()
405 1
            elif evc.is_enabled() and redeploy:
406 1
                with evc.lock:
407 1
                    evc.remove()
408 1
                    redeployed = evc.deploy()
409 1
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
410 1
        status = 200
411
412 1
        log.debug("update result %s %s", result, status)
413 1
        emit_event(self.controller, "updated",
414
                   content=map_evc_event_content(evc, **data))
415 1
        return JSONResponse(result, status_code=status)
416
417 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
418 1
    def delete_circuit(self, request: Request) -> JSONResponse:
419
        """Remove a circuit.
420
421
        First, the flows are removed from the switches, and then the EVC is
422
        disabled.
423
        """
424 1
        circuit_id = request.path_params["circuit_id"]
425 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
426 1
        try:
427 1
            evc = self.circuits.pop(circuit_id)
428 1
        except KeyError:
429 1
            result = f"circuit_id {circuit_id} not found"
430 1
            log.debug("delete_circuit result %s %s", result, 404)
431 1
            raise HTTPException(404, detail=result) from KeyError
432 1
        log.info("Removing %s", evc)
433
434 1
        with evc.lock:
435 1
            if not evc.archived:
436 1
                evc.deactivate()
437 1
                evc.disable()
438 1
                self.sched.remove(evc)
439 1
                evc.remove_current_flows(sync=False)
440 1
                evc.remove_failover_flows(sync=False)
441 1
                evc.archive()
442 1
                evc.remove_uni_tags()
443 1
                evc.sync()
444 1
                emit_event(
445
                    self.controller, "deleted",
446
                    content=map_evc_event_content(evc)
447
                )
448
449 1
        log.info("EVC removed. %s", evc)
450 1
        result = {"response": f"Circuit {circuit_id} removed"}
451 1
        status = 200
452 1
        log.debug("delete_circuit result %s %s", result, status)
453
454 1
        return JSONResponse(result, status_code=status)
455
456 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
457 1
    def get_metadata(self, request: Request) -> JSONResponse:
458
        """Get metadata from an EVC."""
459 1
        circuit_id = request.path_params["circuit_id"]
460 1
        try:
461 1
            return (
462
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
463
            )
464
        except KeyError as error:
465
            raise HTTPException(
466
                404,
467
                detail=f"circuit_id {circuit_id} not found."
468
            ) from error
469
470 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
471 1
    @validate_openapi(spec)
472 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
473
        """Add metadata to a bulk of EVCs."""
474 1
        data = get_json_or_400(request, self.controller.loop)
475 1
        circuit_ids = data.pop("circuit_ids")
476
477 1
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
478
479 1
        fail_evcs = []
480 1
        for _id in circuit_ids:
481 1
            try:
482 1
                evc = self.circuits[_id]
483 1
                evc.extend_metadata(data)
484 1
            except KeyError:
485 1
                fail_evcs.append(_id)
486
487 1
        if fail_evcs:
488 1
            raise HTTPException(404, detail=fail_evcs)
489 1
        return JSONResponse("Operation successful", status_code=201)
490
491 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
492 1
    @validate_openapi(spec)
493 1
    def add_metadata(self, request: Request) -> JSONResponse:
494
        """Add metadata to an EVC."""
495 1
        circuit_id = request.path_params["circuit_id"]
496 1
        metadata = get_json_or_400(request, self.controller.loop)
497 1
        if not isinstance(metadata, dict):
498
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
499 1
        try:
500 1
            evc = self.circuits[circuit_id]
501 1
        except KeyError as error:
502 1
            raise HTTPException(
503
                404,
504
                detail=f"circuit_id {circuit_id} not found."
505
            ) from error
506
507 1
        evc.extend_metadata(metadata)
508 1
        evc.sync()
509 1
        return JSONResponse("Operation successful", status_code=201)
510
511 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
512 1
    @validate_openapi(spec)
513 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
514
        """Delete metada from a bulk of EVCs"""
515 1
        data = get_json_or_400(request, self.controller.loop)
516 1
        key = request.path_params["key"]
517 1
        circuit_ids = data.pop("circuit_ids")
518 1
        self.mongo_controller.update_evcs_metadata(
519
            circuit_ids, {key: ""}, "del"
520
        )
521
522 1
        fail_evcs = []
523 1
        for _id in circuit_ids:
524 1
            try:
525 1
                evc = self.circuits[_id]
526 1
                evc.remove_metadata(key)
527 1
            except KeyError:
528 1
                fail_evcs.append(_id)
529
530 1
        if fail_evcs:
531 1
            raise HTTPException(404, detail=fail_evcs)
532 1
        return JSONResponse("Operation successful")
533
534 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
535 1
    def delete_metadata(self, request: Request) -> JSONResponse:
536
        """Delete metadata from an EVC."""
537 1
        circuit_id = request.path_params["circuit_id"]
538 1
        key = request.path_params["key"]
539 1
        try:
540 1
            evc = self.circuits[circuit_id]
541 1
        except KeyError as error:
542 1
            raise HTTPException(
543
                404,
544
                detail=f"circuit_id {circuit_id} not found."
545
            ) from error
546
547 1
        evc.remove_metadata(key)
548 1
        evc.sync()
549 1
        return JSONResponse("Operation successful")
550
551 1
    @rest("/v2/evc/{circuit_id}/redeploy/", methods=["PATCH"])
552 1
    def redeploy(self, request: Request) -> JSONResponse:
553
        """Endpoint to force the redeployment of an EVC."""
554 1
        circuit_id = request.path_params["circuit_id"]
555 1
        avoid_vlan = request.query_params.get("avoid_vlan", "false").lower()
556 1
        if avoid_vlan not in {"true", "false"}:
557
            msg = "Parameter avoid_vlan does not have a valid value."
558
            raise HTTPException(400, detail=msg)
559 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
560 1
        try:
561 1
            evc = self.circuits[circuit_id]
562 1
        except KeyError:
563 1
            raise HTTPException(
564
                404,
565
                detail=f"circuit_id {circuit_id} not found"
566
            ) from KeyError
567 1
        deployed = False
568 1
        if evc.is_enabled():
569 1
            with evc.lock:
570 1
                path_dict = evc.remove_current_flows(
571
                    sync=False, return_path=avoid_vlan == "true")
572 1
                evc.remove_failover_flows(sync=True)
573 1
                deployed = evc.deploy(path_dict)
574 1
        if deployed:
575 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
576 1
            status = 202
577
        else:
578 1
            result = {
579
                "response": f"Circuit {circuit_id} is disabled."
580
            }
581 1
            status = 409
582
583 1
        return JSONResponse(result, status_code=status)
584
585 1
    @rest("/v2/evc/schedule/", methods=["POST"])
586 1
    @validate_openapi(spec)
587 1
    def create_schedule(self, request: Request) -> JSONResponse:
588
        """
589
        Create a new schedule for a given circuit.
590
591
        This service do no check if there are conflicts with another schedule.
592
        Payload example:
593
            {
594
              "circuit_id":"aa:bb:cc",
595
              "schedule": {
596
                "date": "2019-08-07T14:52:10.967Z",
597
                "interval": "string",
598
                "frequency": "1 * * * *",
599
                "action": "create"
600
              }
601
            }
602
        """
603 1
        log.debug("create_schedule /v2/evc/schedule/")
604 1
        data = get_json_or_400(request, self.controller.loop)
605 1
        circuit_id = data["circuit_id"]
606 1
        schedule_data = data["schedule"]
607
608
        # Get EVC from circuits buffer
609 1
        circuits = self._get_circuits_buffer()
610
611
        # get the circuit
612 1
        evc = circuits.get(circuit_id)
613
614
        # get the circuit
615 1
        if not evc:
616 1
            result = f"circuit_id {circuit_id} not found"
617 1
            log.debug("create_schedule result %s %s", result, 404)
618 1
            raise HTTPException(404, detail=result)
619
620
        # new schedule from dict
621 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
622
623
        # If there is no schedule, create the list
624 1
        if not evc.circuit_scheduler:
625 1
            evc.circuit_scheduler = []
626
627
        # Add the new schedule
628 1
        evc.circuit_scheduler.append(new_schedule)
629
630
        # Add schedule job
631 1
        self.sched.add_circuit_job(evc, new_schedule)
632
633
        # save circuit to mongodb
634 1
        evc.sync()
635
636 1
        result = new_schedule.as_dict()
637 1
        status = 201
638
639 1
        log.debug("create_schedule result %s %s", result, status)
640 1
        return JSONResponse(result, status_code=status)
641
642 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
643 1
    @validate_openapi(spec)
644 1
    def update_schedule(self, request: Request) -> JSONResponse:
645
        """Update a schedule.
646
647
        Change all attributes from the given schedule from a EVC circuit.
648
        The schedule ID is preserved as default.
649
        Payload example:
650
            {
651
              "date": "2019-08-07T14:52:10.967Z",
652
              "interval": "string",
653
              "frequency": "1 * * *",
654
              "action": "create"
655
            }
656
        """
657 1
        data = get_json_or_400(request, self.controller.loop)
658 1
        schedule_id = request.path_params["schedule_id"]
659 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
660
661
        # Try to find a circuit schedule
662 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
663
664
        # Can not modify circuits deleted and archived
665 1
        if not found_schedule:
666 1
            result = f"schedule_id {schedule_id} not found"
667 1
            log.debug("update_schedule result %s %s", result, 404)
668 1
            raise HTTPException(404, detail=result)
669
670 1
        new_schedule = CircuitSchedule.from_dict(data)
671 1
        new_schedule.id = found_schedule.id
672
        # Remove the old schedule
673 1
        evc.circuit_scheduler.remove(found_schedule)
674
        # Append the modified schedule
675 1
        evc.circuit_scheduler.append(new_schedule)
676
677
        # Cancel all schedule jobs
678 1
        self.sched.cancel_job(found_schedule.id)
679
        # Add the new circuit schedule
680 1
        self.sched.add_circuit_job(evc, new_schedule)
681
        # Save EVC to mongodb
682 1
        evc.sync()
683
684 1
        result = new_schedule.as_dict()
685 1
        status = 200
686
687 1
        log.debug("update_schedule result %s %s", result, status)
688 1
        return JSONResponse(result, status_code=status)
689
690 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
691 1
    def delete_schedule(self, request: Request) -> JSONResponse:
692
        """Remove a circuit schedule.
693
694
        Remove the Schedule from EVC.
695
        Remove the Schedule from cron job.
696
        Save the EVC to the Storehouse.
697
        """
698 1
        schedule_id = request.path_params["schedule_id"]
699 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
700 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
701
702
        # Can not modify circuits deleted and archived
703 1
        if not found_schedule:
704 1
            result = f"schedule_id {schedule_id} not found"
705 1
            log.debug("delete_schedule result %s %s", result, 404)
706 1
            raise HTTPException(404, detail=result)
707
708
        # Remove the old schedule
709 1
        evc.circuit_scheduler.remove(found_schedule)
710
711
        # Cancel all schedule jobs
712 1
        self.sched.cancel_job(found_schedule.id)
713
        # Save EVC to mongodb
714 1
        evc.sync()
715
716 1
        result = "Schedule removed"
717 1
        status = 200
718
719 1
        log.debug("delete_schedule result %s %s", result, status)
720 1
        return JSONResponse(result, status_code=status)
721
722 1
    def _check_no_tag_duplication(
723
        self,
724
        evc_id: str,
725
        uni_a: Optional[UNI] = None,
726
        uni_z: Optional[UNI] = None
727
    ):
728
        """Check if the given EVC has UNIs with no tag and if these are
729
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
730
        Args:
731
            evc (dict): EVC to be analyzed.
732
        """
733
734
        # No UNIs
735 1
        if not (uni_a or uni_z):
736 1
            return
737
738 1
        if (not (uni_a and not uni_a.user_tag) and
739
                not (uni_z and not uni_z.user_tag)):
740 1
            return
741 1
        for circuit in self.circuits.copy().values():
742 1
            if (not circuit.archived and circuit._id != evc_id):
743 1
                if uni_a and uni_a.user_tag is None:
744 1
                    circuit.check_no_tag_duplicate(uni_a)
745 1
                if uni_z and uni_z.user_tag is None:
746 1
                    circuit.check_no_tag_duplicate(uni_z)
747
748 1
    @listen_to("kytos/topology.link_up")
749 1
    def on_link_up(self, event):
750
        """Change circuit when link is up or end_maintenance."""
751
        self.handle_link_up(event)
752
753 1
    def handle_link_up(self, event):
754
        """Change circuit when link is up or end_maintenance."""
755 1
        log.info("Event handle_link_up %s", event.content["link"])
756 1
        for evc in self.get_evcs_by_svc_level():
757 1
            if evc.is_enabled() and not evc.archived:
758 1
                with evc.lock:
759 1
                    evc.handle_link_up(event.content["link"])
760
761
    # Possibly replace this with interruptions?
762 1
    @listen_to(
763
        '.*.switch.interface.(link_up|link_down|created|deleted)'
764
    )
765 1
    def on_interface_link_change(self, event: KytosEvent):
766
        """
767
        Handler for interface link_up and link_down events.
768
        """
769
        self.handle_on_interface_link_change(event)
770
771 1
    def handle_on_interface_link_change(self, event: KytosEvent):
772
        """
773
        Handler to sort interface events {link_(up, down), create, deleted}
774
775
        To avoid multiple database updated (link flap):
776
        Every interface is identfied and processed in parallel.
777
        Once an interface event is received a time is started.
778
        While time is running self._intf_events will be updated.
779
        After time has passed last received event will be processed.
780
        """
781 1
        iface = event.content.get("interface")
782 1
        with self._lock_interfaces[iface.id]:
783 1
            _now = event.timestamp
784
            # Return out of order events
785 1
            if (
786
                iface.id in self._intf_events
787
                and self._intf_events[iface.id]["event"].timestamp > _now
788
            ):
789 1
                return
790 1
            self._intf_events[iface.id].update({"event": event})
791 1
            if "last_acquired" in self._intf_events[iface.id]:
792 1
                return
793 1
            self._intf_events[iface.id].update({"last_acquired": now()})
794 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
795 1
        with self._lock_interfaces[iface.id]:
796 1
            event = self._intf_events[iface.id]["event"]
797 1
            self._intf_events[iface.id].pop('last_acquired', None)
798 1
            _, _, event_type = event.name.rpartition('.')
799 1
            if event_type in ('link_up', 'created'):
800 1
                self.handle_interface_link_up(iface)
801 1
            elif event_type in ('link_down', 'deleted'):
802 1
                self.handle_interface_link_down(iface)
803
804 1
    def handle_interface_link_up(self, interface):
805
        """
806
        Handler for interface link_up events
807
        """
808
        log.info("Event handle_interface_link_up %s", interface)
809
        for evc in self.get_evcs_by_svc_level():
810
            with evc.lock:
811
                evc.handle_interface_link_up(
812
                    interface
813
                )
814
815 1
    def handle_interface_link_down(self, interface):
816
        """
817
        Handler for interface link_down events
818
        """
819
        log.info("Event handle_interface_link_down %s", interface)
820
        for evc in self.get_evcs_by_svc_level():
821
            with evc.lock:
822
                evc.handle_interface_link_down(
823
                    interface
824
                )
825
826 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
827 1
    def on_link_down(self, event):
828
        """Change circuit when link is down or under_mantenance."""
829
        self.handle_link_down(event)
830
831
    # pylint: disable=too-many-branches
832
    # pylint: disable=too-many-locals
833 1
    def handle_link_down(self, event):
834
        """Change circuit when link is down or under_mantenance."""
835 1
        link = event.content["link"]
836 1
        log.info("Event handle_link_down %s", link)
837 1
        switch_flows = {}
838 1
        evcs_with_failover = []
839 1
        evcs_normal = []
840 1
        check_failover = []
841 1
        failover_event_contents = {}
842
843 1
        for evc in self.get_evcs_by_svc_level():
844 1
            with evc.lock:
845 1
                if evc.is_affected_by_link(link):
846 1
                    evc.affected_by_link_at = event.timestamp
847
                    # if there is no failover path, handles link down the
848
                    # tradditional way
849 1
                    if (
850
                        not evc.failover_path or
851
                        evc.is_failover_path_affected_by_link(link)
852
                    ):
853 1
                        evcs_normal.append(evc)
854 1
                        continue
855 1
                    try:
856 1
                        dpid_flows = evc.get_failover_flows()
857 1
                        evc.old_path = evc.current_path
858 1
                        evc.current_path = evc.failover_path
859 1
                        evc.failover_path = Path([])
860
                    # pylint: disable=broad-except
861 1
                    except Exception:
862 1
                        err = traceback.format_exc().replace("\n", ", ")
863 1
                        log.error(
864
                            "Ignore Failover path for "
865
                            f"{evc} due to error: {err}"
866
                        )
867 1
                        evcs_normal.append(evc)
868 1
                        continue
869 1
                    for dpid, flows in dpid_flows.items():
870 1
                        switch_flows.setdefault(dpid, [])
871 1
                        switch_flows[dpid].extend(flows)
872 1
                    evcs_with_failover.append(evc)
873 1
                    failover_event_contents[evc.id] = map_evc_event_content(
874
                        evc,
875
                        flows={k: v.copy() for k, v in switch_flows.items()}
876
                    )
877 1
                elif evc.is_failover_path_affected_by_link(link):
878 1
                    evc.old_path = evc.failover_path
879 1
                    evc.failover_path = Path([])
880 1
                    check_failover.append(evc)
881
882 1
        if failover_event_contents:
883 1
            emit_event(self.controller, "failover_link_down",
884
                       content=deepcopy(failover_event_contents))
885 1
        send_flow_mods_event(self.controller, switch_flows, 'install')
886
887 1
        for evc in evcs_normal:
888 1
            emit_event(
889
                self.controller,
890
                "evc_affected_by_link_down",
891
                content={"link": link} | map_evc_event_content(evc)
892
            )
893
894 1
        evcs_to_update = []
895 1
        for evc in evcs_with_failover:
896 1
            evcs_to_update.append(evc.as_dict())
897 1
            log.info(
898
                f"{evc} redeployed with failover due to link down {link.id}"
899
            )
900 1
        for evc in check_failover:
901 1
            evcs_to_update.append(evc.as_dict())
902
903 1
        self.mongo_controller.update_evcs(evcs_to_update)
904
905 1
        emit_event(
906
            self.controller,
907
            "cleanup_evcs_old_path",
908
            content={"evcs": evcs_with_failover + check_failover}
909
        )
910
911 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
912 1
    def on_evc_affected_by_link_down(self, event):
913
        """Change circuit when link is down or under_mantenance."""
914
        self.handle_evc_affected_by_link_down(event)
915
916 1
    def handle_evc_affected_by_link_down(self, event):
917
        """Change circuit when link is down or under_mantenance."""
918 1
        evc = self.circuits.get(event.content["evc_id"])
919 1
        link = event.content['link']
920 1
        if not evc:
921 1
            return
922 1
        with evc.lock:
923 1
            if not evc.is_affected_by_link(link):
924
                return
925 1
            result = evc.handle_link_down()
926 1
        event_name = "error_redeploy_link_down"
927 1
        if result:
928 1
            log.info(f"{evc} redeployed due to link down {link.id}")
929 1
            event_name = "redeployed_link_down"
930 1
        emit_event(self.controller, event_name,
931
                   content=map_evc_event_content(evc))
932
933 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
934 1
    def on_evc_deployed(self, event):
935
        """Handle EVC deployed|redeployed_link_down."""
936
        self.handle_evc_deployed(event)
937
938 1
    def handle_evc_deployed(self, event):
939
        """Setup failover path on evc deployed."""
940
        evc = self.circuits.get(event.content["evc_id"])
941
        if not evc:
942
            return
943
        evc.try_setup_failover_path()
944
945 1
    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
946 1
    def on_cleanup_evcs_old_path(self, event):
947
        """Handle cleanup evcs old path."""
948
        self.handle_cleanup_evcs_old_path(event)
949
950 1
    def handle_cleanup_evcs_old_path(self, event):
951
        """Handle cleanup evcs old path."""
952 1
        evcs = event.content.get("evcs", [])
953 1
        event_contents: dict[str, dict] = defaultdict(list)
954 1
        total_flows = {}
955 1
        for evc in evcs:
956 1
            if not evc.old_path:
957 1
                continue
958 1
            with evc.lock:
959 1
                removed_flows = {}
960 1
                try:
961 1
                    nni_flows = prepare_delete_flow(
962
                        evc._prepare_nni_flows(evc.old_path)
963
                    )
964 1
                    uni_flows = prepare_delete_flow(
965
                        evc._prepare_uni_flows(evc.old_path, skip_in=True)
966
                    )
967 1
                    removed_flows = merge_flow_dicts(
968
                        nni_flows, uni_flows
969
                    )
970
                # pylint: disable=broad-except
971
                except Exception:
972
                    err = traceback.format_exc().replace("\n", ", ")
973
                    log.error(f"Fail to remove {evc} old_path: {err}")
974
                    continue
975 1
                if removed_flows:
976 1
                    total_flows = merge_flow_dicts(total_flows, removed_flows)
977 1
                    content = map_evc_event_content(
978
                        evc,
979
                        removed_flows=deepcopy(removed_flows),
980
                        current_path=evc.current_path.as_dict(),
981
                    )
982 1
                    event_contents[evc.id] = content
983 1
                    evc.old_path = Path([])
984 1
        if event_contents:
985 1
            send_flow_mods_event(self.controller, total_flows, 'delete')
986 1
            emit_event(self.controller, "failover_old_path",
987
                       content=event_contents)
988
989 1
    @listen_to("kytos/topology.topology_loaded")
990 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
991
        """Load EVCs once the topology is available."""
992
        self.load_all_evcs()
993
994 1
    def load_all_evcs(self):
995
        """Try to load all EVCs on startup."""
996 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
997 1
        for circuit_id, circuit in circuits:
998 1
            if circuit_id not in self.circuits:
999 1
                self._load_evc(circuit)
1000 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
1001
                   timeout=1)
1002
1003 1
    def _load_evc(self, circuit_dict):
1004
        """Load one EVC from mongodb to memory."""
1005 1
        try:
1006 1
            evc = self._evc_from_dict(circuit_dict)
1007 1
        except (ValueError, KytosTagError) as exception:
1008 1
            log.error(
1009
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1010
            )
1011 1
            return None
1012 1
        if evc.archived:
1013 1
            return None
1014
1015 1
        self.circuits.setdefault(evc.id, evc)
1016 1
        self.sched.add(evc)
1017 1
        return evc
1018
1019 1
    @listen_to("kytos/flow_manager.flow.error")
1020 1
    def on_flow_mod_error(self, event):
1021
        """Handle flow mod errors related to an EVC."""
1022
        self.handle_flow_mod_error(event)
1023
1024 1
    def handle_flow_mod_error(self, event):
1025
        """Handle flow mod errors related to an EVC."""
1026 1
        flow = event.content["flow"]
1027 1
        command = event.content.get("error_command")
1028 1
        if command != "add":
1029
            return
1030 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1031 1
        if evc:
1032 1
            with evc.lock:
1033 1
                evc.remove_current_flows(sync=False)
1034 1
                evc.remove_failover_flows(sync=True)
1035
1036 1
    def _evc_dict_with_instances(self, evc_dict):
1037
        """Convert some dict values to instance of EVC classes.
1038
1039
        This method will convert: [UNI, Link]
1040
        """
1041 1
        data = evc_dict.copy()  # Do not modify the original dict
1042 1
        for attribute, value in data.items():
1043
            # Get multiple attributes.
1044
            # Ex: uni_a, uni_z
1045 1
            if "uni" in attribute:
1046 1
                try:
1047 1
                    data[attribute] = self._uni_from_dict(value)
1048 1
                except ValueError as exception:
1049 1
                    result = "Error creating UNI: Invalid value"
1050 1
                    raise ValueError(result) from exception
1051
1052 1
            if attribute == "circuit_scheduler":
1053 1
                data[attribute] = []
1054 1
                for schedule in value:
1055 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1056
1057
            # Get multiple attributes.
1058
            # Ex: primary_links,
1059
            #     backup_links,
1060
            #     current_links_cache,
1061
            #     primary_links_cache,
1062
            #     backup_links_cache
1063 1
            if "links" in attribute:
1064 1
                data[attribute] = [
1065
                    self._link_from_dict(link, attribute) for link in value
1066
                ]
1067
1068
            # Ex: current_path,
1069
            #     primary_path,
1070
            #     backup_path
1071 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1072 1
                data[attribute] = Path(
1073
                    [self._link_from_dict(link, attribute) for link in value]
1074
                )
1075
1076 1
        return data
1077
1078 1
    def _evc_from_dict(self, evc_dict):
1079 1
        data = self._evc_dict_with_instances(evc_dict)
1080 1
        data["table_group"] = self.table_group
1081 1
        return EVC(self.controller, **data)
1082
1083 1
    def _uni_from_dict(self, uni_dict):
1084
        """Return a UNI object from python dict."""
1085 1
        if uni_dict is None:
1086 1
            return False
1087
1088 1
        interface_id = uni_dict.get("interface_id")
1089 1
        interface = self.controller.get_interface_by_id(interface_id)
1090 1
        if interface is None:
1091 1
            result = (
1092
                "Error creating UNI:"
1093
                + f"Could not instantiate interface {interface_id}"
1094
            )
1095 1
            raise ValueError(result) from ValueError
1096 1
        tag_convert = {1: "vlan"}
1097 1
        tag_dict = uni_dict.get("tag", None)
1098 1
        if tag_dict:
1099 1
            tag_type = tag_dict.get("tag_type")
1100 1
            tag_type = tag_convert.get(tag_type, tag_type)
1101 1
            tag_value = tag_dict.get("value")
1102 1
            if isinstance(tag_value, list):
1103 1
                tag_value = get_tag_ranges(tag_value)
1104 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1105 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1106
            else:
1107 1
                tag = TAG(tag_type, tag_value)
1108
        else:
1109 1
            tag = None
1110 1
        uni = UNI(interface, tag)
1111 1
        return uni
1112
1113 1
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
1114
        """Return a Link object from python dict."""
1115 1
        id_a = link_dict.get("endpoint_a").get("id")
1116 1
        id_b = link_dict.get("endpoint_b").get("id")
1117
1118 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1119 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1120 1
        if not endpoint_a:
1121 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1122 1
            raise ValueError(error_msg)
1123 1
        if not endpoint_b:
1124
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1125
            raise ValueError(error_msg)
1126
1127 1
        link = Link(endpoint_a, endpoint_b)
1128 1
        allowed_paths = {"current_path", "failover_path"}
1129 1
        if "metadata" in link_dict and attribute in allowed_paths:
1130 1
            link.extend_metadata(link_dict.get("metadata"))
1131
1132 1
        s_vlan = link.get_metadata("s_vlan")
1133 1
        if s_vlan:
1134 1
            tag = TAG.from_dict(s_vlan)
1135 1
            if tag is False:
1136
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1137
                raise ValueError(error_msg)
1138 1
            link.update_metadata("s_vlan", tag)
1139 1
        return link
1140
1141 1
    def _find_evc_by_schedule_id(self, schedule_id):
1142
        """
1143
        Find an EVC and CircuitSchedule based on schedule_id.
1144
1145
        :param schedule_id: Schedule ID
1146
        :return: EVC and Schedule
1147
        """
1148 1
        circuits = self._get_circuits_buffer()
1149 1
        found_schedule = None
1150 1
        evc = None
1151
1152
        # pylint: disable=unused-variable
1153 1
        for c_id, circuit in circuits.items():
1154 1
            for schedule in circuit.circuit_scheduler:
1155 1
                if schedule.id == schedule_id:
1156 1
                    found_schedule = schedule
1157 1
                    evc = circuit
1158 1
                    break
1159 1
            if found_schedule:
1160 1
                break
1161 1
        return evc, found_schedule
1162
1163 1
    def _get_circuits_buffer(self):
1164
        """
1165
        Return the circuit buffer.
1166
1167
        If the buffer is empty, try to load data from mongodb.
1168
        """
1169 1
        if not self.circuits:
1170
            # Load circuits from mongodb to buffer
1171 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1172 1
            for c_id, circuit in circuits.items():
1173 1
                evc = self._evc_from_dict(circuit)
1174 1
                self.circuits[c_id] = evc
1175 1
        return self.circuits
1176
1177
    # pylint: disable=attribute-defined-outside-init
1178 1
    @alisten_to("kytos/of_multi_table.enable_table")
1179 1
    async def on_table_enabled(self, event):
1180
        """Handle a recently table enabled."""
1181 1
        table_group = event.content.get("mef_eline", None)
1182 1
        if not table_group:
1183 1
            return
1184 1
        for group in table_group:
1185 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1186 1
                log.error(f'The table group "{group}" is not allowed for '
1187
                          f'mef_eline. Allowed table groups are '
1188
                          f'{settings.TABLE_GROUP_ALLOWED}')
1189 1
                return
1190 1
        self.table_group.update(table_group)
1191 1
        content = {"group_table": self.table_group}
1192 1
        name = "kytos/mef_eline.enable_table"
1193
        await aemit_event(self.controller, name, content)
1194