Test Failed
Pull Request — master (#583)
by
unknown
06:03 queued 20s
created

build.main.Main.get_circuit()   A

Complexity

Conditions 2

Size

Total Lines 13
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 2
dl 0
loc 13
ccs 10
cts 10
cp 1
crap 2
rs 9.8
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13
from typing import Optional
14 1
15
from pydantic import ValidationError
16 1
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21 1
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25 1
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29 1
                                              DuplicatedNoTagUNI, InvalidPath)
30
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
31 1
                                          Path)
32 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
33
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
34
                                         emit_event, get_vlan_tags_and_masks,
35
                                         map_evc_event_content,
36
                                         merge_flow_dicts, prepare_delete_flow,
37
                                         send_flow_mods_event)
38
39
40 1
# pylint: disable=too-many-public-methods
41
class Main(KytosNApp):
42
    """Main class of amlight/mef_eline NApp.
43
44
    This class is the entry point for this napp.
45
    """
46 1
47
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
48 1
49
    def setup(self):
50
        """Replace the '__init__' method for the KytosNApp subclass.
51
52
        The setup method is automatically called by the controller when your
53
        application is loaded.
54
55
        So, if you have any setup routine, insert it here.
56
        """
57 1
        # object used to scheduler circuit events
58
        self.sched = Scheduler()
59
60 1
        # object to save and load circuits
61 1
        self.mongo_controller = self.get_eline_controller()
62
        self.mongo_controller.bootstrap_indexes()
63
64 1
        # set the controller that will manager the dynamic paths
65
        DynamicPathManager.set_controller(self.controller)
66
67
        # dictionary of EVCs created. It acts as a circuit buffer.
68 1
        # Every create/update/delete must be synced to mongodb.
69
        self.circuits = dict[str, EVC]()
70 1
71 1
        self._intf_events = defaultdict(dict)
72 1
        self._lock_interfaces = defaultdict(Lock)
73 1
        self.table_group = {"epl": 0, "evpl": 0}
74 1
        self._lock = Lock()
75
        self.multi_evc_lock = Lock()
76 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
77 1
78
        self.load_all_evcs()
79 1
        self._topology_updated_at = None
80
81
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
82
        """Get circuits sorted by desc service level and asc creation_time.
83
84 1
        In the future, as more ops are offloaded it should be get from the DB.
85 1
        """
86
        if enable_filter:
87
            return sorted(
88
                          [circuit for circuit in self.circuits.values()
89
                           if circuit.is_enabled()],
90 1
                          key=lambda x: (-x.service_level, x.creation_time),
91
            )
92
        return sorted(self.circuits.values(),
93 1
                      key=lambda x: (-x.service_level, x.creation_time))
94 1
95
    @staticmethod
96
    def get_eline_controller():
97
        """Return the ELineController instance."""
98 1
        return controllers.ELineController()
99
100 1
    def execute(self):
101 1
        """Execute once when the napp is running."""
102 1
        if self._lock.locked():
103 1
            return
104 1
        log.debug("Starting consistency routine")
105 1
        with self._lock:
106
            self.execute_consistency()
107 1
        log.debug("Finished consistency routine")
108
109
    def should_be_checked(self, circuit):
110 1
        "Verify if the circuit meets the necessary conditions to be checked"
111
        # pylint: disable=too-many-boolean-expressions
112
        if (
113
                circuit.is_enabled()
114
                and not circuit.is_active()
115
                and not circuit.lock.locked()
116
                and not circuit.has_recent_removed_flow()
117
                and not circuit.is_recent_updated()
118
                and circuit.are_unis_active(self.controller.switches)
119
                # if a inter-switch EVC does not have current_path, it does not
120
                # make sense to run sdntrace on it
121 1
                and (circuit.is_intra_switch() or circuit.current_path)
122
                ):
123
            return True
124 1
        return False
125
126 1
    def execute_consistency(self):
127 1
        """Execute consistency routine."""
128 1
        circuits_to_check = []
129 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
130 1
            if self.should_be_checked(circuit):
131 1
                circuits_to_check.append(circuit)
132 1
            circuit.try_setup_failover_path()
133 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
134 1
        for circuit in circuits_to_check:
135 1
            is_checked = circuits_checked.get(circuit.id)
136 1
            if is_checked:
137 1
                circuit.execution_rounds = 0
138 1
                log.info(f"{circuit} enabled but inactive - activating")
139 1
                with circuit.lock:
140
                    circuit.activate()
141 1
                    circuit.sync()
142 1
            else:
143 1
                circuit.execution_rounds += 1
144 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
145 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
146
                    with circuit.lock:
147 1
                        circuit.deploy()
148
149
    def shutdown(self):
150
        """Execute when your napp is unloaded.
151
152
        If you have some cleanup procedure, insert it here.
153 1
        """
154 1
155
    @rest("/v2/evc/", methods=["GET"])
156
    def list_circuits(self, request: Request) -> JSONResponse:
157
        """Endpoint to return circuits stored.
158
159
        archive query arg if defined (not null) will be filtered
160 1
        accordingly, by default only non archived evcs will be listed
161 1
        """
162 1
        log.debug("list_circuits /v2/evc")
163 1
        args = request.query_params
164 1
        archived = args.get("archived", "false").lower()
165
        args = {k: v for k, v in args.items() if k not in {"archived"}}
166 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
167 1
                                                      metadata=args)
168
        circuits = circuits['circuits']
169 1
        return JSONResponse(circuits)
170 1
171
    @rest("/v2/evc/schedule", methods=["GET"])
172
    def list_schedules(self, _request: Request) -> JSONResponse:
173
        """Endpoint to return all schedules stored for all circuits.
174
175
        Return a JSON with the following template:
176
        [{"schedule_id": <schedule_id>,
177
         "circuit_id": <circuit_id>,
178 1
         "schedule": <schedule object>}]
179 1
        """
180 1
        log.debug("list_schedules /v2/evc/schedule")
181 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
182 1
        if not circuits:
183 1
            result = {}
184
            status = 200
185 1
            return JSONResponse(result, status_code=status)
186 1
187 1
        result = []
188 1
        status = 200
189 1
        for circuit in circuits:
190 1
            circuit_scheduler = circuit.get("circuit_scheduler")
191 1
            if circuit_scheduler:
192
                for scheduler in circuit_scheduler:
193
                    value = {
194
                        "schedule_id": scheduler.get("id"),
195
                        "circuit_id": circuit.get("id"),
196 1
                        "schedule": scheduler,
197
                    }
198 1
                    result.append(value)
199 1
200
        log.debug("list_schedules result %s %s", result, status)
