Passed
Push — master ( 8932c8...89a60e )
by Vinicius
02:55 queued 15s
created

build.main   F

Complexity

Total Complexity 211

Size/Duplication

Total Lines 1187
Duplicated Lines 3.54 %

Test Coverage

Coverage 92.8%

Importance

Changes 0
Metric Value
wmc 211
eloc 795
dl 42
loc 1187
ccs 644
cts 694
cp 0.928
rs 1.805
c 0
b 0
f 0

52 Methods

Rating   Name   Duplication   Size   Complexity  
A Main.add_metadata() 0 19 3
A Main.get_metadata() 0 13 2
A Main.bulk_add_metadata() 20 20 4
A Main.handle_link_up() 0 7 5
A Main.on_link_up() 0 4 1
F Main._check_no_tag_duplication() 0 25 14
A Main._use_uni_tags() 0 10 2
A Main.on_flow_delete() 0 4 1
A Main.handle_flow_delete() 0 7 2
A Main.on_cleanup_evcs_old_path() 0 4 1
A Main.list_circuits() 0 15 1
A Main.get_circuit() 0 13 2
A Main.shutdown() 0 2 1
A Main.setup() 0 30 1
A Main.execute() 0 8 3
A Main.get_evcs_by_svc_level() 0 13 4
A Main.get_eline_controller() 0 4 1
F Main.create_circuit() 0 114 14
B Main.list_schedules() 0 31 5
C Main.should_be_checked() 0 16 9
B Main.execute_consistency() 0 22 8
B Main.handle_cleanup_evcs_old_path() 0 37 7
C Main._evc_dict_with_instances() 0 41 9
A Main.on_link_down() 0 4 1
A Main.update_schedule() 0 47 2
A Main.handle_evc_deployed() 0 6 2
A Main._find_evc_by_schedule_id() 0 21 5
A Main._evc_from_dict() 0 4 1
A Main.load_all_evcs() 0 8 3
A Main.handle_interface_link_up() 0 9 3
A Main.delete_metadata() 0 16 2
A Main.on_evc_affected_by_link_down() 0 4 1
A Main.delete_schedule() 0 31 2
A Main.handle_evc_affected_by_link_down() 0 16 5
A Main.on_evc_deployed() 0 4 1
D Main.handle_link_down() 0 76 13
B Main._link_from_dict() 0 26 6
B Main._uni_from_dict() 0 29 5
B Main.redeploy() 0 28 5
A Main.handle_flow_mod_error() 0 11 4
F Main.update() 0 61 15
A Main.handle_interface_link_down() 0 9 3
B Main.handle_on_interface_link_change() 0 32 8
A Main.on_flow_mod_error() 0 4 1
A Main.on_topology_loaded() 0 4 1
A Main.bulk_delete_metadata() 22 22 4
A Main._load_evc() 0 15 3
A Main.on_table_enabled() 0 16 4
A Main.create_schedule() 0 56 3
A Main._get_circuits_buffer() 0 13 3
A Main.on_interface_link_change() 0 8 1
A Main.delete_circuit() 0 38 4

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.main often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from copy import deepcopy
11 1
from threading import Lock
12 1
from typing import Optional
13
14 1
from pydantic import ValidationError
15
16 1
from kytos.core import KytosNApp, log, rest
17 1
from kytos.core.events import KytosEvent
18 1
from kytos.core.exceptions import KytosTagError
19 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
20
                                validate_openapi)
21 1
from kytos.core.interface import TAG, UNI, TAGRange
22 1
from kytos.core.link import Link
23 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
24
                                 get_json_or_400)
25 1
from kytos.core.tag_ranges import get_tag_ranges
26 1
from napps.kytos.mef_eline import controllers, settings
27 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
28
                                              DuplicatedNoTagUNI, InvalidPath)
29 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
30
                                          Path)
31 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
32 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
33
                                         emit_event, get_vlan_tags_and_masks,
34
                                         map_evc_event_content,
35
                                         merge_flow_dicts, prepare_delete_flow,
36
                                         send_flow_mods_event)
37
38
39
# pylint: disable=too-many-public-methods
40 1
class Main(KytosNApp):
41
    """Main class of amlight/mef_eline NApp.
42
43
    This class is the entry point for this napp.
44
    """
45
46 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
47
48 1
    def setup(self):
49
        """Replace the '__init__' method for the KytosNApp subclass.
50
51
        The setup method is automatically called by the controller when your
52
        application is loaded.
53
54
        So, if you have any setup routine, insert it here.
55
        """
56
        # object used to scheduler circuit events
57 1
        self.sched = Scheduler()
58
59
        # object to save and load circuits
60 1
        self.mongo_controller = self.get_eline_controller()
61 1
        self.mongo_controller.bootstrap_indexes()
62
63
        # set the controller that will manager the dynamic paths
64 1
        DynamicPathManager.set_controller(self.controller)
65
66
        # dictionary of EVCs created. It acts as a circuit buffer.
67
        # Every create/update/delete must be synced to mongodb.
68 1
        self.circuits = {}
69
70 1
        self._intf_events = defaultdict(dict)
71 1
        self._lock_interfaces = defaultdict(Lock)
72 1
        self.table_group = {"epl": 0, "evpl": 0}
73 1
        self._lock = Lock()
74 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
75
76 1
        self.load_all_evcs()
77 1
        self._topology_updated_at = None
78
79 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
80
        """Get circuits sorted by desc service level and asc creation_time.
81
82
        In the future, as more ops are offloaded it should be get from the DB.
83
        """
84 1
        if enable_filter:
85 1
            return sorted(
86
                          [circuit for circuit in self.circuits.values()
87
                           if circuit.is_enabled()],
88
                          key=lambda x: (-x.service_level, x.creation_time),
89
            )
90 1
        return sorted(self.circuits.values(),
91
                      key=lambda x: (-x.service_level, x.creation_time))
92
93 1
    @staticmethod
