Passed
Pull Request — master (#491)
by Aldo
04:10
created

build.main.Main._link_from_dict()   B

Complexity

Conditions 6

Size

Total Lines 26
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 6.2163

Importance

Changes 0
Metric Value
eloc 22
dl 0
loc 26
rs 8.4186
c 0
b 0
f 0
ccs 18
cts 22
cp 0.8182
cc 6
nop 2
crap 6.2163
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from threading import Lock
11 1
from typing import Optional
12
13 1
from pydantic import ValidationError
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.events import KytosEvent
17 1
from kytos.core.exceptions import KytosTagError
18 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
19
                                validate_openapi)
20 1
from kytos.core.interface import TAG, UNI, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
23
                                 get_json_or_400)
24 1
from kytos.core.tag_ranges import get_tag_ranges
25 1
from napps.kytos.mef_eline import controllers, settings
26 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
27
                                              DuplicatedNoTagUNI,
28
                                              EVCPathNotDeleted, InvalidPath)
29 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
30
                                          Path)
31 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
32 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
33
                                         emit_event, get_vlan_tags_and_masks,
34
                                         map_evc_event_content,
35
                                         merge_flow_dicts,
36
                                         send_flow_mods_event)
37
38
39
# pylint: disable=too-many-public-methods
40 1
class Main(KytosNApp):
41
    """Main class of amlight/mef_eline NApp.
42
43
    This class is the entry point for this napp.
44
    """
45
46 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
47
48 1
    def setup(self):
49
        """Replace the '__init__' method for the KytosNApp subclass.
50
51
        The setup method is automatically called by the controller when your
52
        application is loaded.
53
54
        So, if you have any setup routine, insert it here.
55
        """
56
        # object used to scheduler circuit events
57 1
        self.sched = Scheduler()
58
59
        # object to save and load circuits
60 1
        self.mongo_controller = self.get_eline_controller()
61 1
        self.mongo_controller.bootstrap_indexes()
62
63
        # set the controller that will manager the dynamic paths
64 1
        DynamicPathManager.set_controller(self.controller)
65
66
        # dictionary of EVCs created. It acts as a circuit buffer.
67
        # Every create/update/delete must be synced to mongodb.
68 1
        self.circuits = {}
69
70 1
        self._intf_events = defaultdict(dict)
71 1
        self._lock_interfaces = defaultdict(Lock)
72 1
        self.table_group = {"epl": 0, "evpl": 0}
73 1
        self._lock = Lock()
74 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
75
76 1
        self.load_all_evcs()
77 1
        self._topology_updated_at = None
78
79 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
80
        """Get circuits sorted by desc service level and asc creation_time.
81
82
        In the future, as more ops are offloaded it should be get from the DB.
83
        """
84 1
        if enable_filter:
85 1
            return sorted(
86
                          [circuit for circuit in self.circuits.values()
87
                           if circuit.is_enabled()],
88
                          key=lambda x: (-x.service_level, x.creation_time),
89
            )
90 1
        return sorted(self.circuits.values(),
91
                      key=lambda x: (-x.service_level, x.creation_time))
92
93 1
    @staticmethod
94 1
    def get_eline_controller():
95
        """Return the ELineController instance."""
96
        return controllers.ELineController()
97
98 1
    def execute(self):
99
        """Execute once when the napp is running."""
100 1
        if self._lock.locked():
101 1
            return
102 1
        log.debug("Starting consistency routine")
103 1
        with self._lock:
104 1
            self.execute_consistency()
105 1
        log.debug("Finished consistency routine")
106
107 1
    def should_be_checked(self, circuit):
108
        "Verify if the circuit meets the necessary conditions to be checked"
109
        # pylint: disable=too-many-boolean-expressions
110 1
        if (
111
                circuit.is_enabled()
112
                and not circuit.is_active()
113
                and not circuit.lock.locked()
114
                and not circuit.has_recent_removed_flow()
115
                and not circuit.is_recent_updated()
116
                and circuit.are_unis_active(self.controller.switches)
117
                # if a inter-switch EVC does not have current_path, it does not
118
                # make sense to run sdntrace on it
119
                and (circuit.is_intra_switch() or circuit.current_path)
120
                ):
121 1
            return True
122
        return False
123
124 1
    def execute_consistency(self):
125
        """Execute consistency routine."""
126 1
        circuits_to_check = []
127 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
128 1
            if self.should_be_checked(circuit):
129 1
                circuits_to_check.append(circuit)
130 1
            circuit.try_setup_failover_path()
131 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
132 1
        for circuit in circuits_to_check:
133 1
            is_checked = circuits_checked.get(circuit.id)
134 1
            if is_checked:
135 1
                circuit.execution_rounds = 0
136 1
                log.info(f"{circuit} enabled but inactive - activating")
137 1
                with circuit.lock:
138 1
                    circuit.activate()
139 1
                    circuit.sync()
140
            else:
141 1
                circuit.execution_rounds += 1
142 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
143 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
144 1
                    with circuit.lock:
145 1
                        circuit.deploy()
146
147 1
    def shutdown(self):
148
        """Execute when your napp is unloaded.
149
150
        If you have some cleanup procedure, insert it here.
151
        """
152
153 1
    @rest("/v2/evc/", methods=["GET"])
154 1
    def list_circuits(self, request: Request) -> JSONResponse:
155
        """Endpoint to return circuits stored.
156
157
        archive query arg if defined (not null) will be filtered
158
        accordingly, by default only non archived evcs will be listed
159
        """
160 1
        log.debug("list_circuits /v2/evc")
161 1
        args = request.query_params
162 1
        archived = args.get("archived", "false").lower()
163 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
164 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
165
                                                      metadata=args)
166 1
        circuits = circuits['circuits']
167 1
        return JSONResponse(circuits)
168
169 1
    @rest("/v2/evc/schedule", methods=["GET"])