201 1
        return JSONResponse(result, status_code=status)
202 1
203
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
204 1
    def get_circuit(self, request: Request) -> JSONResponse:
205 1
        """Endpoint to return a circuit based on id."""
206 1
        circuit_id = request.path_params["circuit_id"]
207 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
208 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
209 1
        if not circuit:
210 1
            result = f"circuit_id {circuit_id} not found"
211 1
            log.debug("get_circuit result %s %s", result, 404)
212 1
            raise HTTPException(404, detail=result)
213 1
        status = 200
214
        log.debug("get_circuit result %s %s", circuit, status)
215
        return JSONResponse(circuit, status_code=status)
216 1
217 1
    # pylint: disable=too-many-branches, too-many-statements
218 1
    @rest("/v2/evc/", methods=["POST"])
219
    @validate_openapi(spec)
220
    def create_circuit(self, request: Request) -> JSONResponse:
221
        """Try to create a new circuit.
222
223
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
224
        UNI_Z's requested C-VID are available from the interfaces' pools. This
225
        is checked when creating the UNI object.
226
227
        Then, E-Line NApp requests a primary and a backup path to the
228
        Pathfinder NApp using the attributes primary_links and backup_links
229
        submitted via REST
230
231
        # For each link composing paths in #3:
232
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
233
        #  - Using the S-VID obtained, generate abstract flow entries to be
234
        #    sent to FlowManager
235
236
        Push abstract flow entries to FlowManager and FlowManager pushes
237
        OpenFlow entries to datapaths
238
239
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
240
        creation
241
242
        Finnaly, notify user of the status of its request.
243 1
        """
244 1
        # Try to create the circuit object
245
        log.debug("create_circuit /v2/evc/")
246 1
        data = get_json_or_400(request, self.controller.loop)
247 1
248 1
        try:
249 1
            evc = self._evc_from_dict(data)
250 1
        except (ValueError, KytosTagError) as exception:
251 1
            log.debug("create_circuit result %s %s", exception, 400)
252 1
            raise HTTPException(400, detail=str(exception)) from exception
253 1
        try:
254 1
            check_disabled_component(evc.uni_a, evc.uni_z)
255 1
        except DisabledSwitch as exception:
256
            log.debug("create_circuit result %s %s", exception, 409)
257
            raise HTTPException(
258
                    409,
259
                    detail=f"Path is not valid: {exception}"
260 1
                ) from exception
261 1
262 1
        if evc.primary_path:
263
            try:
264
                evc.primary_path.is_valid(
265
                    evc.uni_a.interface.switch,
266
                    evc.uni_z.interface.switch,
267 1
                    bool(evc.circuit_scheduler),
268 1
                )
269
            except InvalidPath as exception:
270
                raise HTTPException(
271
                    400,
272 1
                    detail=f"primary_path is not valid: {exception}"
273 1
                ) from exception
274 1
        if evc.backup_path:
275
            try:
276
                evc.backup_path.is_valid(
277
                    evc.uni_a.interface.switch,
278
                    evc.uni_z.interface.switch,
279 1
                    bool(evc.circuit_scheduler),
280 1
                )
281
            except InvalidPath as exception:
282
                raise HTTPException(
283
                    400,
284
                    detail=f"backup_path is not valid: {exception}"
285 1
                ) from exception
286 1
287 1
        if not evc._tag_lists_equal():
288
            detail = "UNI_A and UNI_Z tag lists should be the same."
289 1
            raise HTTPException(400, detail=detail)
290 1
291 1
        try:
292 1
            evc._validate_has_primary_or_dynamic()
293
        except ValueError as exception:
294 1
            raise HTTPException(400, detail=str(exception)) from exception
295 1
296
        try:
297
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
298
        except DuplicatedNoTagUNI as exception:
299
            log.debug("create_circuit result %s %s", exception, 409)
300 1
            raise HTTPException(409, detail=str(exception)) from exception
301 1
302 1
        try:
303 1
            self._use_uni_tags(evc)
304
        except KytosTagError as exception:
305
            raise HTTPException(400, detail=str(exception)) from exception
306 1
307 1
        # save circuit
308
        try:
309
            evc.sync()
310
        except ValidationError as exception:
311
            raise HTTPException(400, detail=str(exception)) from exception
312 1
313
        # store circuit in dictionary
314
        self.circuits[evc.id] = evc
315 1
316
        # Schedule the circuit deploy
317
        self.sched.add(evc)
318 1
319 1
        # Circuit has no schedule, deploy now
320 1
        deployed = False
321 1
        if not evc.circuit_scheduler:
322
            with evc.lock:
323
                deployed = evc.deploy()
324 1
325 1
        # Notify users
326 1
        result = {"circuit_id": evc.id, "deployed": deployed}
327 1
        status = 201
328
        log.debug("create_circuit result %s %s", result, status)
329 1
        emit_event(self.controller, name="created",
330
                   content=map_evc_event_content(evc))
331 1
        return JSONResponse(result, status_code=status)
332 1
333 1
    @staticmethod
334 1
    def _use_uni_tags(evc):
335 1
        uni_a = evc.uni_a
336 1
        evc._use_uni_vlan(uni_a)
337 1
        try:
338 1
            uni_z = evc.uni_z
339 1
            evc._use_uni_vlan(uni_z)
340 1
        except KytosTagError as err:
341
            evc.make_uni_vlan_available(uni_a)
342 1
            raise err
343 1
344
    @listen_to('kytos/flow_manager.flow.removed')
345
    def on_flow_delete(self, event):
346
        """Capture delete messages to keep track when flows got removed."""
347 1
        self.handle_flow_delete(event)
348
349 1
    def handle_flow_delete(self, event):
350 1
        """Keep track when the EVC got flows removed by deriving its cookie."""
351 1
        flow = event.content["flow"]
352 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
353 1
        if evc:
354
            log.debug("Flow removed in EVC %s", evc.id)
355 1
            evc.set_flow_removed_at()
356 1
357 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
358
    @validate_openapi(spec)
359
    def update(self, request: Request) -> JSONResponse:
360
        """Update a circuit based on payload.
361
362
        The EVC attributes (creation_time, active, current_path,
363 1
        failover_path, _id, archived) can't be updated.
364 1
        """
365 1
        data = get_json_or_400(request, self.controller.loop)
366 1
        circuit_id = request.path_params["circuit_id"]
367 1
        log.debug("update /v2/evc/%s", circuit_id)
368 1
        try:
369 1
            evc = self.circuits[circuit_id]
370 1
        except KeyError:
371 1
            result = f"circuit_id {circuit_id} not found"
372
            log.debug("update result %s %s", result, 404)