94 1
    def get_eline_controller():
95
        """Return the ELineController instance."""
96
        return controllers.ELineController()
97
98 1
    def execute(self):
99
        """Execute once when the napp is running."""
100 1
        if self._lock.locked():
101 1
            return
102 1
        log.debug("Starting consistency routine")
103 1
        with self._lock:
104 1
            self.execute_consistency()
105 1
        log.debug("Finished consistency routine")
106
107 1
    def should_be_checked(self, circuit):
108
        "Verify if the circuit meets the necessary conditions to be checked"
109
        # pylint: disable=too-many-boolean-expressions
110 1
        if (
111
                circuit.is_enabled()
112
                and not circuit.is_active()
113
                and not circuit.lock.locked()
114
                and not circuit.has_recent_removed_flow()
115
                and not circuit.is_recent_updated()
116
                and circuit.are_unis_active(self.controller.switches)
117
                # if a inter-switch EVC does not have current_path, it does not
118
                # make sense to run sdntrace on it
119
                and (circuit.is_intra_switch() or circuit.current_path)
120
                ):
121 1
            return True
122
        return False
123
124 1
    def execute_consistency(self):
125
        """Execute consistency routine."""
126 1
        circuits_to_check = []
127 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
128 1
            if self.should_be_checked(circuit):
129 1
                circuits_to_check.append(circuit)
130 1
            circuit.try_setup_failover_path()
131 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
132 1
        for circuit in circuits_to_check:
133 1
            is_checked = circuits_checked.get(circuit.id)
134 1
            if is_checked:
135 1
                circuit.execution_rounds = 0
136 1
                log.info(f"{circuit} enabled but inactive - activating")
137 1
                with circuit.lock:
138 1
                    circuit.activate()
139 1
                    circuit.sync()
140
            else:
141 1
                circuit.execution_rounds += 1
142 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
143 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
144 1
                    with circuit.lock:
145 1
                        circuit.deploy()
146
147 1
    def shutdown(self):
148
        """Execute when your napp is unloaded.
149
150
        If you have some cleanup procedure, insert it here.
151
        """
152
153 1
    @rest("/v2/evc/", methods=["GET"])
154 1
    def list_circuits(self, request: Request) -> JSONResponse:
155
        """Endpoint to return circuits stored.
156
157
        archive query arg if defined (not null) will be filtered
158
        accordingly, by default only non archived evcs will be listed
159
        """
160 1
        log.debug("list_circuits /v2/evc")
161 1
        args = request.query_params
162 1
        archived = args.get("archived", "false").lower()
163 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
164 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
165
                                                      metadata=args)
166 1
        circuits = circuits['circuits']
167 1
        return JSONResponse(circuits)
168
169 1
    @rest("/v2/evc/schedule", methods=["GET"])
170 1
    def list_schedules(self, _request: Request) -> JSONResponse:
171
        """Endpoint to return all schedules stored for all circuits.
172
173
        Return a JSON with the following template:
174
        [{"schedule_id": <schedule_id>,
175
         "circuit_id": <circuit_id>,
176
         "schedule": <schedule object>}]
177
        """
178 1
        log.debug("list_schedules /v2/evc/schedule")
179 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
180 1
        if not circuits:
181 1
            result = {}
182 1
            status = 200
183 1
            return JSONResponse(result, status_code=status)
184
185 1
        result = []
186 1
        status = 200
187 1
        for circuit in circuits:
188 1
            circuit_scheduler = circuit.get("circuit_scheduler")
189 1
            if circuit_scheduler:
190 1
                for scheduler in circuit_scheduler:
191 1
                    value = {
192
                        "schedule_id": scheduler.get("id"),
193
                        "circuit_id": circuit.get("id"),
194
                        "schedule": scheduler,
195
                    }
196 1
                    result.append(value)
197
198 1
        log.debug("list_schedules result %s %s", result, status)
199 1
        return JSONResponse(result, status_code=status)
200
201 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
202 1
    def get_circuit(self, request: Request) -> JSONResponse:
203
        """Endpoint to return a circuit based on id."""
204 1
        circuit_id = request.path_params["circuit_id"]
205 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
206 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
207 1
        if not circuit:
208 1
            result = f"circuit_id {circuit_id} not found"
209 1
            log.debug("get_circuit result %s %s", result, 404)
210 1
            raise HTTPException(404, detail=result)
211 1
        status = 200
212 1
        log.debug("get_circuit result %s %s", circuit, status)
213 1
        return JSONResponse(circuit, status_code=status)
214
215
    # pylint: disable=too-many-branches, too-many-statements
216 1
    @rest("/v2/evc/", methods=["POST"])
217 1
    @validate_openapi(spec)
218 1
    def create_circuit(self, request: Request) -> JSONResponse:
219
        """Try to create a new circuit.
220
221
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
222
        UNI_Z's requested C-VID are available from the interfaces' pools. This
223
        is checked when creating the UNI object.
224
225
        Then, E-Line NApp requests a primary and a backup path to the
226
        Pathfinder NApp using the attributes primary_links and backup_links
227
        submitted via REST
228
229
        # For each link composing paths in #3:
230
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
231
        #  - Using the S-VID obtained, generate abstract flow entries to be
232
        #    sent to FlowManager
233
234
        Push abstract flow entries to FlowManager and FlowManager pushes
235
        OpenFlow entries to datapaths
236
237
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
238
        creation
239
240
        Finnaly, notify user of the status of its request.
241
        """
242
        # Try to create the circuit object
243 1
        log.debug("create_circuit /v2/evc/")
244 1
        data = get_json_or_400(request, self.controller.loop)
245
246 1
        try:
247 1
            evc = self._evc_from_dict(data)
248 1
        except (ValueError, KytosTagError) as exception:
249 1
            log.debug("create_circuit result %s %s", exception, 400)
250 1
            raise HTTPException(400, detail=str(exception)) from exception