170 1
    def list_schedules(self, _request: Request) -> JSONResponse:
171
        """Endpoint to return all schedules stored for all circuits.
172
173
        Return a JSON with the following template:
174
        [{"schedule_id": <schedule_id>,
175
         "circuit_id": <circuit_id>,
176
         "schedule": <schedule object>}]
177
        """
178 1
        log.debug("list_schedules /v2/evc/schedule")
179 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
180 1
        if not circuits:
181 1
            result = {}
182 1
            status = 200
183 1
            return JSONResponse(result, status_code=status)
184
185 1
        result = []
186 1
        status = 200
187 1
        for circuit in circuits:
188 1
            circuit_scheduler = circuit.get("circuit_scheduler")
189 1
            if circuit_scheduler:
190 1
                for scheduler in circuit_scheduler:
191 1
                    value = {
192
                        "schedule_id": scheduler.get("id"),
193
                        "circuit_id": circuit.get("id"),
194
                        "schedule": scheduler,
195
                    }
196 1
                    result.append(value)
197
198 1
        log.debug("list_schedules result %s %s", result, status)
199 1
        return JSONResponse(result, status_code=status)
200
201 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
202 1
    def get_circuit(self, request: Request) -> JSONResponse:
203
        """Endpoint to return a circuit based on id."""
204 1
        circuit_id = request.path_params["circuit_id"]
205 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
206 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
207 1
        if not circuit:
208 1
            result = f"circuit_id {circuit_id} not found"
209 1
            log.debug("get_circuit result %s %s", result, 404)
210 1
            raise HTTPException(404, detail=result)
211 1
        status = 200
212 1
        log.debug("get_circuit result %s %s", circuit, status)
213 1
        return JSONResponse(circuit, status_code=status)
214
215
    # pylint: disable=too-many-branches, too-many-statements
216 1
    @rest("/v2/evc/", methods=["POST"])
217 1
    @validate_openapi(spec)
218 1
    def create_circuit(self, request: Request) -> JSONResponse:
219
        """Try to create a new circuit.
220
221
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
222
        UNI_Z's requested C-VID are available from the interfaces' pools. This
223
        is checked when creating the UNI object.
224
225
        Then, E-Line NApp requests a primary and a backup path to the
226
        Pathfinder NApp using the attributes primary_links and backup_links
227
        submitted via REST
228
229
        # For each link composing paths in #3:
230
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
231
        #  - Using the S-VID obtained, generate abstract flow entries to be
232
        #    sent to FlowManager
233
234
        Push abstract flow entries to FlowManager and FlowManager pushes
235
        OpenFlow entries to datapaths
236
237
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
238
        creation
239
240
        Finnaly, notify user of the status of its request.
241
        """
242
        # Try to create the circuit object
243 1
        log.debug("create_circuit /v2/evc/")
244 1
        data = get_json_or_400(request, self.controller.loop)
245
246 1
        try:
247 1
            evc = self._evc_from_dict(data)
248 1
        except (ValueError, KytosTagError) as exception:
249 1
            log.debug("create_circuit result %s %s", exception, 400)
250 1
            raise HTTPException(400, detail=str(exception)) from exception
251 1
        try:
252 1
            check_disabled_component(evc.uni_a, evc.uni_z)
253 1
        except DisabledSwitch as exception:
254 1
            log.debug("create_circuit result %s %s", exception, 409)
255 1
            raise HTTPException(
256
                    409,
257
                    detail=f"Path is not valid: {exception}"
258
                ) from exception
259
260 1
        if evc.primary_path:
261 1
            try:
262 1
                evc.primary_path.is_valid(
263
                    evc.uni_a.interface.switch,
264
                    evc.uni_z.interface.switch,
265
                    bool(evc.circuit_scheduler),
266
                )
267 1
            except InvalidPath as exception:
268 1
                raise HTTPException(
269
                    400,
270
                    detail=f"primary_path is not valid: {exception}"
271
                ) from exception
272 1
        if evc.backup_path:
273 1
            try:
274 1
                evc.backup_path.is_valid(
275
                    evc.uni_a.interface.switch,
276
                    evc.uni_z.interface.switch,
277
                    bool(evc.circuit_scheduler),
278
                )
279 1
            except InvalidPath as exception:
280 1
                raise HTTPException(
281
                    400,
282
                    detail=f"backup_path is not valid: {exception}"
283
                ) from exception
284
285 1
        if not evc._tag_lists_equal():
286 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
287 1
            raise HTTPException(400, detail=detail)
288
289 1
        try:
290 1
            evc._validate_has_primary_or_dynamic()
291 1
        except ValueError as exception:
292 1
            raise HTTPException(400, detail=str(exception)) from exception
293
294 1
        try:
295 1
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
296
        except DuplicatedNoTagUNI as exception:
297
            log.debug("create_circuit result %s %s", exception, 409)
298
            raise HTTPException(409, detail=str(exception)) from exception
299
300 1
        try:
301 1
            self._use_uni_tags(evc)
302 1
        except KytosTagError as exception:
303 1
            raise HTTPException(400, detail=str(exception)) from exception
304
305
        # save circuit
306 1
        try:
307 1
            evc.sync()
308
        except ValidationError as exception:
309
            raise HTTPException(400, detail=str(exception)) from exception
310
311
        # store circuit in dictionary
312 1
        self.circuits[evc.id] = evc
313
314
        # Schedule the circuit deploy
315 1
        self.sched.add(evc)
316
317
        # Circuit has no schedule, deploy now
318 1
        deployed = False
319 1
        if not evc.circuit_scheduler:
320 1
            with evc.lock:
321 1
                deployed = evc.deploy()
322
323
        # Notify users
324 1
        result = {"circuit_id": evc.id, "deployed": deployed}
325 1
        status = 201
326 1
        log.debug("create_circuit result %s %s", result, status)