373 1
            raise HTTPException(404, detail=result) from KeyError
374 1
375 1
        try:
376
            updated_data = self._evc_dict_with_instances(data)
377
            self._check_no_tag_duplication(
378
                circuit_id, updated_data.get("uni_a"),
379 1
                updated_data.get("uni_z")
380 1
            )
381 1
            enable, redeploy = evc.update(**updated_data)
382 1
        except (ValueError, KytosTagError, ValidationError) as exception:
383 1
            log.debug("update result %s %s", exception, 400)
384
            raise HTTPException(400, detail=str(exception)) from exception
385
        except DuplicatedNoTagUNI as exception:
386 1
            log.debug("update result %s %s", exception, 409)
387 1
            raise HTTPException(409, detail=str(exception)) from exception
388 1
        except DisabledSwitch as exception:
389
            log.debug("update result %s %s", exception, 409)
390
            raise HTTPException(
391
                    409,
392 1
                    detail=f"Path is not valid: {exception}"
393 1
                ) from exception
394
        redeployed = False
395
        if evc.is_active():
396
            if enable is False:  # disable if active
397
                with evc.lock:
398
                    evc.remove()
399
            elif redeploy is not None:  # redeploy if active
400
                with evc.lock:
401
                    evc.remove()
402 1
                    redeployed = evc.deploy()
403 1
        else:
404 1
            if enable is True:  # enable if inactive
405 1
                with evc.lock:
406 1
                    redeployed = evc.deploy()
407 1
            elif evc.is_enabled() and redeploy:
408 1
                with evc.lock:
409 1
                    evc.remove()
410 1
                    redeployed = evc.deploy()
411
        result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
412 1
        status = 200
413 1
414
        log.debug("update result %s %s", result, status)
415 1
        emit_event(self.controller, "updated",
416
                   content=map_evc_event_content(evc, **data))
417 1
        return JSONResponse(result, status_code=status)
418 1
419
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
420
    def delete_circuit(self, request: Request) -> JSONResponse:
421
        """Remove a circuit.
422
423
        First, the flows are removed from the switches, and then the EVC is
424 1
        disabled.
425 1
        """
426 1
        circuit_id = request.path_params["circuit_id"]
427 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
428 1
        try:
429 1
            evc = self.circuits.pop(circuit_id)
430 1
        except KeyError:
431 1
            result = f"circuit_id {circuit_id} not found"
432 1
            log.debug("delete_circuit result %s %s", result, 404)
433
            raise HTTPException(404, detail=result) from KeyError
434 1
        log.info("Removing %s", evc)
435 1
436 1
        with evc.lock:
437 1
            if not evc.archived:
438 1
                evc.deactivate()
439 1
                evc.disable()
440 1
                self.sched.remove(evc)
441 1
                evc.remove_current_flows(sync=False)
442 1
                evc.remove_failover_flows(sync=False)
443 1
                evc.archive()
444 1
                evc.remove_uni_tags()
445
                evc.sync()
446
                emit_event(
447
                    self.controller, "deleted",
448
                    content=map_evc_event_content(evc)
449 1
                )
450 1
451 1
        log.info("EVC removed. %s", evc)
452 1
        result = {"response": f"Circuit {circuit_id} removed"}
453
        status = 200
454 1
        log.debug("delete_circuit result %s %s", result, status)
455
456 1
        return JSONResponse(result, status_code=status)
457 1
458
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
459 1
    def get_metadata(self, request: Request) -> JSONResponse:
460 1
        """Get metadata from an EVC."""
461 1
        circuit_id = request.path_params["circuit_id"]
462
        try:
463
            return (
464
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
465
            )
466
        except KeyError as error:
467
            raise HTTPException(
468
                404,
469
                detail=f"circuit_id {circuit_id} not found."
470 1
            ) from error
471 1
472 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
473
    @validate_openapi(spec)
474 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
475 1
        """Add metadata to a bulk of EVCs."""
476
        data = get_json_or_400(request, self.controller.loop)
477 1
        circuit_ids = data.pop("circuit_ids")
478
479 1
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
480 1
481 1
        fail_evcs = []
482 1
        for _id in circuit_ids:
483 1
            try:
484 1
                evc = self.circuits[_id]
485 1
                evc.extend_metadata(data)
486
            except KeyError:
487 1
                fail_evcs.append(_id)
488 1
489 1
        if fail_evcs:
490
            raise HTTPException(404, detail=fail_evcs)
491 1
        return JSONResponse("Operation successful", status_code=201)
492 1
493 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
494
    @validate_openapi(spec)
495 1
    def add_metadata(self, request: Request) -> JSONResponse:
496 1
        """Add metadata to an EVC."""
497 1
        circuit_id = request.path_params["circuit_id"]
498
        metadata = get_json_or_400(request, self.controller.loop)
499 1
        if not isinstance(metadata, dict):
500 1
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
501 1
        try:
502 1
            evc = self.circuits[circuit_id]
503
        except KeyError as error:
504
            raise HTTPException(
505
                404,
506
                detail=f"circuit_id {circuit_id} not found."
507 1
            ) from error
508 1
509 1
        evc.extend_metadata(metadata)
510
        evc.sync()
511 1
        return JSONResponse("Operation successful", status_code=201)
512 1
513 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
514
    @validate_openapi(spec)
515 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
516 1
        """Delete metada from a bulk of EVCs"""
517 1
        data = get_json_or_400(request, self.controller.loop)
518 1
        key = request.path_params["key"]
519
        circuit_ids = data.pop("circuit_ids")
520
        self.mongo_controller.update_evcs_metadata(
521
            circuit_ids, {key: ""}, "del"
522 1
        )
523 1
524 1
        fail_evcs = []
525 1
        for _id in circuit_ids:
526 1
            try:
527 1
                evc = self.circuits[_id]
528 1
                evc.remove_metadata(key)
529
            except KeyError:
530 1
                fail_evcs.append(_id)
531 1
532 1
        if fail_evcs:
533
            raise HTTPException(404, detail=fail_evcs)
534 1
        return JSONResponse("Operation successful")
535 1
536
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
537 1
    def delete_metadata(self, request: Request) -> JSONResponse:
538 1
        """Delete metadata from an EVC."""
539 1
        circuit_id = request.path_params["circuit_id"]
540 1
        key = request.path_params["key"]
541 1
        try:
542 1
            evc = self.circuits[circuit_id]
543
        except KeyError as error:
544
            raise HTTPException(
545
                404,
546
                detail=f"circuit_id {circuit_id} not found."
547 1
            ) from error
548 1
549 1
        evc.remove_metadata(key)
550
        evc.sync()