251 1
        try:
252 1
            check_disabled_component(evc.uni_a, evc.uni_z)
253 1
        except DisabledSwitch as exception:
254 1
            log.debug("create_circuit result %s %s", exception, 409)
255 1
            raise HTTPException(
256
                    409,
257
                    detail=f"Path is not valid: {exception}"
258
                ) from exception
259
260 1
        if evc.primary_path:
261 1
            try:
262 1
                evc.primary_path.is_valid(
263
                    evc.uni_a.interface.switch,
264
                    evc.uni_z.interface.switch,
265
                    bool(evc.circuit_scheduler),
266
                )
267 1
            except InvalidPath as exception:
268 1
                raise HTTPException(
269
                    400,
270
                    detail=f"primary_path is not valid: {exception}"
271
                ) from exception
272 1
        if evc.backup_path:
273 1
            try:
274 1
                evc.backup_path.is_valid(
275
                    evc.uni_a.interface.switch,
276
                    evc.uni_z.interface.switch,
277
                    bool(evc.circuit_scheduler),
278
                )
279 1
            except InvalidPath as exception:
280 1
                raise HTTPException(
281
                    400,
282
                    detail=f"backup_path is not valid: {exception}"
283
                ) from exception
284
285 1
        if not evc._tag_lists_equal():
286 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
287 1
            raise HTTPException(400, detail=detail)
288
289 1
        try:
290 1
            evc._validate_has_primary_or_dynamic()
291 1
        except ValueError as exception:
292 1
            raise HTTPException(400, detail=str(exception)) from exception
293
294 1
        try:
295 1
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
296
        except DuplicatedNoTagUNI as exception:
297
            log.debug("create_circuit result %s %s", exception, 409)
298
            raise HTTPException(409, detail=str(exception)) from exception
299
300 1
        try:
301 1
            self._use_uni_tags(evc)
302 1
        except KytosTagError as exception:
303 1
            raise HTTPException(400, detail=str(exception)) from exception
304
305
        # save circuit
306 1
        try:
307 1
            evc.sync()
308
        except ValidationError as exception:
309
            raise HTTPException(400, detail=str(exception)) from exception
310
311
        # store circuit in dictionary
312 1
        self.circuits[evc.id] = evc
313
314
        # Schedule the circuit deploy
315 1
        self.sched.add(evc)
316
317
        # Circuit has no schedule, deploy now
318 1
        deployed = False
319 1
        if not evc.circuit_scheduler:
320 1
            with evc.lock:
321 1
                deployed = evc.deploy()
322
323
        # Notify users
324 1
        result = {"circuit_id": evc.id, "deployed": deployed}
325 1
        status = 201
326 1
        log.debug("create_circuit result %s %s", result, status)
327 1
        emit_event(self.controller, name="created",
328
                   content=map_evc_event_content(evc))
329 1
        return JSONResponse(result, status_code=status)
330
331 1
    @staticmethod
332 1
    def _use_uni_tags(evc):
333 1
        uni_a = evc.uni_a
334 1
        evc._use_uni_vlan(uni_a)
335 1
        try:
336 1
            uni_z = evc.uni_z
337 1
            evc._use_uni_vlan(uni_z)
338 1
        except KytosTagError as err:
339 1
            evc.make_uni_vlan_available(uni_a)
340 1
            raise err
341
342 1
    @listen_to('kytos/flow_manager.flow.removed')
343 1
    def on_flow_delete(self, event):
344
        """Capture delete messages to keep track when flows got removed."""
345
        self.handle_flow_delete(event)
346
347 1
    def handle_flow_delete(self, event):
348
        """Keep track when the EVC got flows removed by deriving its cookie."""
349 1
        flow = event.content["flow"]
350 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
351 1
        if evc:
352 1
            log.debug("Flow removed in EVC %s", evc.id)
353 1
            evc.set_flow_removed_at()
354
355 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
356 1
    @validate_openapi(spec)
357 1
    def update(self, request: Request) -> JSONResponse:
358
        """Update a circuit based on payload.
359
360
        The EVC attributes (creation_time, active, current_path,
361
        failover_path, _id, archived) can't be updated.
362
        """
363 1
        data = get_json_or_400(request, self.controller.loop)
364 1
        circuit_id = request.path_params["circuit_id"]
365 1
        log.debug("update /v2/evc/%s", circuit_id)
366 1
        try:
367 1
            evc = self.circuits[circuit_id]
368 1
        except KeyError:
369 1
            result = f"circuit_id {circuit_id} not found"
370 1
            log.debug("update result %s %s", result, 404)
371 1
            raise HTTPException(404, detail=result) from KeyError
372
373 1
        try:
374 1
            updated_data = self._evc_dict_with_instances(data)
375 1
            self._check_no_tag_duplication(
376
                circuit_id, updated_data.get("uni_a"),
377
                updated_data.get("uni_z")
378
            )
379 1
            enable, redeploy = evc.update(**updated_data)
380 1
        except (ValueError, KytosTagError, ValidationError) as exception:
381 1
            log.debug("update result %s %s", exception, 400)
382 1
            raise HTTPException(400, detail=str(exception)) from exception
383 1
        except DuplicatedNoTagUNI as exception:
384
            log.debug("update result %s %s", exception, 409)
385
            raise HTTPException(409, detail=str(exception)) from exception
386 1
        except DisabledSwitch as exception:
387 1
            log.debug("update result %s %s", exception, 409)
388 1
            raise HTTPException(
389
                    409,
390
                    detail=f"Path is not valid: {exception}"
391
                ) from exception
392 1
        redeployed = False
393 1
        if evc.is_active():
394
            if enable is False:  # disable if active
395
                with evc.lock:
396
                    evc.remove()
397
            elif redeploy is not None:  # redeploy if active
398
                with evc.lock:
399
                    evc.remove()
400
                    redeployed = evc.deploy()
401
        else:
402 1
            if enable is True:  # enable if inactive
403 1
                with evc.lock:
404 1
                    redeployed = evc.deploy()
405 1
            elif evc.is_enabled() and redeploy:
406 1
                with evc.lock:
407 1
                    evc.remove()
408 1
                    redeployed = evc.deploy()
409 1
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
410 1
        status = 200
411
412 1
        log.debug("update result %s %s", result, status)
413 1
        emit_event(self.controller, "updated",
414
                   content=map_evc_event_content(evc, **data))
415 1
        return JSONResponse(result, status_code=status)
416
417 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
418 1
    def delete_circuit(self, request: Request) -> JSONResponse:
419
        """Remove a circuit.
420
421
        First, the flows are removed from the switches, and then the EVC is
422
        disabled.
423
        """
424 1
        circuit_id = request.path_params["circuit_id"]
425 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
426 1
        try:
427 1
            evc = self.circuits.pop(circuit_id)
428 1
        except KeyError:
429 1
            result = f"circuit_id {circuit_id} not found"
430 1
            log.debug("delete_circuit result %s %s", result, 404)
431 1
            raise HTTPException(404, detail=result) from KeyError
432 1
        log.info("Removing %s", evc)
433
434 1
        with evc.lock:
435 1
            if not evc.archived:
436 1
                evc.deactivate()
437 1
                evc.disable()
438 1
                self.sched.remove(evc)
439 1
                evc.remove_current_flows(sync=False)
440 1
                evc.remove_failover_flows(sync=False)
441 1
                evc.archive()
442 1
                evc.remove_uni_tags()
443 1
                evc.sync()
444 1
                emit_event(
445
                    self.controller, "deleted",
446
                    content=map_evc_event_content(evc)
447
                )
448
449 1
        log.info("EVC removed. %s", evc)
450 1
        result = {"response": f"Circuit {circuit_id} removed"}
451 1
        status = 200
452 1
        log.debug("delete_circuit result %s %s", result, status)
453
454 1
        return JSONResponse(result, status_code=status)
455
456 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
457 1
    def get_metadata(self, request: Request) -> JSONResponse:
458
        """Get metadata from an EVC."""
459 1
        circuit_id = request.path_params["circuit_id"]
460 1
        try:
461 1
            return (
462
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
463
            )
464
        except KeyError as error:
465
            raise HTTPException(
466
                404,
467
                detail=f"circuit_id {circuit_id} not found."
468
            ) from error
469
470 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
471 1
    @validate_openapi(spec)
472 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
473
        """Add metadata to a bulk of EVCs."""
474 1
        data = get_json_or_400(request, self.controller.loop)
475 1
        circuit_ids = data.pop("circuit_ids")
476
477 1
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
478
479 1
        fail_evcs = []
480 1
        for _id in circuit_ids:
481 1
            try:
482 1
                evc = self.circuits[_id]
483 1
                evc.extend_metadata(data)
484 1
            except KeyError:
485 1
                fail_evcs.append(_id)
486
487 1
        if fail_evcs:
488 1
            raise HTTPException(404, detail=fail_evcs)
489 1
        return JSONResponse("Operation successful", status_code=201)
490
491 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
492 1
    @validate_openapi(spec)
493 1
    def add_metadata(self, request: Request) -> JSONResponse:
494
        """Add metadata to an EVC."""
495 1
        circuit_id = request.path_params["circuit_id"]
496 1
        metadata = get_json_or_400(request, self.controller.loop)
497 1
        if not isinstance(metadata, dict):
498
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
499 1
        try:
500 1
            evc = self.circuits[circuit_id]
501 1
        except KeyError as error:
502 1
            raise HTTPException(
503
                404,
504
                detail=f"circuit_id {circuit_id} not found."
505
            ) from error
506
507 1
        evc.extend_metadata(metadata)
508 1
        evc.sync()
509 1
        return JSONResponse("Operation successful", status_code=201)
510
511 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
512 1
    @validate_openapi(spec)
513 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
514
        """Delete metada from a bulk of EVCs"""
515 1
        data = get_json_or_400(request, self.controller.loop)
516 1
        key = request.path_params["key"]
517 1
        circuit_ids = data.pop("circuit_ids")
518 1
        self.mongo_controller.update_evcs_metadata(
519
            circuit_ids, {key: ""}, "del"
520
        )
521
522 1
        fail_evcs = []
523 1
        for _id in circuit_ids:
524 1
            try:
525 1
                evc = self.circuits[_id]
526 1
                evc.remove_metadata(key)
527 1
            except KeyError:
528 1
                fail_evcs.append(_id)
529
530 1
        if fail_evcs:
531 1
            raise HTTPException(404, detail=fail_evcs)
532 1
        return JSONResponse("Operation successful")
533
534 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
535 1
    def delete_metadata(self, request: Request) -> JSONResponse:
536
        """Delete metadata from an EVC."""
537 1
        circuit_id = request.path_params["circuit_id"]
538 1
        key = request.path_params["key"]
539 1
        try:
540 1
            evc = self.circuits[circuit_id]
541 1
        except KeyError as error:
542 1
            raise HTTPException(
543
                404,
544
                detail=f"circuit_id {circuit_id} not found."
545
            ) from error
546
547 1
        evc.remove_metadata(key)
548 1
        evc.sync()
549 1
        return JSONResponse("Operation successful")
550
551 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
552 1
    def redeploy(self, request: Request) -> JSONResponse:
553
        """Endpoint to force the redeployment of an EVC."""
554 1
        circuit_id = request.path_params["circuit_id"]
555 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
556 1
        try:
557 1
            evc = self.circuits[circuit_id]
558 1
        except KeyError:
559 1
            raise HTTPException(
560
                404,
561
                detail=f"circuit_id {circuit_id} not found"
562
            ) from KeyError
563 1
        deployed = False
564 1
        if evc.is_enabled():
565 1
            with evc.lock:
566 1
                evc.remove_current_flows(sync=False)
567 1
                evc.remove_failover_flows(sync=True)
568 1
                deployed = evc.deploy()
569 1
        if deployed:
570 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
571 1
            status = 202
572
        else:
573 1
            result = {
574
                "response": f"Circuit {circuit_id} is disabled."
575
            }
576 1
            status = 409
577
578 1
        return JSONResponse(result, status_code=status)
579
580 1
    @rest("/v2/evc/schedule/", methods=["POST"])
581 1
    @validate_openapi(spec)
582 1
    def create_schedule(self, request: Request) -> JSONResponse:
583
        """
584
        Create a new schedule for a given circuit.
585
586
        This service do no check if there are conflicts with another schedule.
587
        Payload example:
588
            {
589
              "circuit_id":"aa:bb:cc",
590
              "schedule": {
591
                "date": "2019-08-07T14:52:10.967Z",
592
                "interval": "string",
593
                "frequency": "1 * * * *",
594
                "action": "create"
595
              }
596
            }
597
        """
598 1
        log.debug("create_schedule /v2/evc/schedule/")
599 1
        data = get_json_or_400(request, self.controller.loop)
600 1
        circuit_id = data["circuit_id"]
601 1
        schedule_data = data["schedule"]
602
603
        # Get EVC from circuits buffer
604 1
        circuits = self._get_circuits_buffer()
605
606
        # get the circuit
607 1
        evc = circuits.get(circuit_id)
608
609
        # get the circuit
610 1
        if not evc:
611 1
            result = f"circuit_id {circuit_id} not found"
612 1
            log.debug("create_schedule result %s %s", result, 404)
613 1
            raise HTTPException(404, detail=result)
614
615
        # new schedule from dict
616 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
617
618
        # If there is no schedule, create the list
619 1
        if not evc.circuit_scheduler:
620 1
            evc.circuit_scheduler = []
621
622
        # Add the new schedule
623 1
        evc.circuit_scheduler.append(new_schedule)
624
625
        # Add schedule job
626 1
        self.sched.add_circuit_job(evc, new_schedule)
627
628
        # save circuit to mongodb
629 1
        evc.sync()
630
631 1
        result = new_schedule.as_dict()
632 1
        status = 201
633
634 1
        log.debug("create_schedule result %s %s", result, status)
635 1
        return JSONResponse(result, status_code=status)
636
637 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
638 1
    @validate_openapi(spec)
639 1
    def update_schedule(self, request: Request) -> JSONResponse:
640
        """Update a schedule.
641
642
        Change all attributes from the given schedule from a EVC circuit.
643
        The schedule ID is preserved as default.
644
        Payload example:
645
            {
646
              "date": "2019-08-07T14:52:10.967Z",
647
              "interval": "string",
648
              "frequency": "1 * * *",
649
              "action": "create"
650
            }
651
        """
652 1
        data = get_json_or_400(request, self.controller.loop)
653 1
        schedule_id = request.path_params["schedule_id"]
654 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
655
656
        # Try to find a circuit schedule
657 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
658
659
        # Can not modify circuits deleted and archived
660 1
        if not found_schedule:
661 1
            result = f"schedule_id {schedule_id} not found"
662 1
            log.debug("update_schedule result %s %s", result, 404)
663 1
            raise HTTPException(404, detail=result)
664
665 1
        new_schedule = CircuitSchedule.from_dict(data)
666 1
        new_schedule.id = found_schedule.id
667
        # Remove the old schedule
668 1
        evc.circuit_scheduler.remove(found_schedule)
669
        # Append the modified schedule
670 1
        evc.circuit_scheduler.append(new_schedule)
671
672
        # Cancel all schedule jobs
673 1
        self.sched.cancel_job(found_schedule.id)
674
        # Add the new circuit schedule
675 1
        self.sched.add_circuit_job(evc, new_schedule)
676
        # Save EVC to mongodb
677 1
        evc.sync()
678
679 1
        result = new_schedule.as_dict()
680 1
        status = 200
681
682 1
        log.debug("update_schedule result %s %s", result, status)
683 1
        return JSONResponse(result, status_code=status)
684
685 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
686 1
    def delete_schedule(self, request: Request) -> JSONResponse:
687
        """Remove a circuit schedule.
688
689
        Remove the Schedule from EVC.
690
        Remove the Schedule from cron job.
691
        Save the EVC to the Storehouse.
692
        """
693 1
        schedule_id = request.path_params["schedule_id"]
694 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
695 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
696
697
        # Can not modify circuits deleted and archived
698 1
        if not found_schedule:
699 1
            result = f"schedule_id {schedule_id} not found"
700 1
            log.debug("delete_schedule result %s %s", result, 404)
701 1
            raise HTTPException(404, detail=result)
702
703
        # Remove the old schedule
704 1
        evc.circuit_scheduler.remove(found_schedule)
705
706
        # Cancel all schedule jobs
707 1
        self.sched.cancel_job(found_schedule.id)
708
        # Save EVC to mongodb
709 1
        evc.sync()
710
711 1
        result = "Schedule removed"
712 1
        status = 200
713
714 1
        log.debug("delete_schedule result %s %s", result, status)
715 1
        return JSONResponse(result, status_code=status)
716
717 1
    def _check_no_tag_duplication(
718
        self,
719
        evc_id: str,
720
        uni_a: Optional[UNI] = None,
721
        uni_z: Optional[UNI] = None
722
    ):
723
        """Check if the given EVC has UNIs with no tag and if these are
724
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
725
        Args:
726
            evc (dict): EVC to be analyzed.
727
        """
728
729
        # No UNIs
730 1
        if not (uni_a or uni_z):
731 1
            return
732
733 1
        if (not (uni_a and not uni_a.user_tag) and
734
                not (uni_z and not uni_z.user_tag)):
735 1
            return
736 1
        for circuit in self.circuits.copy().values():