327 1
        emit_event(self.controller, name="created",
328
                   content=map_evc_event_content(evc))
329 1
        return JSONResponse(result, status_code=status)
330
331 1
    @staticmethod
332 1
    def _use_uni_tags(evc):
333 1
        uni_a = evc.uni_a
334 1
        evc._use_uni_vlan(uni_a)
335 1
        try:
336 1
            uni_z = evc.uni_z
337 1
            evc._use_uni_vlan(uni_z)
338 1
        except KytosTagError as err:
339 1
            evc.make_uni_vlan_available(uni_a)
340 1
            raise err
341
342 1
    @listen_to('kytos/flow_manager.flow.removed')
343 1
    def on_flow_delete(self, event):
344
        """Capture delete messages to keep track when flows got removed."""
345
        self.handle_flow_delete(event)
346
347 1
    def handle_flow_delete(self, event):
348
        """Keep track when the EVC got flows removed by deriving its cookie."""
349 1
        flow = event.content["flow"]
350 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
351 1
        if evc:
352 1
            log.debug("Flow removed in EVC %s", evc.id)
353 1
            evc.set_flow_removed_at()
354
355 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
356 1
    @validate_openapi(spec)
357 1
    def update(self, request: Request) -> JSONResponse:
358
        """Update a circuit based on payload.
359
360
        The EVC attributes (creation_time, active, current_path,
361
        failover_path, _id, archived) can't be updated.
362
        """
363 1
        data = get_json_or_400(request, self.controller.loop)
364 1
        circuit_id = request.path_params["circuit_id"]
365 1
        log.debug("update /v2/evc/%s", circuit_id)
366 1
        try:
367 1
            evc = self.circuits[circuit_id]
368 1
        except KeyError:
369 1
            result = f"circuit_id {circuit_id} not found"
370 1
            log.debug("update result %s %s", result, 404)
371 1
            raise HTTPException(404, detail=result) from KeyError
372
373 1
        try:
374 1
            updated_data = self._evc_dict_with_instances(data)
375 1
            self._check_no_tag_duplication(
376
                circuit_id, updated_data.get("uni_a"),
377
                updated_data.get("uni_z")
378
            )
379 1
            enable, redeploy = evc.update(**updated_data)
380 1
        except (ValueError, KytosTagError, ValidationError) as exception:
381 1
            log.debug("update result %s %s", exception, 400)
382 1
            raise HTTPException(400, detail=str(exception)) from exception
383 1
        except DuplicatedNoTagUNI as exception:
384
            log.debug("update result %s %s", exception, 409)
385
            raise HTTPException(409, detail=str(exception)) from exception
386 1
        except DisabledSwitch as exception:
387 1
            log.debug("update result %s %s", exception, 409)
388 1
            raise HTTPException(
389
                    409,
390
                    detail=f"Path is not valid: {exception}"
391
                ) from exception
392 1
        redeployed = False
393 1
        if evc.is_active():
394 1
            if enable is False:  # disable if active
395 1
                with evc.lock:
396 1
                    try:
397 1
                        evc.remove()
398 1
                    except EVCPathNotDeleted as err:
399 1
                        msg = (f"{evc} updated but error happened"
400
                               f" when deleting flows: {err}")
401 1
                        log.error(msg)
402 1
                        raise HTTPException(400, detail=str(err)) from err
403 1
            elif redeploy is not None:  # redeploy if active
404 1
                with evc.lock:
405 1
                    try:
406 1
                        evc.remove()
407 1
                    except EVCPathNotDeleted as err:
408 1
                        msg = (f"{evc} updated but error happened"
409
                               f" when deleting flows: {err}")
410 1
                        log.error(msg)
411 1
                        raise HTTPException(400, detail=str(err)) from err
412
                    redeployed = evc.deploy()
413
        else:
414 1
            if enable is True:  # enable if inactive
415 1
                with evc.lock:
416 1
                    redeployed = evc.deploy()
417 1
            elif evc.is_enabled() and redeploy:
418 1
                with evc.lock:
419 1
                    try:
420 1
                        evc.remove()
421 1
                    except EVCPathNotDeleted as err:
422 1
                        msg = (f"{evc} updated but error happened"
423
                               f" when deleting flows: {err}")
424 1
                        log.error(msg)
425 1
                        raise HTTPException(400, detail=str(msg)) from err
426 1
                    redeployed = evc.deploy()
427 1
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
428 1
        status = 200
429
430 1
        log.debug("update result %s %s", result, status)
431 1
        emit_event(self.controller, "updated",
432
                   content=map_evc_event_content(evc, **data))
433 1
        return JSONResponse(result, status_code=status)
434
435 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
436 1
    def delete_circuit(self, request: Request) -> JSONResponse:
437
        """Remove a circuit.
438
439
        First, the flows are removed from the switches, and then the EVC is
440
        disabled.
441
        """
442 1
        circuit_id = request.path_params["circuit_id"]
443 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
444 1
        try:
445 1
            evc = self.circuits[circuit_id]
446 1
        except KeyError:
447 1
            result = f"circuit_id {circuit_id} not found"
448 1
            log.debug("delete_circuit result %s %s", result, 404)
449 1
            raise HTTPException(404, detail=result) from KeyError
450
451 1
        with evc.lock:
452 1
            log.info("Removing %s", evc)
453 1
            if not evc.archived:
454 1
                evc.deactivate()
455 1
                evc.disable()
456 1
                self.sched.remove(evc)
457 1
                path_name = 'current_path'
458 1
                try:
459 1
                    evc.remove_current_flows(sync=False)
460 1
                    path_name = 'failover_path'
461 1
                    evc.remove_failover_flows(sync=False)
462 1
                except EVCPathNotDeleted as err:
463 1
                    log.error(f"{evc} did not deleted flows: {err}")