551 1
        return JSONResponse("Operation successful")
552 1
553
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
554 1
    def redeploy(self, request: Request) -> JSONResponse:
555 1
        """Endpoint to force the redeployment of an EVC."""
556
        circuit_id = request.path_params["circuit_id"]
557
        try_avoid_same_s_vlan = request.query_params.get(
558 1
            "try_avoid_same_s_vlan", "true"
559 1
        )
560
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
561
        if try_avoid_same_s_vlan not in {"true", "false"}:
562 1
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
563 1
            raise HTTPException(400, detail=msg)
564 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
565 1
        try:
566 1
            evc = self.circuits[circuit_id]
567
        except KeyError:
568
            raise HTTPException(
569
                404,
570 1
                detail=f"circuit_id {circuit_id} not found"
571 1
            ) from KeyError
572 1
        deployed = False
573 1
        if evc.is_enabled():
574
            with evc.lock:
575
                path_dict = evc.remove_current_flows(
576
                    sync=False,
577 1
                    return_path=try_avoid_same_s_vlan == "true"
578 1
                )
579 1
                evc.remove_failover_flows(sync=True)
580 1
                deployed = evc.deploy(path_dict)
581 1
        if deployed:
582
            result = {"response": f"Circuit {circuit_id} redeploy received."}
583 1
            status = 202
584
        else:
585
            result = {
586 1
                "response": f"Circuit {circuit_id} is disabled."
587
            }
588 1
            status = 409
589
590 1
        return JSONResponse(result, status_code=status)
591 1
592 1
    @rest("/v2/evc/schedule/", methods=["POST"])
593
    @validate_openapi(spec)
594
    def create_schedule(self, request: Request) -> JSONResponse:
595
        """
596
        Create a new schedule for a given circuit.
597
598
        This service do no check if there are conflicts with another schedule.
599
        Payload example:
600
            {
601
              "circuit_id":"aa:bb:cc",
602
              "schedule": {
603
                "date": "2019-08-07T14:52:10.967Z",
604
                "interval": "string",
605
                "frequency": "1 * * * *",
606
                "action": "create"
607
              }
608 1
            }
609 1
        """
610 1
        log.debug("create_schedule /v2/evc/schedule/")
611 1
        data = get_json_or_400(request, self.controller.loop)
612
        circuit_id = data["circuit_id"]
613
        schedule_data = data["schedule"]
614 1
615
        # Get EVC from circuits buffer
616
        circuits = self._get_circuits_buffer()
617 1
618
        # get the circuit
619
        evc = circuits.get(circuit_id)
620 1
621 1
        # get the circuit
622 1
        if not evc:
623 1
            result = f"circuit_id {circuit_id} not found"
624
            log.debug("create_schedule result %s %s", result, 404)
625
            raise HTTPException(404, detail=result)
626 1
627
        # new schedule from dict
628
        new_schedule = CircuitSchedule.from_dict(schedule_data)
629 1
630 1
        # If there is no schedule, create the list
631
        if not evc.circuit_scheduler:
632
            evc.circuit_scheduler = []
633 1
634
        # Add the new schedule
635
        evc.circuit_scheduler.append(new_schedule)
636 1
637
        # Add schedule job
638
        self.sched.add_circuit_job(evc, new_schedule)
639 1
640
        # save circuit to mongodb
641 1
        evc.sync()
642 1
643
        result = new_schedule.as_dict()
644 1
        status = 201
645 1
646
        log.debug("create_schedule result %s %s", result, status)
647 1
        return JSONResponse(result, status_code=status)
648 1
649 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
650
    @validate_openapi(spec)
651
    def update_schedule(self, request: Request) -> JSONResponse:
652
        """Update a schedule.
653
654
        Change all attributes from the given schedule from a EVC circuit.
655
        The schedule ID is preserved as default.
656
        Payload example:
657
            {
658
              "date": "2019-08-07T14:52:10.967Z",
659
              "interval": "string",
660
              "frequency": "1 * * *",
661
              "action": "create"
662 1
            }
663 1
        """
664 1
        data = get_json_or_400(request, self.controller.loop)
665
        schedule_id = request.path_params["schedule_id"]
666
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
667 1
668
        # Try to find a circuit schedule
669
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
670 1
671 1
        # Can not modify circuits deleted and archived
672 1
        if not found_schedule:
673 1
            result = f"schedule_id {schedule_id} not found"
674
            log.debug("update_schedule result %s %s", result, 404)
675 1
            raise HTTPException(404, detail=result)
676 1
677
        new_schedule = CircuitSchedule.from_dict(data)
678 1
        new_schedule.id = found_schedule.id
679
        # Remove the old schedule
680 1
        evc.circuit_scheduler.remove(found_schedule)
681
        # Append the modified schedule
682
        evc.circuit_scheduler.append(new_schedule)
683 1
684
        # Cancel all schedule jobs
685 1
        self.sched.cancel_job(found_schedule.id)
686
        # Add the new circuit schedule
687 1
        self.sched.add_circuit_job(evc, new_schedule)
688
        # Save EVC to mongodb
689 1
        evc.sync()
690 1
691
        result = new_schedule.as_dict()
692 1
        status = 200
693 1
694
        log.debug("update_schedule result %s %s", result, status)
695 1
        return JSONResponse(result, status_code=status)
696 1
697
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
698
    def delete_schedule(self, request: Request) -> JSONResponse:
699
        """Remove a circuit schedule.
700
701
        Remove the Schedule from EVC.
702
        Remove the Schedule from cron job.
703 1
        Save the EVC to the Storehouse.
704 1
        """
705 1
        schedule_id = request.path_params["schedule_id"]
706
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
707
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
708 1
709 1
        # Can not modify circuits deleted and archived
710 1
        if not found_schedule:
711 1
            result = f"schedule_id {schedule_id} not found"
712
            log.debug("delete_schedule result %s %s", result, 404)
713
            raise HTTPException(404, detail=result)
714 1
715
        # Remove the old schedule
716
        evc.circuit_scheduler.remove(found_schedule)
717 1
718
        # Cancel all schedule jobs
719 1
        self.sched.cancel_job(found_schedule.id)
720
        # Save EVC to mongodb
721 1
        evc.sync()
722 1
723
        result = "Schedule removed"
724 1
        status = 200
725 1
726
        log.debug("delete_schedule result %s %s", result, status)
727 1
        return JSONResponse(result, status_code=status)
728
729
    def _check_no_tag_duplication(
730
        self,
731
        evc_id: str,
732
        uni_a: Optional[UNI] = None,
733
        uni_z: Optional[UNI] = None
734
    ):