737 1
            if (not circuit.archived and circuit._id != evc_id):
738 1
                if uni_a and uni_a.user_tag is None:
739 1
                    circuit.check_no_tag_duplicate(uni_a)
740 1
                if uni_z and uni_z.user_tag is None:
741 1
                    circuit.check_no_tag_duplicate(uni_z)
742
743 1
    @listen_to("kytos/topology.link_up")
744 1
    def on_link_up(self, event):
745
        """Change circuit when link is up or end_maintenance."""
746
        self.handle_link_up(event)
747
748 1
    def handle_link_up(self, event):
749
        """Change circuit when link is up or end_maintenance."""
750 1
        log.info("Event handle_link_up %s", event.content["link"])
751 1
        for evc in self.get_evcs_by_svc_level():
752 1
            if evc.is_enabled() and not evc.archived:
753 1
                with evc.lock:
754 1
                    evc.handle_link_up(event.content["link"])
755
756
    # Possibly replace this with interruptions?
757 1
    @listen_to(
758
        '.*.switch.interface.(link_up|link_down|created|deleted)'
759
    )
760 1
    def on_interface_link_change(self, event: KytosEvent):
761
        """
762
        Handler for interface link_up and link_down events.
763
        """
764
        self.handle_on_interface_link_change(event)
765
766 1
    def handle_on_interface_link_change(self, event: KytosEvent):
767
        """
768
        Handler to sort interface events {link_(up, down), create, deleted}
769
770
        To avoid multiple database updated (link flap):
771
        Every interface is identfied and processed in parallel.
772
        Once an interface event is received a time is started.
773
        While time is running self._intf_events will be updated.
774
        After time has passed last received event will be processed.
775
        """
776 1
        iface = event.content.get("interface")
777 1
        with self._lock_interfaces[iface.id]:
778 1
            _now = event.timestamp
779
            # Return out of order events
780 1
            if (
781
                iface.id in self._intf_events
782
                and self._intf_events[iface.id]["event"].timestamp > _now
783
            ):
784 1
                return
785 1
            self._intf_events[iface.id].update({"event": event})
786 1
            if "last_acquired" in self._intf_events[iface.id]:
787 1
                return
788 1
            self._intf_events[iface.id].update({"last_acquired": now()})
789 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
790 1
        with self._lock_interfaces[iface.id]:
791 1
            event = self._intf_events[iface.id]["event"]
792 1
            self._intf_events[iface.id].pop('last_acquired', None)
793 1
            _, _, event_type = event.name.rpartition('.')
794 1
            if event_type in ('link_up', 'created'):
795 1
                self.handle_interface_link_up(iface)
796 1
            elif event_type in ('link_down', 'deleted'):
797 1
                self.handle_interface_link_down(iface)
798
799 1
    def handle_interface_link_up(self, interface):
800
        """
801
        Handler for interface link_up events
802
        """
803
        log.info("Event handle_interface_link_up %s", interface)
804
        for evc in self.get_evcs_by_svc_level():
805
            with evc.lock:
806
                evc.handle_interface_link_up(
807
                    interface
808
                )
809
810 1
    def handle_interface_link_down(self, interface):
811
        """
812
        Handler for interface link_down events
813
        """
814
        log.info("Event handle_interface_link_down %s", interface)
815
        for evc in self.get_evcs_by_svc_level():
816
            with evc.lock:
817
                evc.handle_interface_link_down(
818
                    interface
819
                )
820
821 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
822 1
    def on_link_down(self, event):
823
        """Change circuit when link is down or under_mantenance."""
824
        self.handle_link_down(event)
825
826
    # pylint: disable=too-many-branches
827
    # pylint: disable=too-many-locals
828 1
    def handle_link_down(self, event):
829
        """Change circuit when link is down or under_mantenance."""
830 1
        link = event.content["link"]
831 1
        log.info("Event handle_link_down %s", link)
832 1
        switch_flows = {}
833 1
        evcs_with_failover = []
834 1
        evcs_normal = []
835 1
        check_failover = []
836 1
        failover_event_contents = {}
837
838 1
        for evc in self.get_evcs_by_svc_level():
839 1
            with evc.lock:
840 1
                if evc.is_affected_by_link(link):
841 1
                    evc.affected_by_link_at = event.timestamp
842
                    # if there is no failover path, handles link down the
843
                    # tradditional way
844 1
                    if (
845
                        not evc.failover_path or
846
                        evc.is_failover_path_affected_by_link(link)
847
                    ):
848 1
                        evcs_normal.append(evc)
849 1
                        continue
850 1
                    try:
851 1
                        dpid_flows = evc.get_failover_flows()
852 1
                        evc.old_path = evc.current_path
853 1
                        evc.current_path = evc.failover_path
854 1
                        evc.failover_path = Path([])
855
                    # pylint: disable=broad-except
856 1
                    except Exception:
857 1
                        err = traceback.format_exc().replace("\n", ", ")
858 1
                        log.error(
859
                            "Ignore Failover path for "
860
                            f"{evc} due to error: {err}"
861
                        )
862 1
                        evcs_normal.append(evc)
863 1
                        continue
864 1
                    for dpid, flows in dpid_flows.items():
865 1
                        switch_flows.setdefault(dpid, [])
866 1
                        switch_flows[dpid].extend(flows)
867 1
                    evcs_with_failover.append(evc)
868 1
                    failover_event_contents[evc.id] = map_evc_event_content(
869
                        evc,
870
                        flows={k: v.copy() for k, v in switch_flows.items()}
871
                    )
872 1
                elif evc.is_failover_path_affected_by_link(link):
873 1
                    evc.old_path = evc.failover_path
874 1
                    evc.failover_path = Path([])
875 1
                    check_failover.append(evc)
876
877 1
        if failover_event_contents:
878 1
            emit_event(self.controller, "failover_link_down",
879
                       content=deepcopy(failover_event_contents))