464 1
                    evc.deactivate_set_error(
465
                        path_name, f'Error while removing path: {err}'
466
                    )
467 1
                    evc.sync()
468 1
                    raise HTTPException(400, detail=str(err)) from err
469 1
                evc.archive()
470 1
                evc.remove_uni_tags()
471 1
                evc.sync()
472 1
                if circuit_id in self.circuits:
473 1
                    self.circuits.pop(circuit_id)
474 1
                    emit_event(
475
                        self.controller, "deleted",
476
                        content=map_evc_event_content(evc)
477
                    )
478
            else:
479 1
                self.circuits.pop(circuit_id, None)
480
481 1
        log.info("EVC removed. %s", evc)
482 1
        result = {"response": f"Circuit {circuit_id} removed"}
483 1
        status = 200
484 1
        log.debug("delete_circuit result %s %s", result, status)
485
486 1
        return JSONResponse(result, status_code=status)
487
488 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
489 1
    def get_metadata(self, request: Request) -> JSONResponse:
490
        """Get metadata from an EVC."""
491 1
        circuit_id = request.path_params["circuit_id"]
492 1
        try:
493 1
            return (
494
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
495
            )
496
        except KeyError as error:
497
            raise HTTPException(
498
                404,
499
                detail=f"circuit_id {circuit_id} not found."
500
            ) from error
501
502 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
503 1
    @validate_openapi(spec)
504 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
505
        """Add metadata to a bulk of EVCs."""
506 1
        data = get_json_or_400(request, self.controller.loop)
507 1
        circuit_ids = data.pop("circuit_ids")
508
509 1
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
510
511 1
        fail_evcs = []
512 1
        for _id in circuit_ids:
513 1
            try:
514 1
                evc = self.circuits[_id]
515 1
                evc.extend_metadata(data)
516 1
            except KeyError:
517 1
                fail_evcs.append(_id)
518
519 1
        if fail_evcs:
520 1
            raise HTTPException(404, detail=fail_evcs)
521 1
        return JSONResponse("Operation successful", status_code=201)
522
523 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
524 1
    @validate_openapi(spec)
525 1
    def add_metadata(self, request: Request) -> JSONResponse:
526
        """Add metadata to an EVC."""
527 1
        circuit_id = request.path_params["circuit_id"]
528 1
        metadata = get_json_or_400(request, self.controller.loop)
529 1
        if not isinstance(metadata, dict):
530
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
531 1
        try:
532 1
            evc = self.circuits[circuit_id]
533 1
        except KeyError as error:
534 1
            raise HTTPException(
535
                404,
536
                detail=f"circuit_id {circuit_id} not found."
537
            ) from error
538
539 1
        evc.extend_metadata(metadata)
540 1
        evc.sync()
541 1
        return JSONResponse("Operation successful", status_code=201)
542
543 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
544 1
    @validate_openapi(spec)
545 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
546
        """Delete metada from a bulk of EVCs"""
547 1
        data = get_json_or_400(request, self.controller.loop)
548 1
        key = request.path_params["key"]
549 1
        circuit_ids = data.pop("circuit_ids")
550 1
        self.mongo_controller.update_evcs_metadata(
551
            circuit_ids, {key: ""}, "del"
552
        )
553
554 1
        fail_evcs = []
555 1
        for _id in circuit_ids:
556 1
            try:
557 1
                evc = self.circuits[_id]
558 1
                evc.remove_metadata(key)
559 1
            except KeyError:
560 1
                fail_evcs.append(_id)
561
562 1
        if fail_evcs:
563 1
            raise HTTPException(404, detail=fail_evcs)
564 1
        return JSONResponse("Operation successful")
565
566 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
567 1
    def delete_metadata(self, request: Request) -> JSONResponse:
568
        """Delete metadata from an EVC."""
569 1
        circuit_id = request.path_params["circuit_id"]
570 1
        key = request.path_params["key"]
571 1
        try:
572 1
            evc = self.circuits[circuit_id]
573 1
        except KeyError as error:
574 1
            raise HTTPException(
575
                404,
576
                detail=f"circuit_id {circuit_id} not found."
577
            ) from error
578
579 1
        evc.remove_metadata(key)
580 1
        evc.sync()
581 1
        return JSONResponse("Operation successful")
582
583 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
584 1
    def redeploy(self, request: Request) -> JSONResponse:
585
        """Endpoint to force the redeployment of an EVC."""
586 1
        circuit_id = request.path_params["circuit_id"]
587 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
588 1
        try:
589 1
            evc = self.circuits[circuit_id]
590 1
        except KeyError:
591 1
            raise HTTPException(
592
                404,
593
                detail=f"circuit_id {circuit_id} not found"
594
            ) from KeyError
595 1
        deployed = False
596 1
        error_msg = ""
597 1
        if evc.is_enabled():
598 1
            with evc.lock:
599 1
                path_name = 'current_path'
600 1
                try:
601 1
                    evc.remove_current_flows(sync=False)
602 1
                    path_name = 'failover_path'
603 1
                    evc.remove_failover_flows(sync=False)
604 1
                except EVCPathNotDeleted as err:
605 1
                    error_msg = f"Error deleting: {err}"
606 1
                    log.error(f"{evc} " + error_msg)
607 1
                    evc.deactivate_set_error(
608
                        path_name, f'Error while removing path: {err}'
609
                    )
610 1
                    evc.sync()
611 1
                if not error_msg:
612 1
                    deployed = evc.deploy()
613 1
        if deployed:
614 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
615 1
            status = 202
616
        else:
617 1
            result = {
618
                "response": f"Circuit {circuit_id} is disabled." + error_msg
619
            }
620 1
            status = 409
621
622 1
        return JSONResponse(result, status_code=status)
623
624 1
    @rest("/v2/evc/schedule/", methods=["POST"])
625 1
    @validate_openapi(spec)