735
        """Check if the given EVC has UNIs with no tag and if these are
736
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
737
        Args:
738
            evc (dict): EVC to be analyzed.
739
        """
740 1
741 1
        # No UNIs
742
        if not (uni_a or uni_z):
743 1
            return
744
745 1
        if (not (uni_a and not uni_a.user_tag) and
746 1
                not (uni_z and not uni_z.user_tag)):
747 1
            return
748 1
        for circuit in self.circuits.copy().values():
749 1
            if (not circuit.archived and circuit._id != evc_id):
750 1
                if uni_a and uni_a.user_tag is None:
751 1
                    circuit.check_no_tag_duplicate(uni_a)
752
                if uni_z and uni_z.user_tag is None:
753 1
                    circuit.check_no_tag_duplicate(uni_z)
754 1
755
    @listen_to("kytos/topology.link_up")
756
    def on_link_up(self, event):
757
        """Change circuit when link is up or end_maintenance."""
758 1
        self.handle_link_up(event)
759
760 1
    def handle_link_up(self, event):
761 1
        """Change circuit when link is up or end_maintenance."""
762 1
        log.info("Event handle_link_up %s", event.content["link"])
763 1
        for evc in self.get_evcs_by_svc_level():
764 1
            if evc.is_enabled() and not evc.archived:
765
                with evc.lock:
766
                    evc.handle_link_up(event.content["link"])
767 1
768
    # Possibly replace this with interruptions?
769
    @listen_to(
770 1
        '.*.switch.interface.(link_up|link_down|created|deleted)'
771
    )
772
    def on_interface_link_change(self, event: KytosEvent):
773
        """
774
        Handler for interface link_up and link_down events.
775
        """
776 1
        self.handle_on_interface_link_change(event)
777
778
    def handle_on_interface_link_change(self, event: KytosEvent):
779
        """
780
        Handler to sort interface events {link_(up, down), create, deleted}
781
782
        To avoid multiple database updated (link flap):
783
        Every interface is identfied and processed in parallel.
784
        Once an interface event is received a time is started.
785
        While time is running self._intf_events will be updated.
786 1
        After time has passed last received event will be processed.
787 1
        """
788 1
        iface = event.content.get("interface")
789
        with self._lock_interfaces[iface.id]:
790 1
            _now = event.timestamp
791
            # Return out of order events
792
            if (
793
                iface.id in self._intf_events
794 1
                and self._intf_events[iface.id]["event"].timestamp > _now
795 1
            ):
796 1
                return
797 1
            self._intf_events[iface.id].update({"event": event})
798 1
            if "last_acquired" in self._intf_events[iface.id]:
799 1
                return
800 1
            self._intf_events[iface.id].update({"last_acquired": now()})
801 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
802 1
        with self._lock_interfaces[iface.id]:
803 1
            event = self._intf_events[iface.id]["event"]
804 1
            self._intf_events[iface.id].pop('last_acquired', None)
805 1
            _, _, event_type = event.name.rpartition('.')
806 1
            if event_type in ('link_up', 'created'):
807 1
                self.handle_interface_link_up(iface)
808
            elif event_type in ('link_down', 'deleted'):
809 1
                self.handle_interface_link_down(iface)
810
811
    def handle_interface_link_up(self, interface):
812
        """
813
        Handler for interface link_up events
814
        """
815
        log.info("Event handle_interface_link_up %s", interface)
816
        for evc in self.get_evcs_by_svc_level():
817
            with evc.lock:
818
                evc.handle_interface_link_up(
819
                    interface
820 1
                )
821
822
    def handle_interface_link_down(self, interface):
823
        """
824
        Handler for interface link_down events
825
        """
826
        log.info("Event handle_interface_link_down %s", interface)
827
        for evc in self.get_evcs_by_svc_level():
828
            with evc.lock:
829
                evc.handle_interface_link_down(
830
                    interface
831 1
                )
832 1
833
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
834
    def on_link_down(self, event):
835
        """Change circuit when link is down or under_mantenance."""
836
        self.handle_link_down(event)
837
838 1
    def prepare_swap_to_failover(self, evc: EVC):
839
        """Prepare an evc for switching to failover."""
840 1
        install_flows = {}
841 1
        try:
842 1
            install_flows = evc.get_failover_flows()
843 1
            evc.old_path = evc.current_path
844 1
            evc.current_path = evc.failover_path
845 1
            evc.failover_path = Path([])
846 1
        except Exception:
847
            err = traceback.format_exc().replace("\n", ", ")
848 1
            log.error(
849 1
                "Ignore Failover path for "
850 1
                f"{evc} due to error: {err}"
851 1
            )
852
        return install_flows
853
    
854 1
    def execute_swap_to_failover(self, event_contents, install_flows):
855
        """Process changes needed to commit a swap to failover."""
856
        emit_event(
857
            self.controller, "failover_link_down",
858 1
            content=deepcopy(event_contents)
859 1
        )
860 1
        send_flow_mods_event(
861 1
            self.controller,
862 1
            install_flows,
863 1
            "install"
864 1
        )
865
866 1
    def prepare_clear_old_path(self, evc: EVC):
867 1
        """Prepare an evc for clearing the old path."""
868 1
        del_flows = {}
869
        try:
870
            del_flows = prepare_delete_flow(
871
                merge_flow_dicts(
872 1
                    evc._prepare_uni_flows(evc.old_path, skip_in=True),
873 1
                    evc._prepare_nni_flows(evc.old_path)
874 1
                )
875 1
            )
876 1
        except Exception:
877 1
            err = traceback.format_exc().replace("\n", ", ")
878 1
            log.error(f"Fail to remove {evc} old_path: {err}")
879
        return del_flows
880
    
881
    def execute_clear_old_path(self, evcs: list[EVC], event_contents, delete_flows):
882 1
        """Process changes needed to commit clearing the old path"""
883 1
        send_flow_mods_event(
884 1
            self.controller,
885 1
            delete_flows,
886
            'delete'
887 1
        )
888 1
        emit_event(
889
            self.controller,
890 1
            "failover_old_path",
891
            content=event_contents
892 1
        )
893 1
        for evc in evcs:
894
            evc.old_path.make_vlans_available(self.controller)
895
            evc.old_path = Path([])
896
897
    def prepare_undeploy(self, evc: EVC):
898
        """Prepare an evc for undeploying."""
899 1
        del_flows = {}
900 1
        try:
901 1
            del_flows = prepare_delete_flow(
902 1
                merge_flow_dicts(
903
                    evc._prepare_uni_flows(evc.current_path, skip_in=True),
904
                    evc._prepare_nni_flows(evc.current_path),
905 1
                    evc._prepare_nni_flows(evc.failover_path)
906 1
                )
907
            )
