Passed
Pull Request — master (#626)
by Aldo
08:48 queued 04:35
created

build.main   F

Complexity

Total Complexity 214

Size/Duplication

Total Lines 1201
Duplicated Lines 3.5 %

Test Coverage

Coverage 92.31%

Importance

Changes 0
Metric Value
eloc 807
dl 42
loc 1201
ccs 648
cts 702
cp 0.9231
rs 1.793
c 0
b 0
f 0
wmc 214

52 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.on_flow_delete() 0 4 1
A Main.handle_flow_delete() 0 7 2
A Main.add_metadata() 0 19 3
A Main.get_metadata() 0 13 2
A Main.bulk_add_metadata() 20 20 4
A Main.list_circuits() 0 15 1
A Main.get_circuit() 0 13 2
A Main.shutdown() 0 2 1
B Main.list_schedules() 0 31 5
A Main._use_uni_tags() 0 10 2
A Main.on_cleanup_evcs_old_path() 0 4 1
A Main.handle_evc_deployed() 0 6 2
A Main.on_evc_affected_by_link_down() 0 4 1
A Main.handle_evc_affected_by_link_down() 0 16 5
A Main.on_evc_deployed() 0 4 1
A Main.on_link_down() 0 4 1
C Main._evc_dict_with_instances() 0 41 9
A Main._find_evc_by_schedule_id() 0 21 5
A Main._evc_from_dict() 0 4 1
B Main._link_from_dict() 0 27 7
B Main._uni_from_dict() 0 29 5
A Main.on_table_enabled() 0 16 4
A Main._get_circuits_buffer() 0 13 3
B Main.handle_cleanup_evcs_old_path() 0 38 7
A Main.update_schedule() 0 47 2
A Main.load_all_evcs() 0 8 3
A Main.handle_interface_link_up() 0 10 4
A Main.handle_link_up() 0 7 5
A Main.delete_metadata() 0 16 2
A Main.on_link_up() 0 4 1
A Main.delete_schedule() 0 31 2
A Main.setup() 0 30 1
D Main.handle_link_down() 0 76 13
A Main.execute() 0 8 3
A Main.get_evcs_by_svc_level() 0 13 4
A Main.get_eline_controller() 0 4 1
F Main.create_circuit() 0 114 14
C Main.should_be_checked() 0 16 9
B Main.redeploy() 0 38 6
A Main.handle_flow_mod_error() 0 10 3
F Main.update() 0 61 15
B Main.execute_consistency() 0 22 8
A Main.handle_interface_link_down() 0 10 4
B Main.handle_on_interface_link_change() 0 32 8
A Main.on_flow_mod_error() 0 4 1
A Main.on_topology_loaded() 0 4 1
A Main.bulk_delete_metadata() 22 22 4
F Main._check_no_tag_duplication() 0 25 14
A Main._load_evc() 0 15 3
A Main.create_schedule() 0 56 3
A Main.on_interface_link_change() 0 8 1
A Main.delete_circuit() 0 38 4


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
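In this report, the only methods with a non-zero duplication score are `Main.bulk_add_metadata()` and `Main.bulk_delete_metadata()`. Both walk a list of circuit ids, apply one operation per EVC, and collect the ids that were not found. The following is a minimal sketch of pulling that loop into a shared helper. `apply_to_circuits` and `FakeEVC` are hypothetical names introduced for illustration and are not part of mef_eline. The sketch uses `dict.get` instead of the original `try/except KeyError`, which assumes the per-EVC operation itself never raises `KeyError`.

```python
from typing import Callable, Dict, Iterable, List


class FakeEVC:
    """Hypothetical stand-in for an EVC; only metadata handling is modeled."""

    def __init__(self) -> None:
        self.metadata: Dict[str, object] = {}

    def extend_metadata(self, data: Dict[str, object]) -> None:
        self.metadata.update(data)

    def remove_metadata(self, key: str) -> None:
        self.metadata.pop(key, None)


def apply_to_circuits(
    circuits: Dict[str, FakeEVC],
    circuit_ids: Iterable[str],
    operation: Callable[[FakeEVC], None],
) -> List[str]:
    """Run operation on every known circuit and return the ids not found."""
    missing = []
    for circuit_id in circuit_ids:
        evc = circuits.get(circuit_id)
        if evc is None:
            missing.append(circuit_id)
            continue
        operation(evc)
    return missing


# Both bulk endpoints would reduce to one call each, differing only in the
# operation that is passed in.
circuits = {"a1": FakeEVC(), "b2": FakeEVC()}
missing_on_add = apply_to_circuits(
    circuits, ["a1", "zz"], lambda evc: evc.extend_metadata({"owner": "noc"})
)
owner_after_add = circuits["a1"].metadata.get("owner")
missing_on_delete = apply_to_circuits(
    circuits, ["a1", "b2"], lambda evc: evc.remove_metadata("owner")
)
```

Each endpoint would then only need to turn a non-empty result into its 404 response, so the duplicated loop lives in one place.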

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
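As an illustration of the prefix/suffix heuristic, the method table above lists several members that share the `schedule` suffix (`list_schedules`, `create_schedule`, `update_schedule`, `delete_schedule`, `_find_evc_by_schedule_id`). The sketch below shows what an Extract Class step for that group might look like. `ScheduleRegistry`, `Circuit`, and `Schedule` are hypothetical, simplified stand-ins, not the real mef_eline types, and persistence and job scheduling are deliberately left out.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Schedule:
    """Hypothetical, reduced schedule record."""
    id: str
    action: str


@dataclass
class Circuit:
    """Hypothetical, reduced circuit holding its schedules."""
    id: str
    circuit_scheduler: List[Schedule] = field(default_factory=list)


class ScheduleRegistry:
    """Extracted class that owns schedule lookup and bookkeeping."""

    def __init__(self, circuits: Dict[str, Circuit]) -> None:
        # The circuit buffer stays owned by the caller; it is only shared here.
        self._circuits = circuits

    def find(self, schedule_id: str) -> Tuple[Optional[Circuit], Optional[Schedule]]:
        """Return the (circuit, schedule) pair for an id, or (None, None)."""
        for circuit in self._circuits.values():
            for schedule in circuit.circuit_scheduler:
                if schedule.id == schedule_id:
                    return circuit, schedule
        return None, None

    def add(self, circuit_id: str, schedule: Schedule) -> bool:
        """Attach a schedule to a circuit; False if the circuit is unknown."""
        circuit = self._circuits.get(circuit_id)
        if circuit is None:
            return False
        circuit.circuit_scheduler.append(schedule)
        return True

    def remove(self, schedule_id: str) -> bool:
        """Detach a schedule by id; False if no circuit carries it."""
        circuit, schedule = self.find(schedule_id)
        if circuit is None or schedule is None:
            return False
        circuit.circuit_scheduler.remove(schedule)
        return True


circuits = {"c1": Circuit("c1")}
registry = ScheduleRegistry(circuits)
added = registry.add("c1", Schedule("s1", "create"))
found_circuit, found_schedule = registry.find("s1")
removed = registry.remove("s1")
```

With this split, the REST handlers would keep only request parsing and response building, which is where most of the size and complexity reduction would come from.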

# pylint: disable=protected-access, too-many-lines
"""Main module of kytos/mef_eline Kytos Network Application.

NApp to provision circuits from user request.
"""
import pathlib
import time
import traceback
from collections import defaultdict
from copy import deepcopy
from threading import Lock
from typing import Optional

from pydantic import ValidationError

from kytos.core import KytosNApp, log, rest
from kytos.core.events import KytosEvent
from kytos.core.exceptions import KytosTagError
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
                                validate_openapi)
from kytos.core.interface import TAG, UNI, TAGRange
from kytos.core.link import Link
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
                                 get_json_or_400)
from kytos.core.tag_ranges import get_tag_ranges
from napps.kytos.mef_eline import controllers, settings
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
                                              DuplicatedNoTagUNI, InvalidPath)
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
                                          Path)
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc, aemit_event,
                                         check_disabled_component, emit_event,
                                         get_vlan_tags_and_masks,
                                         map_evc_event_content,
                                         merge_flow_dicts, prepare_delete_flow,
                                         send_flow_mods_event)


# pylint: disable=too-many-public-methods
class Main(KytosNApp):
    """Main class of amlight/mef_eline NApp.

    This class is the entry point for this napp.
    """

    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")

    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self._intf_events = defaultdict(dict)
        self._lock_interfaces = defaultdict(Lock)
        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None

    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
        """Get circuits sorted by desc service level and asc creation_time.

        In the future, as more ops are offloaded it should come from the DB.
        """
        if enable_filter:
            return sorted(
                          [circuit for circuit in self.circuits.values()
                           if circuit.is_enabled()],
                          key=lambda x: (-x.service_level, x.creation_time),
            )
        return sorted(self.circuits.values(),
                      key=lambda x: (-x.service_level, x.creation_time))

    @staticmethod
    def get_eline_controller():
        """Return the ELineController instance."""
        return controllers.ELineController()

    def execute(self):
        """Execute once when the napp is running."""
        if self._lock.locked():
            return
        log.debug("Starting consistency routine")
        with self._lock:
            self.execute_consistency()
        log.debug("Finished consistency routine")

    def should_be_checked(self, circuit):
        "Verify if the circuit meets the necessary conditions to be checked"
        # pylint: disable=too-many-boolean-expressions
        if (
                circuit.is_enabled()
                and not circuit.is_active()
                and not circuit.lock.locked()
                and not circuit.has_recent_removed_flow()
                and not circuit.is_recent_updated()
                and circuit.are_unis_active()
                # if an inter-switch EVC does not have current_path, it does
                # not make sense to run sdntrace on it
                and (circuit.is_intra_switch() or circuit.current_path)
                ):
            return True
        return False

    def execute_consistency(self):
        """Execute consistency routine."""
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)
            circuit.try_setup_failover_path(warn_if_not_path=False)
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            is_checked = circuits_checked.get(circuit.id)
            if is_checked:
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
            else:
                circuit.execution_rounds += 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                    log.info(f"{circuit} enabled but inactive - redeploy")
                    with circuit.lock:
                        circuit.deploy()

    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        """

    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return circuits stored.

        archived query arg if defined (not null) will be filtered
        accordingly, by default only non archived evcs will be listed
        """
        log.debug("list_circuits /v2/evc")
        args = request.query_params
        archived = args.get("archived", "false").lower()
        args = {k: v for k, v in args.items() if k not in {"archived"}}
        circuits = self.mongo_controller.get_circuits(archived=archived,
                                                      metadata=args)
        circuits = circuits['circuits']
        return JSONResponse(circuits)

    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            result = {}
            status = 200
            return JSONResponse(result, status_code=status)

        result = []
        status = 200
        for circuit in circuits:
            circuit_scheduler = circuit.get("circuit_scheduler")
            if circuit_scheduler:
                for scheduler in circuit_scheduler:
                    value = {
                        "schedule_id": scheduler.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": scheduler,
                    }
                    result.append(value)

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a circuit based on id."""
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if not circuit:
            result = f"circuit_id {circuit_id} not found"
            log.debug("get_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)
        status = 200
        log.debug("get_circuit result %s %s", circuit, status)
        return JSONResponse(circuit, status_code=status)

    # pylint: disable=too-many-branches, too-many-statements
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST

        # For each link composing paths in #3:
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
        #  - Using the S-VID obtained, generate abstract flow entries to be
        #    sent to FlowManager

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation

        Finally, notify user of the status of its request.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"primary_path is not valid: {exception}"
                ) from exception
        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"backup_path is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        deployed = False
        if not evc.circuit_scheduler:
            with evc.lock:
                deployed = evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id, "deployed": deployed}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)

    @staticmethod
    def _use_uni_tags(evc):
        uni_a = evc.uni_a
        evc._use_uni_vlan(uni_a)
        try:
            uni_z = evc.uni_z
            evc._use_uni_vlan(uni_z)
        except KytosTagError as err:
            evc.make_uni_vlan_available(uni_a)
            raise err

    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Capture delete messages to keep track when flows got removed."""
        self.handle_flow_delete(event)

    def handle_flow_delete(self, event):
        """Keep track when the EVC got flows removed by deriving its cookie."""
        flow = event.content["flow"]
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if evc:
            log.debug("Flow removed in EVC %s", evc.id)
            evc.set_flow_removed_at()

    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # chain from the caught instance, not the KeyError class
            raise HTTPException(404, detail=result) from error

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception
        redeployed = False
        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    redeployed = evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    redeployed = evc.deploy()
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # chain from the caught instance, not the KeyError class
            raise HTTPException(404, detail=result) from error
        log.info("Removing %s", evc)

        with evc.lock:
            if not evc.archived:
                evc.deactivate()
                evc.disable()
                self.sched.remove(evc)
                evc.remove_current_flows(sync=False)
                evc.remove_failover_flows(sync=False)
                evc.archive()
                evc.remove_uni_tags()
                evc.sync()
                emit_event(
                    self.controller, "deleted",
                    content=map_evc_event_content(evc)
                )

        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200
        log.debug("delete_circuit result %s %s", result, status)

        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Get metadata from an EVC."""
        circuit_id = request.path_params["circuit_id"]
        try:
            return (
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
            )
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs."""
        data = get_json_or_400(request, self.controller.loop)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")

        fail_evcs = []
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.extend_metadata(data)
            except KeyError:
                fail_evcs.append(_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful", status_code=201)

    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC."""
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)

    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from a bulk of EVCs"""
        data = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs_metadata(
            circuit_ids, {key: ""}, "del"
        )

        fail_evcs = []
        for _id in circuit_ids:
            try:
                evc = self.circuits[_id]
                evc.remove_metadata(key)
            except KeyError:
                fail_evcs.append(_id)

        if fail_evcs:
            raise HTTPException(404, detail=fail_evcs)
        return JSONResponse("Operation successful")

    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from an EVC."""
        circuit_id = request.path_params["circuit_id"]
        key = request.path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")

    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC."""
        circuit_id = request.path_params["circuit_id"]
        try_avoid_same_s_vlan = request.query_params.get(
            "try_avoid_same_s_vlan", "true"
        )
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
        if try_avoid_same_s_vlan not in {"true", "false"}:
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
            raise HTTPException(400, detail=msg)
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # chain from the caught instance, not the KeyError class
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        deployed = False
        if evc.is_enabled():
            with evc.lock:
                path_dict = evc.remove_current_flows(
                    sync=False,
                    return_path=try_avoid_same_s_vlan == "true"
                )
                evc.remove_failover_flows(sync=True)
                deployed = evc.deploy(path_dict)
        if deployed:
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {
                "response": f"Circuit {circuit_id} is disabled."
            }
            status = 409

        return JSONResponse(result, status_code=status)

    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with another schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Get EVC from circuits buffer
        circuits = self._get_circuits_buffer()

        # get the circuit
        evc = circuits.get(circuit_id)

        # circuit not found
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # new schedule from dict
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # If there is no schedule, create the list
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []

        # Add the new schedule
        evc.circuit_scheduler.append(new_schedule)

        # Add schedule job
        self.sched.add_circuit_job(evc, new_schedule)

        # save circuit to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        status = 201

        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)

648 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
649 1
    @validate_openapi(spec)
650 1
    def update_schedule(self, request: Request) -> JSONResponse:
651
        """Update a schedule.
652
653
        Change all attributes from the given schedule from a EVC circuit.
654
        The schedule ID is preserved as default.
655
        Payload example:
656
            {
657
              "date": "2019-08-07T14:52:10.967Z",
658
              "interval": "string",
659
              "frequency": "1 * * *",
660
              "action": "create"
661
            }
662
        """
663 1
        data = get_json_or_400(request, self.controller.loop)
664 1
        schedule_id = request.path_params["schedule_id"]
665 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
666
667
        # Try to find a circuit schedule
668 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
669
670
        # Return 404 if no circuit holds this schedule
671 1
        if not found_schedule:
672 1
            result = f"schedule_id {schedule_id} not found"
673 1
            log.debug("update_schedule result %s %s", result, 404)
674 1
            raise HTTPException(404, detail=result)
675
676 1
        new_schedule = CircuitSchedule.from_dict(data)
677 1
        new_schedule.id = found_schedule.id
678
        # Remove the old schedule
679 1
        evc.circuit_scheduler.remove(found_schedule)
680
        # Append the modified schedule
681 1
        evc.circuit_scheduler.append(new_schedule)
682
683
        # Cancel the job of the old schedule
684 1
        self.sched.cancel_job(found_schedule.id)
685
        # Add the new circuit schedule
686 1
        self.sched.add_circuit_job(evc, new_schedule)
687
        # Save EVC to mongodb
688 1
        evc.sync()
689
690 1
        result = new_schedule.as_dict()
691 1
        status = 200
692
693 1
        log.debug("update_schedule result %s %s", result, status)
694 1
        return JSONResponse(result, status_code=status)
695
696 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
697 1
    def delete_schedule(self, request: Request) -> JSONResponse:
698
        """Remove a circuit schedule.
699
700
        Remove the Schedule from EVC.
701
        Remove the Schedule from cron job.
702
        Save the EVC to mongodb.
703
        """
704 1
        schedule_id = request.path_params["schedule_id"]
705 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
706 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
707
708
        # Return 404 if no circuit holds this schedule
709 1
        if not found_schedule:
710 1
            result = f"schedule_id {schedule_id} not found"
711 1
            log.debug("delete_schedule result %s %s", result, 404)
712 1
            raise HTTPException(404, detail=result)
713
714
        # Remove the old schedule
715 1
        evc.circuit_scheduler.remove(found_schedule)
716
717
        # Cancel the job of the removed schedule
718 1
        self.sched.cancel_job(found_schedule.id)
719
        # Save EVC to mongodb
720 1
        evc.sync()
721
722 1
        result = "Schedule removed"
723 1
        status = 200
724
725 1
        log.debug("delete_schedule result %s %s", result, status)
726 1
        return JSONResponse(result, status_code=status)
727
728 1
    def _check_no_tag_duplication(
729
        self,
730
        evc_id: str,
731
        uni_a: Optional[UNI] = None,
732
        uni_z: Optional[UNI] = None
733
    ):
734
        """Check if the given EVC has UNIs with no tag and if these are
735
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
736
        Args:
737
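            evc_id (str): ID of the EVC being analyzed; it is skipped
                when comparing against the other circuits.
            uni_a (Optional[UNI]): UNI A of the EVC, if any.
            uni_z (Optional[UNI]): UNI Z of the EVC, if any.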
738
        """
739
740
        # No UNIs
741 1
        if not (uni_a or uni_z):
742 1
            return
743
744 1
        if (not (uni_a and not uni_a.user_tag) and
745
                not (uni_z and not uni_z.user_tag)):
746 1
            return
747 1
        for circuit in self.circuits.copy().values():
748 1
            if (not circuit.archived and circuit._id != evc_id):
749 1
                if uni_a and uni_a.user_tag is None:
750 1
                    circuit.check_no_tag_duplicate(uni_a)
751 1
                if uni_z and uni_z.user_tag is None:
752 1
                    circuit.check_no_tag_duplicate(uni_z)
753
754 1
    @listen_to("kytos/topology.link_up")
755 1
    def on_link_up(self, event):
756
        """Change circuit when link is up or end_maintenance."""
757
        self.handle_link_up(event)
758
759 1
    def handle_link_up(self, event):
760
        """Change circuit when link is up or end_maintenance."""
761 1
        log.info("Event handle_link_up %s", event.content["link"])
762 1
        for evc in self.get_evcs_by_svc_level():
763 1
            if evc.is_enabled() and not evc.archived:
764 1
                with evc.lock:
765 1
                    evc.handle_link_up(event.content["link"])
766
767
    # Possibly replace this with interruptions?
768 1
    @listen_to(
769
        '.*.switch.interface.(link_up|link_down|created|deleted)'
770
    )
771 1
    def on_interface_link_change(self, event: KytosEvent):
772
        """
773
        Handler for interface link_up, link_down, created and deleted events.
774
        """
775
        self.handle_on_interface_link_change(event)
776
777 1
    def handle_on_interface_link_change(self, event: KytosEvent):
778
        """
779
        Handler to sort interface events {link_(up, down), created, deleted}
780
781
        To avoid multiple database updates (link flap):
782
        Every interface is identified and processed in parallel.
783
        Once an interface event is received, a timer is started.
784
        While the timer is running, self._intf_events is updated.
785
        After the delay has elapsed, the last received event is processed.
786
        """
787 1
        iface = event.content.get("interface")
788 1
        with self._lock_interfaces[iface.id]:
789 1
            _now = event.timestamp
790
            # Ignore out-of-order events
791 1
            if (
792
                iface.id in self._intf_events
793
                and self._intf_events[iface.id]["event"].timestamp > _now
794
            ):
795 1
                return
796 1
            self._intf_events[iface.id].update({"event": event})
797 1
            if "last_acquired" in self._intf_events[iface.id]:
798 1
                return
799 1
            self._intf_events[iface.id].update({"last_acquired": now()})
800 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
801 1
        with self._lock_interfaces[iface.id]:
802 1
            event = self._intf_events[iface.id]["event"]
803 1
            self._intf_events[iface.id].pop('last_acquired', None)
804 1
            _, _, event_type = event.name.rpartition('.')
805 1
            if event_type in ('link_up', 'created'):
806 1
                self.handle_interface_link_up(iface)
807 1
            elif event_type in ('link_down', 'deleted'):
808 1
                self.handle_interface_link_down(iface)
809
810 1
    def handle_interface_link_up(self, interface):
811
        """
812
        Handler for interface link_up events
813
        """
814
        log.info("Event handle_interface_link_up %s", interface)
815
        for evc in self.get_evcs_by_svc_level():
816
            if _does_uni_affect_evc(evc, interface, "up"):
817
                with evc.lock:
818
                    evc.handle_interface_link_up(
819
                        interface
820
                    )
821
822 1
    def handle_interface_link_down(self, interface):
823
        """
824
        Handler for interface link_down events
825
        """
826
        log.info("Event handle_interface_link_down %s", interface)
827
        for evc in self.get_evcs_by_svc_level():
828
            if _does_uni_affect_evc(evc, interface, "down"):
829
                with evc.lock:
830
                    evc.handle_interface_link_down(
831
                        interface
832
                    )
833
834 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
835 1
    def on_link_down(self, event):
836
        """Change circuit when link is down or under_maintenance."""
837
        self.handle_link_down(event)
838
839
    # pylint: disable=too-many-branches
840
    # pylint: disable=too-many-locals
841 1
    def handle_link_down(self, event):
842
        """Change circuit when link is down or under_maintenance."""
843 1
        link = event.content["link"]
844 1
        log.info("Event handle_link_down %s", link)
845 1
        switch_flows = {}
846 1
        evcs_with_failover = []
847 1
        evcs_normal = []
848 1
        check_failover = []
849 1
        failover_event_contents = {}
850
851 1
        for evc in self.get_evcs_by_svc_level():
852 1
            with evc.lock:
853 1
                if evc.is_affected_by_link(link):
854 1
                    evc.affected_by_link_at = event.timestamp
855
                    # if there is no failover path, handle link down the
856
                    # traditional way
857 1
                    if (
858
                        not evc.failover_path or
859
                        evc.is_failover_path_affected_by_link(link)
860
                    ):
861 1
                        evcs_normal.append(evc)
862 1
                        continue
863 1
                    try:
864 1
                        dpid_flows = evc.get_failover_flows()
865 1
                        evc.old_path = evc.current_path
866 1
                        evc.current_path = evc.failover_path
867 1
                        evc.failover_path = Path([])
868
                    # pylint: disable=broad-except
869 1
                    except Exception:
870 1
                        err = traceback.format_exc().replace("\n", ", ")
871 1
                        log.error(
872
                            "Ignore Failover path for "
873
                            f"{evc} due to error: {err}"
874
                        )
875 1
                        evcs_normal.append(evc)
876 1
                        continue
877 1
                    for dpid, flows in dpid_flows.items():
878 1
                        switch_flows.setdefault(dpid, [])
879 1
                        switch_flows[dpid].extend(flows)
880 1
                    evcs_with_failover.append(evc)
881 1
                    failover_event_contents[evc.id] = map_evc_event_content(
882
                        evc,
883
                        flows={k: v.copy() for k, v in switch_flows.items()}
884
                    )
885 1
                elif evc.is_failover_path_affected_by_link(link):
886 1
                    evc.old_path = evc.failover_path
887 1
                    evc.failover_path = Path([])
888 1
                    check_failover.append(evc)
889
890 1
        if failover_event_contents:
891 1
            emit_event(self.controller, "failover_link_down",
892
                       content=deepcopy(failover_event_contents))
893 1
        send_flow_mods_event(self.controller, switch_flows, 'install')
894
895 1
        for evc in evcs_normal:
896 1
            emit_event(
897
                self.controller,
898
                "evc_affected_by_link_down",
899
                content={"link": link} | map_evc_event_content(evc)
900
            )
901
902 1
        evcs_to_update = []
903 1
        for evc in evcs_with_failover:
904 1
            evcs_to_update.append(evc.as_dict())
905 1
            log.info(
906
                f"{evc} redeployed with failover due to link down {link.id}"
907
            )
908 1
        for evc in check_failover:
909 1
            evcs_to_update.append(evc.as_dict())
910
911 1
        self.mongo_controller.update_evcs(evcs_to_update)
912
913 1
        emit_event(
914
            self.controller,
915
            "cleanup_evcs_old_path",
916
            content={"evcs": evcs_with_failover + check_failover}
917
        )
918
919 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
920 1
    def on_evc_affected_by_link_down(self, event):
921
        """Change circuit when link is down or under_maintenance."""
922
        self.handle_evc_affected_by_link_down(event)
923
924 1
    def handle_evc_affected_by_link_down(self, event):
925
        """Change circuit when link is down or under_maintenance."""
926 1
        evc = self.circuits.get(event.content["evc_id"])
927 1
        link = event.content['link']
928 1
        if not evc:
929 1
            return
930 1
        with evc.lock:
931 1
            if not evc.is_affected_by_link(link):
932
                return
933 1
            result = evc.handle_link_down()
934 1
        event_name = "error_redeploy_link_down"
935 1
        if result:
936 1
            log.info(f"{evc} redeployed due to link down {link.id}")
937 1
            event_name = "redeployed_link_down"
938 1
        emit_event(self.controller, event_name,
939
                   content=map_evc_event_content(evc))
940
941 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
942 1
    def on_evc_deployed(self, event):
943
        """Handle EVC deployed|redeployed_link_(up|down)."""
944
        self.handle_evc_deployed(event)
945
946 1
    def handle_evc_deployed(self, event):
947
        """Setup failover path on evc deployed."""
948
        evc = self.circuits.get(event.content["evc_id"])
949
        if not evc:
950
            return
951
        evc.try_setup_failover_path()
952
953 1
    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
954 1
    def on_cleanup_evcs_old_path(self, event):
955
        """Handle cleanup evcs old path."""
956
        self.handle_cleanup_evcs_old_path(event)
957
958 1
    def handle_cleanup_evcs_old_path(self, event):
959
        """Handle cleanup evcs old path."""
960 1
        evcs = event.content.get("evcs", [])
961 1
        event_contents: dict[str, dict] = {}
962 1
        total_flows = {}
963 1
        for evc in evcs:
964 1
            if not evc.old_path:
965 1
                continue
966 1
            with evc.lock:
967 1
                removed_flows = {}
968 1
                try:
969 1
                    nni_flows = prepare_delete_flow(
970
                        evc._prepare_nni_flows(evc.old_path)
971
                    )
972 1
                    uni_flows = prepare_delete_flow(
973
                        evc._prepare_uni_flows(evc.old_path, skip_in=True)
974
                    )
975 1
                    removed_flows = merge_flow_dicts(
976
                        nni_flows, uni_flows
977
                    )
978
                # pylint: disable=broad-except
979
                except Exception:
980
                    err = traceback.format_exc().replace("\n", ", ")
981
                    log.error(f"Fail to remove {evc} old_path: {err}")
982
                    continue
983 1
                if removed_flows:
984 1
                    total_flows = merge_flow_dicts(total_flows, removed_flows)
985 1
                    content = map_evc_event_content(
986
                        evc,
987
                        removed_flows=deepcopy(removed_flows),
988
                        current_path=evc.current_path.as_dict(),
989
                    )
990 1
                    event_contents[evc.id] = content
991 1
                    evc.old_path = Path([])
992 1
        if event_contents:
993 1
            send_flow_mods_event(self.controller, total_flows, 'delete')
994 1
            emit_event(self.controller, "failover_old_path",
995
                       content=event_contents)
996
997 1
    @listen_to("kytos/topology.topology_loaded")
998 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
999
        """Load EVCs once the topology is available."""
1000
        self.load_all_evcs()
1001
1002 1
    def load_all_evcs(self):
1003
        """Try to load all EVCs on startup."""
1004 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
1005 1
        for circuit_id, circuit in circuits:
1006 1
            if circuit_id not in self.circuits:
1007 1
                self._load_evc(circuit)
1008 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
1009
                   timeout=1)
1010
1011 1
    def _load_evc(self, circuit_dict):
1012
        """Load one EVC from mongodb to memory."""
1013 1
        try:
1014 1
            evc = self._evc_from_dict(circuit_dict)
1015 1
        except (ValueError, KytosTagError) as exception:
1016 1
            log.error(
1017
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1018
            )
1019 1
            return None
1020 1
        if evc.archived:
1021 1
            return None
1022
1023 1
        self.circuits.setdefault(evc.id, evc)
1024 1
        self.sched.add(evc)
1025 1
        return evc
1026
1027 1
    @listen_to("kytos/flow_manager.flow.error")
1028 1
    def on_flow_mod_error(self, event):
1029
        """Handle flow mod errors related to an EVC."""
1030
        self.handle_flow_mod_error(event)
1031
1032 1
    def handle_flow_mod_error(self, event):
1033
        """Handle flow mod errors related to an EVC."""
1034 1
        flow = event.content["flow"]
1035 1
        command = event.content.get("error_command")
1036 1
        if command != "add":
1037
            return
1038 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        # The flow may not belong to any loaded EVC
        if not evc:
            return
1039 1
        with evc.lock:
1040 1
            evc.remove_current_flows(sync=False)
1041 1
            evc.remove_failover_flows(sync=True)
1042
1043 1
    def _evc_dict_with_instances(self, evc_dict):
1044
        """Convert some dict values to instances of EVC classes.
1045
1046
        This method will convert: [UNI, Link]
1047
        """
1048 1
        data = evc_dict.copy()  # Do not modify the original dict
1049 1
        for attribute, value in data.items():
1050
            # Get multiple attributes.
1051
            # Ex: uni_a, uni_z
1052 1
            if "uni" in attribute:
1053 1
                try:
1054 1
                    data[attribute] = self._uni_from_dict(value)
1055 1
                except ValueError as exception:
1056 1
                    result = "Error creating UNI: Invalid value"
1057 1
                    raise ValueError(result) from exception
1058
1059 1
            if attribute == "circuit_scheduler":
1060 1
                data[attribute] = []
1061 1
                for schedule in value:
1062 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1063
1064
            # Get multiple attributes.
1065
            # Ex: primary_links,
1066
            #     backup_links,
1067
            #     current_links_cache,
1068
            #     primary_links_cache,
1069
            #     backup_links_cache
1070 1
            if "links" in attribute:
1071 1
                data[attribute] = [
1072
                    self._link_from_dict(link, attribute) for link in value
1073
                ]
1074
1075
            # Ex: current_path,
1076
            #     primary_path,
1077
            #     backup_path
1078 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1079 1
                data[attribute] = Path(
1080
                    [self._link_from_dict(link, attribute) for link in value]
1081
                )
1082
1083 1
        return data
1084
1085 1
    def _evc_from_dict(self, evc_dict):
1086 1
        data = self._evc_dict_with_instances(evc_dict)
1087 1
        data["table_group"] = self.table_group
1088 1
        return EVC(self.controller, **data)
1089
1090 1
    def _uni_from_dict(self, uni_dict):
1091
        """Return a UNI object from python dict."""
1092 1
        if uni_dict is None:
1093 1
            return False
1094
1095 1
        interface_id = uni_dict.get("interface_id")
1096 1
        interface = self.controller.get_interface_by_id(interface_id)
1097 1
        if interface is None:
1098 1
            result = (
1099
                "Error creating UNI: "
1100
                + f"Could not instantiate interface {interface_id}"
1101
            )
1102 1
            raise ValueError(result) from ValueError
1103 1
        tag_convert = {1: "vlan"}
1104 1
        tag_dict = uni_dict.get("tag", None)
1105 1
        if tag_dict:
1106 1
            tag_type = tag_dict.get("tag_type")
1107 1
            tag_type = tag_convert.get(tag_type, tag_type)
1108 1
            tag_value = tag_dict.get("value")
1109 1
            if isinstance(tag_value, list):
1110 1
                tag_value = get_tag_ranges(tag_value)
1111 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1112 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1113
            else:
1114 1
                tag = TAG(tag_type, tag_value)
1115
        else:
1116 1
            tag = None
1117 1
        uni = UNI(interface, tag)
1118 1
        return uni
1119
1120 1
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
1121
        """Return a Link object from python dict."""
1122 1
        id_a = link_dict.get("endpoint_a").get("id")
1123 1
        id_b = link_dict.get("endpoint_b").get("id")
1124
1125 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1126 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1127 1
        if not endpoint_a:
1128 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1129 1
            raise ValueError(error_msg)
1130 1
        if not endpoint_b:
1131
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1132
            raise ValueError(error_msg)
1133
1134 1
        link = Link(endpoint_a, endpoint_b)
1135 1
        allowed_paths = {"current_path", "failover_path"}
1136 1
        if "metadata" in link_dict and attribute in allowed_paths:
1137 1
            link.extend_metadata(link_dict.get("metadata"))
1138
1139 1
        s_vlan = link.get_metadata("s_vlan")
1140 1
        if s_vlan:
1141 1
            tag = TAG.from_dict(s_vlan)
1142 1
            if tag is False:
1143
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1144
                raise ValueError(error_msg)
1145 1
            link.update_metadata("s_vlan", tag)
1146 1
        return link
1147
1148 1
    def _find_evc_by_schedule_id(self, schedule_id):
1149
        """
1150
        Find an EVC and CircuitSchedule based on schedule_id.
1151
1152
        :param schedule_id: Schedule ID
1153
        :return: EVC and Schedule
1154
        """
1155 1
        circuits = self._get_circuits_buffer()
1156 1
        found_schedule = None
1157 1
        evc = None
1158
1159
        # pylint: disable=unused-variable
1160 1
        for c_id, circuit in circuits.items():
1161 1
            for schedule in circuit.circuit_scheduler:
1162 1
                if schedule.id == schedule_id:
1163 1
                    found_schedule = schedule
1164 1
                    evc = circuit
1165 1
                    break
1166 1
            if found_schedule:
1167 1
                break
1168 1
        return evc, found_schedule
1169
1170 1
    def _get_circuits_buffer(self):
1171
        """
1172
        Return the circuit buffer.
1173
1174
        If the buffer is empty, try to load data from mongodb.
1175
        """
1176 1
        if not self.circuits:
1177
            # Load circuits from mongodb to buffer
1178 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1179 1
            for c_id, circuit in circuits.items():
1180 1
                evc = self._evc_from_dict(circuit)
1181 1
                self.circuits[c_id] = evc
1182 1
        return self.circuits
1183
1184
    # pylint: disable=attribute-defined-outside-init
1185 1
    @alisten_to("kytos/of_multi_table.enable_table")
1186 1
    async def on_table_enabled(self, event):
1187
        """Handle a recently enabled table."""
1188 1
        table_group = event.content.get("mef_eline", None)
1189 1
        if not table_group:
1190 1
            return
1191 1
        for group in table_group:
1192 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1193 1
                log.error(f'The table group "{group}" is not allowed for '
1194
                          f'mef_eline. Allowed table groups are '
1195
                          f'{settings.TABLE_GROUP_ALLOWED}')
1196 1
                return
1197 1
        self.table_group.update(table_group)
1198 1
        content = {"group_table": self.table_group}
1199 1
        name = "kytos/mef_eline.enable_table"
1200
        await aemit_event(self.controller, name, content)
1201
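
The link-flap handling described in the docstring of `handle_on_interface_link_change` can be illustrated in isolation. What follows is only a minimal sketch, not the NApp's implementation: `Event`, `InterfaceEventDebouncer`, the `timer_running` key and the injectable `sleep` parameter are hypothetical names introduced for the example, and the delay stands in for `settings.UNI_STATE_CHANGE_DELAY`. It keeps the same three ideas: one lock per interface, stale (out-of-order) events are dropped, and only the first caller waits out the delay and then acts on the last event recorded.

```python
import threading
import time
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Event:
    """Hypothetical stand-in for a KytosEvent carrying an interface."""
    name: str
    interface_id: str
    timestamp: float


class InterfaceEventDebouncer:
    """Process only the last event seen per interface within a delay."""

    def __init__(self, delay, on_up, on_down, sleep=time.sleep):
        self._delay = delay
        self._on_up = on_up
        self._on_down = on_down
        self._sleep = sleep  # injectable so the wait can be simulated
        self._locks = defaultdict(threading.Lock)  # one lock per interface
        self._pending = defaultdict(dict)          # latest event per interface

    def handle(self, event):
        key = event.interface_id
        with self._locks[key]:
            state = self._pending[key]
            last = state.get("event")
            # Drop events older than the one already recorded.
            if last is not None and last.timestamp > event.timestamp:
                return
            state["event"] = event
            # A timer is already running: its owner will pick up this event.
            if state.get("timer_running"):
                return
            state["timer_running"] = True
        # The lock is released while waiting so newer events can be recorded.
        self._sleep(self._delay)
        with self._locks[key]:
            state = self._pending[key]
            latest = state["event"]
            state.pop("timer_running", None)
            event_type = latest.name.rpartition(".")[2]
            if event_type in ("link_up", "created"):
                self._on_up(key)
            elif event_type in ("link_down", "deleted"):
                self._on_down(key)
```

With this shape, a burst of up/down events on one interface results in a single callback for the final state, which is the stated goal of avoiding repeated database updates during a flap. Passing a custom `sleep` makes the behavior testable without real waiting.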