626 1
    def create_schedule(self, request: Request) -> JSONResponse:
627
        """
628
        Create a new schedule for a given circuit.
629
630
        This service do no check if there are conflicts with another schedule.
631
        Payload example:
632
            {
633
              "circuit_id":"aa:bb:cc",
634
              "schedule": {
635
                "date": "2019-08-07T14:52:10.967Z",
636
                "interval": "string",
637
                "frequency": "1 * * * *",
638
                "action": "create"
639
              }
640
            }
641
        """
642 1
        log.debug("create_schedule /v2/evc/schedule/")
643 1
        data = get_json_or_400(request, self.controller.loop)
644 1
        circuit_id = data["circuit_id"]
645 1
        schedule_data = data["schedule"]
646
647
        # Get EVC from circuits buffer
648 1
        circuits = self._get_circuits_buffer()
649
650
        # get the circuit
651 1
        evc = circuits.get(circuit_id)
652
653
        # get the circuit
654 1
        if not evc:
655 1
            result = f"circuit_id {circuit_id} not found"
656 1
            log.debug("create_schedule result %s %s", result, 404)
657 1
            raise HTTPException(404, detail=result)
658
659
        # new schedule from dict
660 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
661
662
        # If there is no schedule, create the list
663 1
        if not evc.circuit_scheduler:
664 1
            evc.circuit_scheduler = []
665
666
        # Add the new schedule
667 1
        evc.circuit_scheduler.append(new_schedule)
668
669
        # Add schedule job
670 1
        self.sched.add_circuit_job(evc, new_schedule)
671
672
        # save circuit to mongodb
673 1
        evc.sync()
674
675 1
        result = new_schedule.as_dict()
676 1
        status = 201
677
678 1
        log.debug("create_schedule result %s %s", result, status)
679 1
        return JSONResponse(result, status_code=status)
680
681 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
682 1
    @validate_openapi(spec)
683 1
    def update_schedule(self, request: Request) -> JSONResponse:
684
        """Update a schedule.
685
686
        Change all attributes from the given schedule from a EVC circuit.
687
        The schedule ID is preserved as default.
688
        Payload example:
689
            {
690
              "date": "2019-08-07T14:52:10.967Z",
691
              "interval": "string",
692
              "frequency": "1 * * *",
693
              "action": "create"
694
            }
695
        """
696 1
        data = get_json_or_400(request, self.controller.loop)
697 1
        schedule_id = request.path_params["schedule_id"]
698 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
699
700
        # Try to find a circuit schedule
701 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
702
703
        # Can not modify circuits deleted and archived
704 1
        if not found_schedule:
705 1
            result = f"schedule_id {schedule_id} not found"
706 1
            log.debug("update_schedule result %s %s", result, 404)
707 1
            raise HTTPException(404, detail=result)
708
709 1
        new_schedule = CircuitSchedule.from_dict(data)
710 1
        new_schedule.id = found_schedule.id
711
        # Remove the old schedule
712 1
        evc.circuit_scheduler.remove(found_schedule)
713
        # Append the modified schedule
714 1
        evc.circuit_scheduler.append(new_schedule)
715
716
        # Cancel all schedule jobs
717 1
        self.sched.cancel_job(found_schedule.id)
718
        # Add the new circuit schedule
719 1
        self.sched.add_circuit_job(evc, new_schedule)
720
        # Save EVC to mongodb
721 1
        evc.sync()
722
723 1
        result = new_schedule.as_dict()
724 1
        status = 200
725
726 1
        log.debug("update_schedule result %s %s", result, status)
727 1
        return JSONResponse(result, status_code=status)
728
729 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
730 1
    def delete_schedule(self, request: Request) -> JSONResponse:
731
        """Remove a circuit schedule.
732
733
        Remove the Schedule from EVC.
734
        Remove the Schedule from cron job.
735
        Save the EVC to the Storehouse.
736
        """
737 1
        schedule_id = request.path_params["schedule_id"]
738 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
739 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
740
741
        # Can not modify circuits deleted and archived
742 1
        if not found_schedule:
743 1
            result = f"schedule_id {schedule_id} not found"
744 1
            log.debug("delete_schedule result %s %s", result, 404)
745 1
            raise HTTPException(404, detail=result)
746
747
        # Remove the old schedule
748 1
        evc.circuit_scheduler.remove(found_schedule)
749
750
        # Cancel all schedule jobs
751 1
        self.sched.cancel_job(found_schedule.id)
752
        # Save EVC to mongodb
753 1
        evc.sync()
754
755 1
        result = "Schedule removed"
756 1
        status = 200
757
758 1
        log.debug("delete_schedule result %s %s", result, status)
759 1
        return JSONResponse(result, status_code=status)
760
761 1
    def _check_no_tag_duplication(
762
        self,
763
        evc_id: str,
764
        uni_a: Optional[UNI] = None,
765
        uni_z: Optional[UNI] = None
766
    ):
767
        """Check if the given EVC has UNIs with no tag and if these are
768
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
769
        Args:
770
            evc (dict): EVC to be analyzed.
771
        """
772
773
        # No UNIs
774 1
        if not (uni_a or uni_z):
775 1
            return
776
777 1
        if (not (uni_a and not uni_a.user_tag) and
778
                not (uni_z and not uni_z.user_tag)):
779 1
            return
780 1
        for circuit in self.circuits.copy().values():
781 1
            if (not circuit.archived and circuit._id != evc_id):
782 1
                if uni_a and uni_a.user_tag is None:
783 1
                    circuit.check_no_tag_duplicate(uni_a)
784 1
                if uni_z and uni_z.user_tag is None:
785 1
                    circuit.check_no_tag_duplicate(uni_z)
786
787 1
    @listen_to("kytos/topology.link_up")
788 1
    def on_link_up(self, event):
789
        """Change circuit when link is up or end_maintenance."""