908 1
        except Exception:
909
            # TODO: Add an error message here.
910 1
            pass
911
        return del_flows
912
913
    def execute_undeploy(self, evcs: list[EVC], remove_flows):
914
        """Process changes needed to commit an undeploy"""
915
        send_flow_mods_event(
916 1
            self.controller,
917 1
            remove_flows,
918
            'delete'
919
        )
920
921 1
        for evc in evcs:
922
            evc.current_path.make_vlans_available(self.controller)
923 1
            evc.failover_path.make_vlans_available(self.controller)
924 1
            evc.current_path = Path([])
925 1
            evc.failover_path = Path([])
926 1
            evc.deactivate()
927 1
            emit_event(
928 1
                self.controller,
929
                "need_redeploy",
930 1
                content={"evc_id": evc.id}
931 1
            )
932 1
            log.info(f"{evc} scheduled for redeploy")
933 1
934 1
    # pylint: disable=too-many-branches
935 1
    # pylint: disable=too-many-locals
936
    def handle_link_down(self, event):
937
        """Change circuit when link is down or under_mantenance."""
938 1
        link = event.content["link"]
939 1
        log.info("Event handle_link_down %s", link)
940
941
        with ExitStack() as exit_stack:
942
            exit_stack.enter_context(self.multi_evc_lock)
943 1
            swap_to_failover = list[EVC]()
944
            undeploy = list[EVC]()
945
            clear_failover = list[EVC]()
946
            clear_old_path = list[EVC]()
947
            evc_dict = dict[str, EVC]()
948
            evcs_to_update = list[EVC]()
949
950 1
            flow_modifications = defaultdict[str, dict[str, dict[list]]](dict)
951 1
            event_contents = defaultdict[str, dict[str, dict]](dict)
952
953
            for evc in self.get_evcs_by_svc_level():
954
                evc_dict[evc.id] = evc
955 1
                with ExitStack() as sub_stack:
956
                    sub_stack.enter_context(evc.lock)
957 1
                    if all((
958 1
                        evc.is_affected_by_link(link),
959 1
                        evc.failover_path,
960 1
                        not evc.is_failover_path_affected_by_link(link)
961 1
                    )):
962 1
                        swap_to_failover.append(evc)
963 1
                    elif all((
964 1
                        evc.is_affected_by_link(link),
965 1
                        not evc.failover_path or
966 1
                        evc.is_failover_path_affected_by_link(link)
967
                    )):
968
                        undeploy.append(evc)
969 1
                    elif all((
970
                        not evc.is_affected_by_link(link),
971
                        evc.failover_path,
972 1
                        evc.is_failover_path_affected_by_link(link)
973
                    )):
974
                        clear_failover.append(evc)
975
                    else:
976
                        continue
977
978
                    exit_stack.push(sub_stack.pop_all())
979
980 1
            # Swap from current path to failover path
981 1
982 1
            for evc in swap_to_failover:
983
                new_flows = self.prepare_swap_to_failover(evc)
984
                if new_flows:
985
                    flow_modifications[evc.id]["swap_to_failover"] =\
986
                        new_flows
987 1
                    event_contents[evc.id]["swap_to_failover"] =\
988 1
                        map_evc_event_content(
989 1
                            evc,
990 1
                            flows=deepcopy(new_flows)
991 1
                    )
992
                    clear_old_path.append(evc)
993
                else:
994 1
                    del flow_modifications[evc.id]
995 1
                    del event_contents[evc.id]
996
                    undeploy.append(evc)
997
998
            for evc in clear_failover:
999 1
                evc.old_path = evc.failover_path
1000
                evc.failover_path = Path([])
1001 1
                clear_old_path.append(evc)
1002 1
1003 1
            # Clear out old flows
1004 1
1005 1
            for evc in clear_old_path:
1006
                del_flows = self.prepare_clear_old_path(evc)
1007
                if del_flows:
1008 1
                    flow_modifications[evc.id]["clear_old_path"] = del_flows
1009
                    event_contents[evc.id]["clear_old_path"] = map_evc_event_content(
1010 1
                        evc,
1011 1
                        removed_flows=deepcopy(del_flows)
1012 1
                    )
1013 1
                else:
1014
                    if "swap_to_failover" in flow_modifications[evc.id]:
1015
                        evc.failover_path = evc.current_path
1016 1
                        evc.current_path = evc.old_path
1017 1
                    else:
1018 1
                        evc.failover_path = evc.old_path
1019
                    evc.old_path = Path([])
1020 1
                    del flow_modifications[evc.id]
1021 1
                    del event_contents[evc.id]
1022 1
                    undeploy.append(evc)
1023
1024 1
            swap_to_failover_flows = {}
1025 1
            swap_to_failover_event_contents = {}
1026
1027
            clear_old_path_flows = {}
1028
            clear_old_path_event_contents = {}
1029 1
1030
            clear_old_path_reservations = list[EVC]()
1031 1
1032 1
            for modified_evc_id in flow_modifications:
1033 1
                evc = evc_dict[modified_evc_id]
1034
                if "swap_to_failover" in flow_modifications[modified_evc_id]:
1035 1
                    swap_to_failover_flows = merge_flow_dicts(
1036 1
                        swap_to_failover_flows,
1037 1
                        flow_modifications[modified_evc_id]["swap_to_failover"]
1038 1
                    )
1039 1
                    swap_to_failover_event_contents[modified_evc_id] =\
1040
                        event_contents[modified_evc_id]["swap_to_failover"]
1041 1
                if "clear_old_path" in flow_modifications[modified_evc_id]:
1042
                    clear_old_path_flows = merge_flow_dicts(
1043
                        clear_old_path_flows,
1044
                        flow_modifications[modified_evc_id]["clear_old_path"]
1045
                    )
1046 1
                    clear_old_path_event_contents[modified_evc_id] =\
1047 1
                        event_contents[modified_evc_id]["clear_old_path"]
1048
                    clear_old_path_reservations.append(evc)
1049
                evcs_to_update.append(evc)
1050 1
1051 1
            if swap_to_failover_flows:
1052 1
                self.execute_swap_to_failover(
1053 1
                    swap_to_failover_event_contents,
1054 1
                    swap_to_failover_flows
1055 1
                )
1056
1057 1
            if clear_old_path_flows:
1058 1
                self.execute_clear_old_path(
1059 1
                    clear_old_path_reservations,
1060 1
                    clear_old_path_event_contents,
1061
                    clear_old_path_flows
1062
                )
1063
1064
            # Handle redeploys
1065
            # This uses a separate syncing mechanism here, so don't push to DB.