880 1
        send_flow_mods_event(self.controller, switch_flows, 'install')
881
882 1
        for evc in evcs_normal:
883 1
            emit_event(
884
                self.controller,
885
                "evc_affected_by_link_down",
886
                content={"link": link} | map_evc_event_content(evc)
887
            )
888
889 1
        evcs_to_update = []
890 1
        for evc in evcs_with_failover:
891 1
            evcs_to_update.append(evc.as_dict())
892 1
            log.info(
893
                f"{evc} redeployed with failover due to link down {link.id}"
894
            )
895 1
        for evc in check_failover:
896 1
            evcs_to_update.append(evc.as_dict())
897
898 1
        self.mongo_controller.update_evcs(evcs_to_update)
899
900 1
        emit_event(
901
            self.controller,
902
            "cleanup_evcs_old_path",
903
            content={"evcs": evcs_with_failover + check_failover}
904
        )
905
906 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
907 1
    def on_evc_affected_by_link_down(self, event):
908
        """Change circuit when link is down or under_mantenance."""
909
        self.handle_evc_affected_by_link_down(event)
910
911 1
    def handle_evc_affected_by_link_down(self, event):
912
        """Change circuit when link is down or under_mantenance."""
913 1
        evc = self.circuits.get(event.content["evc_id"])
914 1
        link = event.content['link']
915 1
        if not evc:
916 1
            return
917 1
        with evc.lock:
918 1
            if not evc.is_affected_by_link(link):
919
                return
920 1
            result = evc.handle_link_down()
921 1
        event_name = "error_redeploy_link_down"
922 1
        if result:
923 1
            log.info(f"{evc} redeployed due to link down {link.id}")
924 1
            event_name = "redeployed_link_down"
925 1
        emit_event(self.controller, event_name,
926
                   content=map_evc_event_content(evc))
927
928 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
929 1
    def on_evc_deployed(self, event):
930
        """Handle EVC deployed|redeployed_link_down."""
931
        self.handle_evc_deployed(event)
932
933 1
    def handle_evc_deployed(self, event):
934
        """Setup failover path on evc deployed."""
935
        evc = self.circuits.get(event.content["evc_id"])
936
        if not evc:
937
            return
938
        evc.try_setup_failover_path()
939
940 1
    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
941 1
    def on_cleanup_evcs_old_path(self, event):
942
        """Handle cleanup evcs old path."""
943
        self.handle_cleanup_evcs_old_path(event)
944
945 1
    def handle_cleanup_evcs_old_path(self, event):
946
        """Handle cleanup evcs old path."""
947 1
        evcs = event.content.get("evcs", [])
948 1
        event_contents: dict[str, dict] = defaultdict(list)
949 1
        total_flows = {}
950 1
        for evc in evcs:
951 1
            if not evc.old_path:
952 1
                continue
953 1
            with evc.lock:
954 1
                removed_flows = {}
955 1
                try:
956 1
                    nni_flows = prepare_delete_flow(
957
                        evc._prepare_nni_flows(evc.old_path)
958
                    )
959 1
                    uni_flows = prepare_delete_flow(
960
                        evc._prepare_uni_flows(evc.old_path, skip_in=True)
961
                    )
962 1
                    removed_flows = merge_flow_dicts(
963
                        nni_flows, uni_flows
964
                    )
965
                # pylint: disable=broad-except
966
                except Exception:
967
                    err = traceback.format_exc().replace("\n", ", ")
968
                    log.error(f"Fail to remove {evc} old_path: {err}")
969
                    continue
970 1
            if removed_flows:
971 1
                total_flows = merge_flow_dicts(removed_flows, total_flows)
972 1
                content = map_evc_event_content(
973
                    evc,
974
                    removed_flows=deepcopy(removed_flows),
975
                    current_path=evc.current_path.as_dict(),
976
                )
977 1
                event_contents[evc.id] = content
978 1
        if event_contents:
979 1
            send_flow_mods_event(self.controller, total_flows, 'delete')
980 1
            emit_event(self.controller, "failover_old_path",
981
                       content=event_contents)
982
983 1
    @listen_to("kytos/topology.topology_loaded")
984 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
985
        """Load EVCs once the topology is available."""
986
        self.load_all_evcs()
987
988 1
    def load_all_evcs(self):
989
        """Try to load all EVCs on startup."""
990 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
991 1
        for circuit_id, circuit in circuits:
992 1
            if circuit_id not in self.circuits:
993 1
                self._load_evc(circuit)
994 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
995
                   timeout=1)
996
997 1
    def _load_evc(self, circuit_dict):
998
        """Load one EVC from mongodb to memory."""
999 1
        try:
1000 1
            evc = self._evc_from_dict(circuit_dict)
1001 1
        except (ValueError, KytosTagError) as exception:
1002 1
            log.error(
1003
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1004
            )
1005 1
            return None
1006 1
        if evc.archived:
1007 1
            return None
1008
1009 1
        self.circuits.setdefault(evc.id, evc)
1010 1
        self.sched.add(evc)
1011 1
        return evc
1012
1013 1
    @listen_to("kytos/flow_manager.flow.error")
1014 1
    def on_flow_mod_error(self, event):
1015
        """Handle flow mod errors related to an EVC."""
1016
        self.handle_flow_mod_error(event)
1017
1018 1
    def handle_flow_mod_error(self, event):
1019
        """Handle flow mod errors related to an EVC."""
1020 1
        flow = event.content["flow"]
1021 1
        command = event.content.get("error_command")
1022 1
        if command != "add":
1023
            return
1024 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1025 1
        if evc:
1026 1
            with evc.lock:
1027 1
                evc.remove_current_flows(sync=False)
1028 1
                evc.remove_failover_flows(sync=True)
1029
1030 1
    def _evc_dict_with_instances(self, evc_dict):
1031
        """Convert some dict values to instance of EVC classes.
1032
1033
        This method will convert: [UNI, Link]
1034
        """