790
        self.handle_link_up(event)
791
792 1
    def handle_link_up(self, event):
793
        """Change circuit when link is up or end_maintenance."""
794 1
        log.info("Event handle_link_up %s", event.content["link"])
795 1
        for evc in self.get_evcs_by_svc_level():
796 1
            if evc.is_enabled() and not evc.archived:
797 1
                with evc.lock:
798 1
                    evc.handle_link_up(event.content["link"])
799
800
    # Possibly replace this with interruptions?
801 1
    @listen_to(
802
        '.*.switch.interface.(link_up|link_down|created|deleted)'
803
    )
804 1
    def on_interface_link_change(self, event: KytosEvent):
805
        """
806
        Handler for interface link_up and link_down events.
807
        """
808
        self.handle_on_interface_link_change(event)
809
810 1
    def handle_on_interface_link_change(self, event: KytosEvent):
811
        """
812
        Handler to sort interface events {link_(up, down), create, deleted}
813
814
        To avoid multiple database updated (link flap):
815
        Every interface is identfied and processed in parallel.
816
        Once an interface event is received a time is started.
817
        While time is running self._intf_events will be updated.
818
        After time has passed last received event will be processed.
819
        """
820 1
        iface = event.content.get("interface")
821 1
        with self._lock_interfaces[iface.id]:
822 1
            _now = event.timestamp
823
            # Return out of order events
824 1
            if (
825
                iface.id in self._intf_events
826
                and self._intf_events[iface.id]["event"].timestamp > _now
827
            ):
828 1
                return
829 1
            self._intf_events[iface.id].update({"event": event})
830 1
            if "last_acquired" in self._intf_events[iface.id]:
831 1
                return
832 1
            self._intf_events[iface.id].update({"last_acquired": now()})
833 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
834 1
        with self._lock_interfaces[iface.id]:
835 1
            event = self._intf_events[iface.id]["event"]
836 1
            self._intf_events[iface.id].pop('last_acquired', None)
837 1
            _, _, event_type = event.name.rpartition('.')
838 1
            if event_type in ('link_up', 'created'):
839 1
                self.handle_interface_link_up(iface)
840 1
            elif event_type in ('link_down', 'deleted'):
841 1
                self.handle_interface_link_down(iface)
842
843 1
    def handle_interface_link_up(self, interface):
844
        """
845
        Handler for interface link_up events
846
        """
847
        log.info("Event handle_interface_link_up %s", interface)
848
        for evc in self.get_evcs_by_svc_level():
849
            with evc.lock:
850
                evc.handle_interface_link_up(
851
                    interface
852
                )
853
854 1
    def handle_interface_link_down(self, interface):
855
        """
856
        Handler for interface link_down events
857
        """
858
        log.info("Event handle_interface_link_down %s", interface)
859
        for evc in self.get_evcs_by_svc_level():
860
            with evc.lock:
861
                evc.handle_interface_link_down(
862
                    interface
863
                )
864
865 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
866 1
    def on_link_down(self, event):
867
        """Change circuit when link is down or under_mantenance."""
868
        self.handle_link_down(event)
869
870
    # pylint: disable=too-many-branches
871
    # pylint: disable=too-many-locals
872 1
    def handle_link_down(self, event):
873
        """Change circuit when link is down or under_mantenance."""
874 1
        link = event.content["link"]
875 1
        log.info("Event handle_link_down %s", link)
876 1
        switch_flows = {}
877 1
        evcs_with_failover = []
878 1
        evcs_normal = []
879 1
        check_failover = []
880 1
        failover_event_contents = {}
881
882 1
        for evc in self.get_evcs_by_svc_level():
883 1
            with evc.lock:
884 1
                if evc.is_affected_by_link(link):
885 1
                    evc.affected_by_link_at = event.timestamp
886
                    # if there is no failover path, handles link down the
887
                    # tradditional way
888 1
                    if (
889
                        not evc.failover_path or
890
                        evc.is_failover_path_affected_by_link(link)
891
                    ):
892 1
                        evcs_normal.append(evc)
893 1
                        continue
894 1
                    try:
895 1
                        dpid_flows = evc.get_failover_flows()
896 1
                        evc.old_path = evc.current_path
897 1
                        evc.current_path = evc.failover_path
898 1
                        evc.failover_path = Path([])
899
                    # pylint: disable=broad-except
900 1
                    except Exception:
901 1
                        err = traceback.format_exc().replace("\n", ", ")
902 1
                        log.error(
903
                            "Ignore Failover path for "
904
                            f"{evc} due to error: {err}"
905
                        )
906 1
                        evcs_normal.append(evc)
907 1
                        continue
908 1
                    for dpid, flows in dpid_flows.items():
909 1
                        switch_flows.setdefault(dpid, [])
910 1
                        switch_flows[dpid].extend(flows)
911 1
                    evcs_with_failover.append(evc)
912 1
                    failover_event_contents[evc.id] = map_evc_event_content(
913
                        evc,
914
                        flows={k: v.copy() for k, v in switch_flows.items()}
915
                    )
916 1
                elif evc.is_failover_path_affected_by_link(link):
917 1
                    evc.old_path = evc.failover_path
918 1
                    evc.failover_path = Path([])
919 1
                    check_failover.append(evc)
920
921 1
        if failover_event_contents:
922 1
            emit_event(self.controller, "failover_link_down",
923
                       content=failover_event_contents)
924 1
        send_flow_mods_event(self.controller, switch_flows, 'install')
925
926 1
        for evc in evcs_normal:
927 1
            emit_event(
928
                self.controller,
929
                "evc_affected_by_link_down",
930
                content={"link": link} | map_evc_event_content(evc)
931
            )
932
933 1
        evcs_to_update = []
934 1
        for evc in evcs_with_failover:
935 1
            evcs_to_update.append(evc.as_dict())