1066
            # TODO: replace with bulk update mechanism
1067
1068 1
            undeploy_flows = {}
1069 1
            undeploy_ready = list[EVC]()
1070
1071
            
1072
            # Just undeploy the evc,
1073
            # then call the redeploy process later on?
1074
1075
            for evc in undeploy:
1076 1
                del_flows = self.prepare_undeploy(evc)
1077 1
                if del_flows:
1078
                    undeploy_flows = merge_flow_dicts(
1079
                        undeploy_flows,
1080
                        del_flows
1081 1
                    )
1082
                    undeploy_ready.append(evc)
1083 1
1084 1
            if undeploy_ready:
1085 1
                self.execute_undeploy(undeploy_ready, undeploy_flows)
1086 1
                evcs_to_update.extend(undeploy_ready)
1087
1088 1
            # Push update to DB
1089
1090 1
            if evcs_to_update:
1091 1
                self.mongo_controller.update_evcs(
1092
                    [evc.as_dict() for evc in evcs_to_update]
1093 1
                )
1094 1
1095 1
    @listen_to("kytos/mef_eline.need_redeploy")
1096 1
    def on_evc_need_redeploy(self, event):
1097
        """Redeploy evcs that need to be redeployed."""
1098
        self.handle_evc_need_redeploy(event)
1099
1100 1
    def handle_evc_need_redeploy(self, event):
1101 1
        """Redeploy evcs that need to be redeployed."""
1102 1
        evc = self.circuits.get(event.content["evc_id"])
1103 1
        if evc is None:
1104 1
            return
1105 1
        with evc.lock:
1106 1
            if not evc.is_enabled() or evc.is_active():
1107 1
                return
1108 1
            result = evc.deploy_to_path()
1109 1
            # if result:
1110 1
            #     evc.setup_failover_path()
1111
        event_name = "error_redeploy_link_down"
1112 1
        if result:
1113
            log.info(f"{evc} redeployed")
1114 1
            event_name = "redeployed_link_down"
1115 1
        emit_event(self.controller, event_name,
1116 1
                   content=map_evc_event_content(evc))
1117
1118 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
1119
    def on_evc_affected_by_link_down(self, event):
1120 1
        """Change circuit when link is down or under_mantenance."""
1121 1
        self.handle_evc_affected_by_link_down(event)
1122
1123 1
    def handle_evc_affected_by_link_down(self, event):
1124 1
        """Change circuit when link is down or under_mantenance."""
1125 1
        evc = self.circuits.get(event.content["evc_id"])
1126 1
        link = event.content['link']
1127 1
        if not evc:
1128 1
            return
1129
        with evc.lock:
1130
            if not evc.is_affected_by_link(link):
1131
                return
1132 1
            result = evc.handle_link_down()
1133 1
        event_name = "error_redeploy_link_down"
1134 1
        if result:
1135 1
            log.info(f"{evc} redeployed due to link down {link.id}")
1136
            event_name = "redeployed_link_down"
1137 1
        emit_event(self.controller, event_name,
1138 1
                   content=map_evc_event_content(evc))
1139 1
1140 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
1141
    def on_evc_deployed(self, event):
1142
        """Handle EVC deployed|redeployed_link_down."""
1143 1
        self.handle_evc_deployed(event)
1144 1
1145
    def handle_evc_deployed(self, event):
1146 1
        """Setup failover path on evc deployed."""
1147
        evc = self.circuits.get(event.content["evc_id"])
1148
        if not evc:
1149
            return
1150
        evc.try_setup_failover_path()
1151
1152
    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
1153 1
    def on_cleanup_evcs_old_path(self, event):
1154 1
        """Handle cleanup evcs old path."""
1155 1
        self.handle_cleanup_evcs_old_path(event)
1156
1157
    def handle_cleanup_evcs_old_path(self, event):
1158 1
        """Handle cleanup evcs old path."""
1159 1
        evcs = event.content.get("evcs", [])
1160 1
        event_contents: dict[str, dict] = defaultdict(list)
1161 1
        total_flows = {}
1162 1
        for evc in evcs:
1163 1
            if not evc.old_path:
1164 1
                continue
1165 1
            with evc.lock:
1166 1
                removed_flows = {}
1167
                try:
1168 1
                    nni_flows = prepare_delete_flow(
1169
                        evc._prepare_nni_flows(evc.old_path)
1170
                    )
1171
                    uni_flows = prepare_delete_flow(
1172
                        evc._prepare_uni_flows(evc.old_path, skip_in=True)
1173
                    )
1174 1
                    removed_flows = merge_flow_dicts(
1175
                        nni_flows, uni_flows
1176 1
                    )
1177 1
                # pylint: disable=broad-except
1178 1
                except Exception:
1179 1
                    err = traceback.format_exc().replace("\n", ", ")
1180 1
                    log.error(f"Fail to remove {evc} old_path: {err}")
1181
                    continue
1182
                if removed_flows:
1183 1
                    total_flows = merge_flow_dicts(total_flows, removed_flows)
1184 1
                    content = map_evc_event_content(
1185
                        evc,
1186 1
                        removed_flows=deepcopy(removed_flows),
1187 1
                        current_path=evc.current_path.as_dict(),
1188 1
                    )
1189 1
                    event_contents[evc.id] = content
1190 1
                    evc.old_path = Path([])
1191 1
        if event_contents:
1192
            send_flow_mods_event(self.controller, total_flows, 'delete')
1193
            emit_event(self.controller, "failover_old_path",
1194 1
                       content=event_contents)
1195 1
1196 1
    @listen_to("kytos/topology.topology_loaded")
1197 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
1198 1
        """Load EVCs once the topology is available."""
1199
        self.load_all_evcs()
1200
1201
    def load_all_evcs(self):
1202
        """Try to load all EVCs on startup."""
1203
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
1204
        for circuit_id, circuit in circuits:
1205
            if circuit_id not in self.circuits:
1206
                self._load_evc(circuit)
1207
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
1208
                   timeout=1)
1209
1210
    def _load_evc(self, circuit_dict):
1211
        """Load one EVC from mongodb to memory."""
1212
        try:
1213
            evc = self._evc_from_dict(circuit_dict)
1214
        except (ValueError, KytosTagError) as exception:
1215
            log.error(
1216
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1217
            )
1218
            return None
1219
        if evc.archived:
1220
            return None
1221
1222
        self.circuits.setdefault(evc.id, evc)
1223
        self.sched.add(evc)
1224
        return evc
1225
1226
    @listen_to("kytos/flow_manager.flow.error")
1227
    def on_flow_mod_error(self, event):
1228
        """Handle flow mod errors related to an EVC."""