1035 1
        data = evc_dict.copy()  # Do not modify the original dict
1036 1
        for attribute, value in data.items():
1037
            # Get multiple attributes.
1038
            # Ex: uni_a, uni_z
1039 1
            if "uni" in attribute:
1040 1
                try:
1041 1
                    data[attribute] = self._uni_from_dict(value)
1042 1
                except ValueError as exception:
1043 1
                    result = "Error creating UNI: Invalid value"
1044 1
                    raise ValueError(result) from exception
1045
1046 1
            if attribute == "circuit_scheduler":
1047 1
                data[attribute] = []
1048 1
                for schedule in value:
1049 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1050
1051
            # Get multiple attributes.
1052
            # Ex: primary_links,
1053
            #     backup_links,
1054
            #     current_links_cache,
1055
            #     primary_links_cache,
1056
            #     backup_links_cache
1057 1
            if "links" in attribute:
1058 1
                data[attribute] = [
1059
                    self._link_from_dict(link) for link in value
1060
                ]
1061
1062
            # Ex: current_path,
1063
            #     primary_path,
1064
            #     backup_path
1065 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1066 1
                data[attribute] = Path(
1067
                    [self._link_from_dict(link) for link in value]
1068
                )
1069
1070 1
        return data
1071
1072 1
    def _evc_from_dict(self, evc_dict):
1073 1
        data = self._evc_dict_with_instances(evc_dict)
1074 1
        data["table_group"] = self.table_group
1075 1
        return EVC(self.controller, **data)
1076
1077 1
    def _uni_from_dict(self, uni_dict):
1078
        """Return a UNI object from python dict."""
1079 1
        if uni_dict is None:
1080 1
            return False
1081
1082 1
        interface_id = uni_dict.get("interface_id")
1083 1
        interface = self.controller.get_interface_by_id(interface_id)
1084 1
        if interface is None:
1085 1
            result = (
1086
                "Error creating UNI:"
1087
                + f"Could not instantiate interface {interface_id}"
1088
            )
1089 1
            raise ValueError(result) from ValueError
1090 1
        tag_convert = {1: "vlan"}
1091 1
        tag_dict = uni_dict.get("tag", None)
1092 1
        if tag_dict:
1093 1
            tag_type = tag_dict.get("tag_type")
1094 1
            tag_type = tag_convert.get(tag_type, tag_type)
1095 1
            tag_value = tag_dict.get("value")
1096 1
            if isinstance(tag_value, list):
1097 1
                tag_value = get_tag_ranges(tag_value)
1098 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1099 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1100
            else:
1101 1
                tag = TAG(tag_type, tag_value)
1102
        else:
1103 1
            tag = None
1104 1
        uni = UNI(interface, tag)
1105 1
        return uni
1106
1107 1
    def _link_from_dict(self, link_dict):
1108
        """Return a Link object from python dict."""
1109 1
        id_a = link_dict.get("endpoint_a").get("id")
1110 1
        id_b = link_dict.get("endpoint_b").get("id")
1111
1112 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1113 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1114 1
        if not endpoint_a:
1115 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1116 1
            raise ValueError(error_msg)
1117 1
        if not endpoint_b:
1118
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1119
            raise ValueError(error_msg)
1120
1121 1
        link = Link(endpoint_a, endpoint_b)
1122 1
        if "metadata" in link_dict:
1123 1
            link.extend_metadata(link_dict.get("metadata"))
1124
1125 1
        s_vlan = link.get_metadata("s_vlan")
1126 1
        if s_vlan:
1127 1
            tag = TAG.from_dict(s_vlan)
1128 1
            if tag is False:
1129
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1130
                raise ValueError(error_msg)
1131 1
            link.update_metadata("s_vlan", tag)
1132 1
        return link
1133
1134 1
    def _find_evc_by_schedule_id(self, schedule_id):
1135
        """
1136
        Find an EVC and CircuitSchedule based on schedule_id.
1137
1138
        :param schedule_id: Schedule ID
1139
        :return: EVC and Schedule
1140
        """
1141 1
        circuits = self._get_circuits_buffer()
1142 1
        found_schedule = None
1143 1
        evc = None
1144
1145
        # pylint: disable=unused-variable
1146 1
        for c_id, circuit in circuits.items():
1147 1
            for schedule in circuit.circuit_scheduler:
1148 1
                if schedule.id == schedule_id:
1149 1
                    found_schedule = schedule
1150 1
                    evc = circuit
1151 1
                    break
1152 1
            if found_schedule:
1153 1
                break
1154 1
        return evc, found_schedule
1155
1156 1
    def _get_circuits_buffer(self):
1157
        """
1158
        Return the circuit buffer.
1159
1160
        If the buffer is empty, try to load data from mongodb.
1161
        """
1162 1
        if not self.circuits:
1163
            # Load circuits from mongodb to buffer
1164 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1165 1
            for c_id, circuit in circuits.items():
1166 1
                evc = self._evc_from_dict(circuit)
1167 1
                self.circuits[c_id] = evc
1168 1
        return self.circuits
1169
1170
    # pylint: disable=attribute-defined-outside-init
1171 1
    @alisten_to("kytos/of_multi_table.enable_table")
1172 1
    async def on_table_enabled(self, event):
1173
        """Handle a recently table enabled."""
1174 1
        table_group = event.content.get("mef_eline", None)
1175 1
        if not table_group:
1176 1
            return
1177 1
        for group in table_group:
1178 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1179 1
                log.error(f'The table group "{group}" is not allowed for '
1180
                          f'mef_eline. Allowed table groups are '
1181
                          f'{settings.TABLE_GROUP_ALLOWED}')
1182 1
                return
1183 1
        self.table_group.update(table_group)
1184 1
        content = {"group_table": self.table_group}
1185 1
        name = "kytos/mef_eline.enable_table"
1186
        await aemit_event(self.controller, name, content)
1187