936 1
            log.info(
937
                f"{evc} redeployed with failover due to link down {link.id}"
938
            )
939 1
        for evc in check_failover:
940 1
            evcs_to_update.append(evc.as_dict())
941
942 1
        self.mongo_controller.update_evcs(evcs_to_update)
943
944 1
        emit_event(
945
            self.controller,
946
            "cleanup_evcs_old_path",
947
            content={"evcs": evcs_with_failover + check_failover}
948
        )
949
950 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
951 1
    def on_evc_affected_by_link_down(self, event):
952
        """Change circuit when link is down or under_mantenance."""
953
        self.handle_evc_affected_by_link_down(event)
954
955 1
    def handle_evc_affected_by_link_down(self, event):
956
        """Change circuit when link is down or under_mantenance."""
957 1
        evc = self.circuits.get(event.content["evc_id"])
958 1
        link = event.content['link']
959 1
        if not evc:
960 1
            return
961 1
        with evc.lock:
962 1
            if not evc.is_affected_by_link(link):
963
                return
964 1
            result = evc.handle_link_down()
965 1
        event_name = "error_redeploy_link_down"
966 1
        if result:
967 1
            log.info(f"{evc} redeployed due to link down {link.id}")
968 1
            event_name = "redeployed_link_down"
969 1
        emit_event(self.controller, event_name,
970
                   content=map_evc_event_content(evc))
971
972 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
973 1
    def on_evc_deployed(self, event):
974
        """Handle EVC deployed|redeployed_link_down."""
975
        self.handle_evc_deployed(event)
976
977 1
    def handle_evc_deployed(self, event):
978
        """Setup failover path on evc deployed."""
979
        evc = self.circuits.get(event.content["evc_id"])
980
        if not evc:
981
            return
982
        evc.try_setup_failover_path()
983
984 1
    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
985 1
    def on_cleanup_evcs_old_path(self, event):
986
        """Handle cleanup evcs old path."""
987
        self.handle_cleanup_evcs_old_path(event)
988
989 1
    def handle_cleanup_evcs_old_path(self, event):
990
        """Handle cleanup evcs old path."""
991 1
        evcs = event.content.get("evcs", [])
992 1
        event_contents: dict[str, dict] = defaultdict(list)
993 1
        total_flows = {}
994 1
        for evc in evcs:
995 1
            if not evc.old_path:
996 1
                continue
997 1
            with evc.lock:
998 1
                removed_flows = {}
999 1
                try:
1000 1
                    nni_flows = evc._prepare_nni_flows(evc.old_path)
1001 1
                    uni_flows = evc._prepare_uni_flows(
1002
                        evc.old_path, skip_in=True
1003
                    )
1004 1
                    removed_flows = merge_flow_dicts(nni_flows, uni_flows)
1005
                # pylint: disable=broad-except
1006
                except Exception:
1007
                    err = traceback.format_exc().replace("\n", ", ")
1008
                    log.error(f"Fail to remove {evc} old_path: {err}")
1009
                    continue
1010 1
            if removed_flows:
1011 1
                total_flows = merge_flow_dicts(removed_flows, total_flows)
1012 1
                content = map_evc_event_content(
1013
                    evc,
1014
                    removed_flows=removed_flows,
1015
                    current_path=evc.current_path.as_dict(),
1016
                )
1017 1
                event_contents[evc.id] = content
1018 1
        if event_contents:
1019 1
            send_flow_mods_event(self.controller, total_flows, 'delete')
1020 1
            emit_event(self.controller, "failover_old_path",
1021
                       content=event_contents)
1022
1023 1
    @listen_to("kytos/topology.topology_loaded")
1024 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
1025
        """Load EVCs once the topology is available."""
1026
        self.load_all_evcs()
1027
1028 1
    def load_all_evcs(self):
1029
        """Try to load all EVCs on startup."""
1030 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
1031 1
        for circuit_id, circuit in circuits:
1032 1
            if circuit_id not in self.circuits:
1033 1
                self._load_evc(circuit)
1034 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
1035
                   timeout=1)
1036
1037 1
    def _load_evc(self, circuit_dict):
1038
        """Load one EVC from mongodb to memory."""
1039 1
        try:
1040 1
            evc = self._evc_from_dict(circuit_dict)
1041 1
        except (ValueError, KytosTagError) as exception:
1042 1
            log.error(
1043
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1044
            )
1045 1
            return None
1046 1
        if evc.archived:
1047 1
            return None
1048
1049 1
        self.circuits.setdefault(evc.id, evc)
1050 1
        self.sched.add(evc)
1051 1
        return evc
1052
1053 1
    @listen_to("kytos/flow_manager.flow.error")
1054 1
    def on_flow_mod_error(self, event):
1055
        """Handle flow mod errors related to an EVC."""
1056
        self.handle_flow_mod_error(event)
1057
1058 1
    def handle_flow_mod_error(self, event):
1059
        """Handle flow mod errors related to an EVC."""
1060 1
        flow = event.content["flow"]
1061 1
        command = event.content.get("error_command")
1062 1
        if command != "add":
1063
            return
1064 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1065 1
        if evc:
1066 1
            with evc.lock:
1067 1
                path_name = 'current_path'
1068 1
                try:
1069 1
                    evc.remove_current_flows()
1070 1
                    path_name = 'failover_path'
1071 1
                    evc.remove_failover_flows()
1072 1
                except EVCPathNotDeleted as err:
1073 1
                    evc.deactivate_set_error(
1074
                        path_name, f'Error removing path: {err}'
1075
                    )
1076 1
                    evc.sync()
1077
1078 1
    def _evc_dict_with_instances(self, evc_dict):
1079
        """Convert some dict values to instance of EVC classes.
1080
1081
        This method will convert: [UNI, Link]
1082
        """
1083 1
        data = evc_dict.copy()  # Do not modify the original dict