1229
        self.handle_flow_mod_error(event)
1230
1231
    def handle_flow_mod_error(self, event):
1232
        """Handle flow mod errors related to an EVC."""
1233
        flow = event.content["flow"]
1234
        command = event.content.get("error_command")
1235
        if command != "add":
1236
            return
1237
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1238
        if evc:
1239
            with evc.lock:
1240
                evc.remove_current_flows(sync=False)
1241
                evc.remove_failover_flows(sync=True)
1242
1243
    def _evc_dict_with_instances(self, evc_dict):
1244
        """Convert some dict values to instance of EVC classes.
1245
1246
        This method will convert: [UNI, Link]
1247
        """
1248
        data = evc_dict.copy()  # Do not modify the original dict
1249
        for attribute, value in data.items():
1250
            # Get multiple attributes.
1251
            # Ex: uni_a, uni_z
1252
            if "uni" in attribute:
1253
                try:
1254
                    data[attribute] = self._uni_from_dict(value)
1255
                except ValueError as exception:
1256
                    result = "Error creating UNI: Invalid value"
1257
                    raise ValueError(result) from exception
1258
1259
            if attribute == "circuit_scheduler":
1260
                data[attribute] = []
1261
                for schedule in value:
1262
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1263
1264
            # Get multiple attributes.
1265
            # Ex: primary_links,
1266
            #     backup_links,
1267
            #     current_links_cache,
1268
            #     primary_links_cache,
1269
            #     backup_links_cache
1270
            if "links" in attribute:
1271
                data[attribute] = [
1272
                    self._link_from_dict(link, attribute) for link in value
1273
                ]
1274
1275
            # Ex: current_path,
1276
            #     primary_path,
1277
            #     backup_path
1278
            if "path" in attribute and attribute != "dynamic_backup_path":
1279
                data[attribute] = Path(
1280
                    [self._link_from_dict(link, attribute) for link in value]
1281
                )
1282
1283
        return data
1284
1285
    def _evc_from_dict(self, evc_dict):
1286
        data = self._evc_dict_with_instances(evc_dict)
1287
        data["table_group"] = self.table_group
1288
        return EVC(self.controller, **data)
1289
1290
    def _uni_from_dict(self, uni_dict):
1291
        """Return a UNI object from python dict."""
1292
        if uni_dict is None:
1293
            return False
1294
1295
        interface_id = uni_dict.get("interface_id")
1296
        interface = self.controller.get_interface_by_id(interface_id)
1297
        if interface is None:
1298
            result = (
1299
                "Error creating UNI:"
1300
                + f"Could not instantiate interface {interface_id}"
1301
            )
1302
            raise ValueError(result) from ValueError
1303
        tag_convert = {1: "vlan"}
1304
        tag_dict = uni_dict.get("tag", None)
1305
        if tag_dict:
1306
            tag_type = tag_dict.get("tag_type")
1307
            tag_type = tag_convert.get(tag_type, tag_type)
1308
            tag_value = tag_dict.get("value")
1309
            if isinstance(tag_value, list):
1310
                tag_value = get_tag_ranges(tag_value)
1311
                mask_list = get_vlan_tags_and_masks(tag_value)
1312
                tag = TAGRange(tag_type, tag_value, mask_list)
1313
            else:
1314
                tag = TAG(tag_type, tag_value)
1315
        else:
1316
            tag = None
1317
        uni = UNI(interface, tag)
1318
        return uni
1319
1320
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
1321
        """Return a Link object from python dict."""
1322
        id_a = link_dict.get("endpoint_a").get("id")
1323
        id_b = link_dict.get("endpoint_b").get("id")
1324
1325
        endpoint_a = self.controller.get_interface_by_id(id_a)
1326
        endpoint_b = self.controller.get_interface_by_id(id_b)
1327
        if not endpoint_a:
1328
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1329
            raise ValueError(error_msg)
1330
        if not endpoint_b:
1331
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1332
            raise ValueError(error_msg)
1333
1334
        link = Link(endpoint_a, endpoint_b)
1335
        allowed_paths = {"current_path", "failover_path"}
1336
        if "metadata" in link_dict and attribute in allowed_paths:
1337
            link.extend_metadata(link_dict.get("metadata"))
1338
1339
        s_vlan = link.get_metadata("s_vlan")
1340
        if s_vlan:
1341
            tag = TAG.from_dict(s_vlan)
1342
            if tag is False:
1343
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1344
                raise ValueError(error_msg)
1345
            link.update_metadata("s_vlan", tag)
1346
        return link
1347
1348
    def _find_evc_by_schedule_id(self, schedule_id):
1349
        """
1350
        Find an EVC and CircuitSchedule based on schedule_id.
1351
1352
        :param schedule_id: Schedule ID
1353
        :return: EVC and Schedule
1354
        """
1355
        circuits = self._get_circuits_buffer()
1356
        found_schedule = None
1357
        evc = None
1358
1359
        # pylint: disable=unused-variable
1360
        for c_id, circuit in circuits.items():
1361
            for schedule in circuit.circuit_scheduler:
1362
                if schedule.id == schedule_id:
1363
                    found_schedule = schedule
1364
                    evc = circuit
1365
                    break
1366
            if found_schedule:
1367
                break
1368
        return evc, found_schedule
1369
1370
    def _get_circuits_buffer(self):
1371
        """
1372
        Return the circuit buffer.
1373
1374
        If the buffer is empty, try to load data from mongodb.
1375
        """
1376
        if not self.circuits:
1377
            # Load circuits from mongodb to buffer
1378
            circuits = self.mongo_controller.get_circuits()['circuits']
1379
            for c_id, circuit in circuits.items():
1380
                evc = self._evc_from_dict(circuit)
1381
                self.circuits[c_id] = evc
1382
        return self.circuits
1383
1384
    # pylint: disable=attribute-defined-outside-init
1385
    @alisten_to("kytos/of_multi_table.enable_table")
1386
    async def on_table_enabled(self, event):
1387
        """Handle a recently table enabled."""
1388
        table_group = event.content.get("mef_eline", None)
1389
        if not table_group:
1390
            return
1391
        for group in table_group:
1392
            if group not in settings.TABLE_GROUP_ALLOWED:
1393
                log.error(f'The table group "{group}" is not allowed for '
1394
                          f'mef_eline. Allowed table groups are '
1395
                          f'{settings.TABLE_GROUP_ALLOWED}')
1396
                return
1397
        self.table_group.update(table_group)
1398
        content = {"group_table": self.table_group}
1399
        name = "kytos/mef_eline.enable_table"
1400
        await aemit_event(self.controller, name, content)
1401