1084 1
        for attribute, value in data.items():
1085
            # Get multiple attributes.
1086
            # Ex: uni_a, uni_z
1087 1
            if "uni" in attribute:
1088 1
                try:
1089 1
                    data[attribute] = self._uni_from_dict(value)
1090 1
                except ValueError as exception:
1091 1
                    result = "Error creating UNI: Invalid value"
1092 1
                    raise ValueError(result) from exception
1093
1094 1
            if attribute == "circuit_scheduler":
1095 1
                data[attribute] = []
1096 1
                for schedule in value:
1097 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1098
1099
            # Get multiple attributes.
1100
            # Ex: primary_links,
1101
            #     backup_links,
1102
            #     current_links_cache,
1103
            #     primary_links_cache,
1104
            #     backup_links_cache
1105 1
            if "links" in attribute:
1106 1
                data[attribute] = [
1107
                    self._link_from_dict(link) for link in value
1108
                ]
1109
1110
            # Ex: current_path,
1111
            #     primary_path,
1112
            #     backup_path
1113 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1114 1
                data[attribute] = Path(
1115
                    [self._link_from_dict(link) for link in value]
1116
                )
1117
1118 1
        return data
1119
1120 1
    def _evc_from_dict(self, evc_dict):
1121 1
        data = self._evc_dict_with_instances(evc_dict)
1122 1
        data["table_group"] = self.table_group
1123 1
        return EVC(self.controller, **data)
1124
1125 1
    def _uni_from_dict(self, uni_dict):
1126
        """Return a UNI object from python dict."""
1127 1
        if uni_dict is None:
1128 1
            return False
1129
1130 1
        interface_id = uni_dict.get("interface_id")
1131 1
        interface = self.controller.get_interface_by_id(interface_id)
1132 1
        if interface is None:
1133 1
            result = (
1134
                "Error creating UNI:"
1135
                + f"Could not instantiate interface {interface_id}"
1136
            )
1137 1
            raise ValueError(result) from ValueError
1138 1
        tag_convert = {1: "vlan"}
1139 1
        tag_dict = uni_dict.get("tag", None)
1140 1
        if tag_dict:
1141 1
            tag_type = tag_dict.get("tag_type")
1142 1
            tag_type = tag_convert.get(tag_type, tag_type)
1143 1
            tag_value = tag_dict.get("value")
1144 1
            if isinstance(tag_value, list):
1145 1
                tag_value = get_tag_ranges(tag_value)
1146 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1147 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1148
            else:
1149 1
                tag = TAG(tag_type, tag_value)
1150
        else:
1151 1
            tag = None
1152 1
        uni = UNI(interface, tag)
1153 1
        return uni
1154
1155 1
    def _link_from_dict(self, link_dict):
1156
        """Return a Link object from python dict."""
1157 1
        id_a = link_dict.get("endpoint_a").get("id")
1158 1
        id_b = link_dict.get("endpoint_b").get("id")
1159
1160 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1161 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1162 1
        if not endpoint_a:
1163 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1164 1
            raise ValueError(error_msg)
1165 1
        if not endpoint_b:
1166
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1167
            raise ValueError(error_msg)
1168
1169 1
        link = Link(endpoint_a, endpoint_b)
1170 1
        if "metadata" in link_dict:
1171 1
            link.extend_metadata(link_dict.get("metadata"))
1172
1173 1
        s_vlan = link.get_metadata("s_vlan")
1174 1
        if s_vlan:
1175 1
            tag = TAG.from_dict(s_vlan)
1176 1
            if tag is False:
1177
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1178
                raise ValueError(error_msg)
1179 1
            link.update_metadata("s_vlan", tag)
1180 1
        return link
1181
1182 1
    def _find_evc_by_schedule_id(self, schedule_id):
1183
        """
1184
        Find an EVC and CircuitSchedule based on schedule_id.
1185
1186
        :param schedule_id: Schedule ID
1187
        :return: EVC and Schedule
1188
        """
1189 1
        circuits = self._get_circuits_buffer()
1190 1
        found_schedule = None
1191 1
        evc = None
1192
1193
        # pylint: disable=unused-variable
1194 1
        for c_id, circuit in circuits.items():
1195 1
            for schedule in circuit.circuit_scheduler:
1196 1
                if schedule.id == schedule_id:
1197 1
                    found_schedule = schedule
1198 1
                    evc = circuit
1199 1
                    break
1200 1
            if found_schedule:
1201 1
                break
1202 1
        return evc, found_schedule
1203
1204 1
    def _get_circuits_buffer(self):
1205
        """
1206
        Return the circuit buffer.
1207
1208
        If the buffer is empty, try to load data from mongodb.
1209
        """
1210 1
        if not self.circuits:
1211
            # Load circuits from mongodb to buffer
1212 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1213 1
            for c_id, circuit in circuits.items():
1214 1
                evc = self._evc_from_dict(circuit)
1215 1
                self.circuits[c_id] = evc
1216 1
        return self.circuits
1217
1218
    # pylint: disable=attribute-defined-outside-init
1219 1
    @alisten_to("kytos/of_multi_table.enable_table")
1220 1
    async def on_table_enabled(self, event):
1221
        """Handle a recently table enabled."""
1222 1
        table_group = event.content.get("mef_eline", None)
1223 1
        if not table_group:
1224 1
            return
1225 1
        for group in table_group:
1226 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1227 1
                log.error(f'The table group "{group}" is not allowed for '
1228
                          f'mef_eline. Allowed table groups are '
1229
                          f'{settings.TABLE_GROUP_ALLOWED}')
1230 1
                return
1231 1
        self.table_group.update(table_group)
1232 1
        content = {"group_table": self.table_group}
1233 1
        name = "kytos/mef_eline.enable_table"
1234
        await aemit_event(self.controller, name, content)
1235