Test Failed
Pull Request — master (#653)
by
unknown
04:23
created

build.main.Main.handle_link_down()   F

Complexity

Conditions 14

Size

Total Lines 80
Code Lines 52

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 35
CRAP Score 24.3359

Importance

Changes 0
Metric Value
cc 14
eloc 52
nop 2
dl 0
loc 80
ccs 35
cts 56
cp 0.625
crap 24.3359
rs 3.6
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like build.main.Main.handle_link_down() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from contextlib import ExitStack
11 1
from copy import deepcopy
12 1
from threading import Lock
13 1
from typing import Optional
14
15 1
from pydantic import ValidationError
16
17 1
from kytos.core import KytosNApp, log, rest
18 1
from kytos.core.events import KytosEvent
19 1
from kytos.core.exceptions import KytosTagError
20 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
21
                                validate_openapi)
22 1
from kytos.core.interface import TAG, UNI, TAGRange
23 1
from kytos.core.link import Link
24 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
25
                                 get_json_or_400)
26 1
from kytos.core.tag_ranges import get_tag_ranges
27 1
from napps.kytos.mef_eline import controllers, settings
28 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
29
                                              DuplicatedNoTagUNI,
30
                                              FlowModException, InvalidPath)
31 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
32
                                          Path)
33 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
34 1
from napps.kytos.mef_eline.utils import (_does_uni_affect_evc, aemit_event,
35
                                         check_disabled_component, emit_event,
36
                                         get_vlan_tags_and_masks,
37
                                         map_evc_event_content,
38
                                         merge_flow_dicts, prepare_delete_flow,
39
                                         send_flow_mods_http)
40
41
42
# pylint: disable=too-many-public-methods
43 1
class Main(KytosNApp):
44
    """Main class of amlight/mef_eline NApp.
45
46
    This class is the entry point for this napp.
47
    """
48
49 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
50
51 1
    def setup(self):
52
        """Replace the '__init__' method for the KytosNApp subclass.
53
54
        The setup method is automatically called by the controller when your
55
        application is loaded.
56
57
        So, if you have any setup routine, insert it here.
58
        """
59
        # object used to scheduler circuit events
60 1
        self.sched = Scheduler()
61
62
        # object to save and load circuits
63 1
        self.mongo_controller = self.get_eline_controller()
64 1
        self.mongo_controller.bootstrap_indexes()
65
66
        # set the controller that will manager the dynamic paths
67 1
        DynamicPathManager.set_controller(self.controller)
68
69
        # dictionary of EVCs created. It acts as a circuit buffer.
70
        # Every create/update/delete must be synced to mongodb.
71 1
        self.circuits = dict[str, EVC]()
72
73 1
        self._intf_events = defaultdict(dict)
74 1
        self._lock_interfaces = defaultdict(Lock)
75 1
        self.table_group = {"epl": 0, "evpl": 0}
76 1
        self._lock = Lock()
77 1
        self.multi_evc_lock = Lock()
78 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
79
80 1
        self.load_all_evcs()
81 1
        self._topology_updated_at = None
82
83 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list[EVC]:
84
        """Get circuits sorted by desc service level and asc creation_time.
85
86
        In the future, as more ops are offloaded it should be get from the DB.
87
        """
88 1
        if enable_filter:
89 1
            return sorted(
90
                          [circuit for circuit in self.circuits.values()
91
                           if circuit.is_enabled()],
92
                          key=lambda x: (-x.service_level, x.creation_time),
93
            )
94 1
        return sorted(self.circuits.values(),
95
                      key=lambda x: (-x.service_level, x.creation_time))
96
97 1
    @staticmethod
98 1
    def get_eline_controller():
99
        """Return the ELineController instance."""
100
        return controllers.ELineController()
101
102 1
    def execute(self):
103
        """Execute once when the napp is running."""
104 1
        if self._lock.locked():
105 1
            return
106 1
        log.debug("Starting consistency routine")
107 1
        with self._lock:
108 1
            self.execute_consistency()
109 1
        log.debug("Finished consistency routine")
110
111 1
    def should_be_checked(self, circuit: EVC):
112
        "Verify if the circuit meets the necessary conditions to be checked"
113
        # pylint: disable=too-many-boolean-expressions
114 1
        if (
115
                circuit.is_enabled()
116
                and not circuit.is_active()
117
                and not circuit.has_recent_removed_flow()
118
                and not circuit.is_recent_updated()
119
                and circuit.are_unis_active()
120
                # if a inter-switch EVC does not have current_path, it does not
121
                # make sense to run sdntrace on it
122
                and (circuit.is_intra_switch() or circuit.current_path)
123
                ):
124
            return True
125 1
        return False
126
127
    def execute_consistency(self):
128 1
        """Execute consistency routine."""
129
        with ExitStack() as exit_stack:
130 1
131 1
            check_if_inactive_is_deployed = set[str]()
132 1
            check_if_active_is_working = set[str]()
133 1
            circuits_to_check = list[EVC]()
134 1
            undeploy = list[EVC]()
135 1
            evcs_to_update = dict[str, EVC]()
136 1
137 1
            for circuit in self.get_evcs_by_svc_level(enable_filter=False):
138 1
                with ExitStack() as sub_stack:
139 1
                    if not circuit.lock.acquire(blocking=False):
140 1
                        continue
141 1
                    sub_stack.push(circuit.lock)
142 1
                    circuit.try_setup_failover_path(warn_if_not_path=False)
143 1
                    if all((
144
                        circuit.is_enabled(),
145 1
                        not circuit.is_active(),
146 1
                        not circuit.has_recent_removed_flow(),
147 1
                        not circuit.is_recent_updated(),
148 1
                        circuit.are_unis_active(),
149 1
                        circuit.is_intra_switch() or circuit.current_path,
150
                    )):
151 1
                        check_if_inactive_is_deployed.add(circuit.id)
152
                    elif all((
153
                        circuit.is_enabled(),
154
                        circuit.is_active(),
155
                        not circuit.has_recent_removed_flow(),
156
                        not circuit.is_recent_updated(),
157 1
                        circuit.are_unis_active(),
158 1
                    )):
159
                        check_if_active_is_working.add(circuit.id)
160
                    else:
161
                        continue
162
163
                    circuits_to_check.append(circuit)
164 1
                    exit_stack.push(sub_stack.pop_all())
165 1
166 1
            circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
167 1
            for circuit in circuits_to_check:
168 1
                is_checked = circuits_checked.get(circuit.id)
169
                if not is_checked:
170 1
                    circuit.execution_rounds += 1
171 1
                    if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
172
                        log.info(
173 1
                            f"{circuit} deployed but trace failed - redeploy"
174 1
                        )
175
                        undeploy.append(circuit)
176
                    continue
177
178
                circuit.execution_rounds = 0
179
                log.debug(f"{circuit} trace passed!")
180
                if circuit.id in check_if_inactive_is_deployed:
181
                    log.info(
182 1
                        f"{circuit} deployed but inactive - activating"
183 1
                    )
184 1
                    circuit.activate()
185 1
                    evcs_to_update[circuit.id] = circuit
186 1
                # if circuit.id in check_if_active_is_working:
187 1
                #     pass
188
189 1
            # NOTE: Perhaps use a swap failover instead?
190 1
            if undeploy:
191 1
                success, failure = self.execute_undeploy(undeploy)
192 1
193 1
                for evc in success:
194 1
                    evc.execution_rounds = 0
195 1
196
                evcs_to_update.update((evc.id, evc) for evc in success)
197
198
                if failure:
199
                    log.error(
200 1
                        f"Failed to redeploy on failed trace for {failure}"
201
                    )
202 1
203 1
            if evcs_to_update:
204
                evc_dicts = list[dict]()
205 1
                update_time = now()
206 1
                for evc in evcs_to_update.values():
207
                    evc.updated_at = update_time
208 1
                    evc_dicts.append(evc.as_dict())
209 1
                self.mongo_controller.update_evcs(
210 1
                    evc_dicts
211 1
                )
212 1
213 1
    def shutdown(self):
214 1
        """Execute when your napp is unloaded.
215 1
216 1
        If you have some cleanup procedure, insert it here.
217 1
        """
218
219
    @rest("/v2/evc/", methods=["GET"])
220 1
    def list_circuits(self, request: Request) -> JSONResponse:
221 1
        """Endpoint to return circuits stored.
222 1
223
        archive query arg if defined (not null) will be filtered
224
        accordingly, by default only non archived evcs will be listed
225
        """
226
        log.debug("list_circuits /v2/evc")
227
        args = request.query_params
228
        archived = args.get("archived", "false").lower()
229
        args = {k: v for k, v in args.items() if k not in {"archived"}}
230
        circuits = self.mongo_controller.get_circuits(archived=archived,
231
                                                      metadata=args)
232
        circuits = circuits['circuits']
233
        return JSONResponse(circuits)
234
235
    @rest("/v2/evc/schedule", methods=["GET"])
236
    def list_schedules(self, _request: Request) -> JSONResponse:
237
        """Endpoint to return all schedules stored for all circuits.
238
239
        Return a JSON with the following template:
240
        [{"schedule_id": <schedule_id>,
241
         "circuit_id": <circuit_id>,
242
         "schedule": <schedule object>}]
243
        """
244
        log.debug("list_schedules /v2/evc/schedule")
245
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
246
        if not circuits:
247 1
            result = {}
248 1
            status = 200
249
            return JSONResponse(result, status_code=status)
250 1
251 1
        result = []
252 1
        status = 200
253 1
        for circuit in circuits:
254 1
            circuit_scheduler = circuit.get("circuit_scheduler")
255 1
            if circuit_scheduler:
256 1
                for scheduler in circuit_scheduler:
257 1
                    value = {
258 1
                        "schedule_id": scheduler.get("id"),
259 1
                        "circuit_id": circuit.get("id"),
260
                        "schedule": scheduler,
261
                    }
262
                    result.append(value)
263
264 1
        log.debug("list_schedules result %s %s", result, status)
265 1
        return JSONResponse(result, status_code=status)
266 1
267
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
268
    def get_circuit(self, request: Request) -> JSONResponse:
269
        """Endpoint to return a circuit based on id."""
270
        circuit_id = request.path_params["circuit_id"]
271 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
272 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
273
        if not circuit:
274
            result = f"circuit_id {circuit_id} not found"
275
            log.debug("get_circuit result %s %s", result, 404)
276 1
            raise HTTPException(404, detail=result)
277 1
        status = 200
278 1
        log.debug("get_circuit result %s %s", circuit, status)
279
        return JSONResponse(circuit, status_code=status)
280
281
    # pylint: disable=too-many-branches, too-many-statements
282
    @rest("/v2/evc/", methods=["POST"])
283 1
    @validate_openapi(spec)
284 1
    def create_circuit(self, request: Request) -> JSONResponse:
285
        """Try to create a new circuit.
286
287
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
288
        UNI_Z's requested C-VID are available from the interfaces' pools. This
289 1
        is checked when creating the UNI object.
290 1
291 1
        Then, E-Line NApp requests a primary and a backup path to the
292
        Pathfinder NApp using the attributes primary_links and backup_links
293 1
        submitted via REST
294 1
295 1
        # For each link composing paths in #3:
296 1
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
297
        #  - Using the S-VID obtained, generate abstract flow entries to be
298 1
        #    sent to FlowManager
299 1
300
        Push abstract flow entries to FlowManager and FlowManager pushes
301
        OpenFlow entries to datapaths
302
303
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
304 1
        creation
305 1
306 1
        Finnaly, notify user of the status of its request.
307 1
        """
308
        # Try to create the circuit object
309
        log.debug("create_circuit /v2/evc/")
310 1
        data = get_json_or_400(request, self.controller.loop)
311 1
312
        try:
313
            evc = self._evc_from_dict(data)
314
        except (ValueError, KytosTagError) as exception:
315
            log.debug("create_circuit result %s %s", exception, 400)
316 1
            raise HTTPException(400, detail=str(exception)) from exception
317
        try:
318
            check_disabled_component(evc.uni_a, evc.uni_z)
319 1
        except DisabledSwitch as exception:
320
            log.debug("create_circuit result %s %s", exception, 409)
321
            raise HTTPException(
322 1
                    409,
323 1
                    detail=f"Path is not valid: {exception}"
324 1
                ) from exception
325 1
326
        if evc.primary_path:
327
            try:
328 1
                evc.primary_path.is_valid(
329 1
                    evc.uni_a.interface.switch,
330 1
                    evc.uni_z.interface.switch,
331 1
                    bool(evc.circuit_scheduler),
332
                )
333 1
            except InvalidPath as exception:
334
                raise HTTPException(
335 1
                    400,
336 1
                    detail=f"primary_path is not valid: {exception}"
337 1
                ) from exception
338 1
        if evc.backup_path:
339 1
            try:
340 1
                evc.backup_path.is_valid(
341 1
                    evc.uni_a.interface.switch,
342 1
                    evc.uni_z.interface.switch,
343 1
                    bool(evc.circuit_scheduler),
344 1
                )
345
            except InvalidPath as exception:
346 1
                raise HTTPException(
347 1
                    400,
348
                    detail=f"backup_path is not valid: {exception}"
349
                ) from exception
350
351 1
        if not evc._tag_lists_equal():
352
            detail = "UNI_A and UNI_Z tag lists should be the same."
353 1
            raise HTTPException(400, detail=detail)
354 1
355 1
        try:
356 1
            evc._validate_has_primary_or_dynamic()
357 1
        except ValueError as exception:
358
            raise HTTPException(400, detail=str(exception)) from exception
359 1
360 1
        try:
361 1
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
362
        except DuplicatedNoTagUNI as exception:
363
            log.debug("create_circuit result %s %s", exception, 409)
364
            raise HTTPException(409, detail=str(exception)) from exception
365
366
        try:
367 1
            self._use_uni_tags(evc)
368 1
        except KytosTagError as exception:
369 1
            raise HTTPException(400, detail=str(exception)) from exception
370 1
371 1
        # save circuit
372 1
        try:
373 1
            evc.sync()
374 1
        except ValidationError as exception:
375 1
            raise HTTPException(400, detail=str(exception)) from exception
376
377 1
        # store circuit in dictionary
378 1
        self.circuits[evc.id] = evc
379 1
380
        # Schedule the circuit deploy
381
        self.sched.add(evc)
382
383 1
        # Circuit has no schedule, deploy now
384 1
        deployed = False
385 1
        if not evc.circuit_scheduler:
386 1
            with evc.lock:
387 1
                deployed = evc.deploy()
388
389
        # Notify users
390 1
        result = {"circuit_id": evc.id, "deployed": deployed}
391 1
        status = 201
392 1
        log.debug("create_circuit result %s %s", result, status)
393
        emit_event(self.controller, name="created",
394
                   content=map_evc_event_content(evc))
395
        return JSONResponse(result, status_code=status)
396 1
397 1
    @staticmethod
398
    def _use_uni_tags(evc):
399
        uni_a = evc.uni_a
400
        evc._use_uni_vlan(uni_a)
401
        try:
402
            uni_z = evc.uni_z
403
            evc._use_uni_vlan(uni_z)
404
        except KytosTagError as err:
405
            evc.make_uni_vlan_available(uni_a)
406 1
            raise err
407 1
408 1
    @listen_to('kytos/flow_manager.flow.removed')
409 1
    def on_flow_delete(self, event):
410 1
        """Capture delete messages to keep track when flows got removed."""
411 1
        self.handle_flow_delete(event)
412 1
413 1
    def handle_flow_delete(self, event):
414 1
        """Keep track when the EVC got flows removed by deriving its cookie."""
415
        flow = event.content["flow"]
416 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
417 1
        if evc:
418
            log.debug("Flow removed in EVC %s", evc.id)
419 1
            evc.set_flow_removed_at()
420
421 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
422 1
    @validate_openapi(spec)
423
    def update(self, request: Request) -> JSONResponse:
424
        """Update a circuit based on payload.
425
426
        The EVC attributes (creation_time, active, current_path,
427
        failover_path, _id, archived) can't be updated.
428 1
        """
429 1
        data = get_json_or_400(request, self.controller.loop)
430 1
        circuit_id = request.path_params["circuit_id"]
431 1
        log.debug("update /v2/evc/%s", circuit_id)
432 1
        try:
433 1
            evc = self.circuits[circuit_id]
434 1
        except KeyError:
435 1
            result = f"circuit_id {circuit_id} not found"
436 1
            log.debug("update result %s %s", result, 404)
437
            raise HTTPException(404, detail=result) from KeyError
438 1
439 1
        with evc.lock:
440 1
            try:
441 1
                updated_data = self._evc_dict_with_instances(data)
442 1
                self._check_no_tag_duplication(
443 1
                    circuit_id, updated_data.get("uni_a"),
444 1
                    updated_data.get("uni_z")
445 1
                )
446 1
                enable, redeploy = evc.update(**updated_data)
447 1
            except (ValueError, KytosTagError, ValidationError) as exception:
448 1
                log.debug("update result %s %s", exception, 400)
449
                raise HTTPException(400, detail=str(exception)) from exception
450
            except DuplicatedNoTagUNI as exception:
451
                log.debug("update result %s %s", exception, 409)
452
                raise HTTPException(409, detail=str(exception)) from exception
453 1
            except DisabledSwitch as exception:
454 1
                log.debug("update result %s %s", exception, 409)
455 1
                raise HTTPException(
456 1
                        409,
457
                        detail=f"Path is not valid: {exception}"
458 1
                    ) from exception
459
460 1
            redeployed = False
461 1
462
            if evc.is_active():
463 1
                if enable is False:  # disable if active
464 1
                    evc.remove()
465 1
                elif redeploy is not None:  # redeploy if active
466
                    evc.remove()
467
                    redeployed = evc.deploy()
468
            else:
469
                if enable is True:  # enable if inactive
470
                    redeployed = evc.deploy()
471
                elif evc.is_enabled() and redeploy:
472
                    evc.remove()
473
                    redeployed = evc.deploy()
474 1
475 1
            result = {evc.id: evc.as_dict(), 'redeployed': redeployed}
476 1
            status = 200
477
478 1
            log.debug("update result %s %s", result, status)
479 1
            emit_event(
480
                self.controller,
481 1
                "updated",
482
                content=map_evc_event_content(evc, **data)
483 1
            )
484 1
            return JSONResponse(result, status_code=status)
485 1
486 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
487 1
    def delete_circuit(self, request: Request) -> JSONResponse:
488 1
        """Remove a circuit.
489 1
490
        First, the flows are removed from the switches, and then the EVC is
491 1
        disabled.
492 1
        """
493 1
        circuit_id = request.path_params["circuit_id"]
494
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
495 1
        try:
496 1
            evc = self.circuits.pop(circuit_id)
497 1
        except KeyError:
498
            result = f"circuit_id {circuit_id} not found"
499 1
            log.debug("delete_circuit result %s %s", result, 404)
500 1
            raise HTTPException(404, detail=result) from KeyError
501 1
        log.info("Removing %s", evc)
502
503 1
        with evc.lock:
504 1
            if not evc.archived:
505 1
                evc.deactivate()
506 1
                evc.disable()
507
                self.sched.remove(evc)
508
                evc.remove_current_flows(sync=False)
509
                evc.remove_failover_flows(sync=False)
510
                evc.archive()
511 1
                evc.remove_uni_tags()
512 1
                evc.sync()
513 1
                emit_event(
514
                    self.controller, "deleted",
515 1
                    content=map_evc_event_content(evc)
516 1
                )
517 1
518
        log.info("EVC removed. %s", evc)
519 1
        result = {"response": f"Circuit {circuit_id} removed"}
520 1
        status = 200
521 1
        log.debug("delete_circuit result %s %s", result, status)
522 1
523
        return JSONResponse(result, status_code=status)
524
525
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
526 1
    def get_metadata(self, request: Request) -> JSONResponse:
527 1
        """Get metadata from an EVC."""
528 1
        circuit_id = request.path_params["circuit_id"]
529 1
        try:
530 1
            return (
531 1
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
532 1
            )
533
        except KeyError as error:
534 1
            raise HTTPException(
535 1
                404,
536 1
                detail=f"circuit_id {circuit_id} not found."
537
            ) from error
538 1
539 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
540
    @validate_openapi(spec)
541 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
542 1
        """Add metadata to a bulk of EVCs."""
543 1
        data = get_json_or_400(request, self.controller.loop)
544 1
        circuit_ids = data.pop("circuit_ids")
545 1
546 1
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
547
548
        fail_evcs = []
549
        for _id in circuit_ids:
550
            try:
551 1
                evc = self.circuits[_id]
552 1
                evc.extend_metadata(data)
553 1
            except KeyError:
554
                fail_evcs.append(_id)
555 1
556 1
        if fail_evcs:
557
            raise HTTPException(404, detail=fail_evcs)
558 1
        return JSONResponse("Operation successful", status_code=201)
559 1
560
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
561
    @validate_openapi(spec)
562 1
    def add_metadata(self, request: Request) -> JSONResponse:
563 1
        """Add metadata to an EVC."""
564
        circuit_id = request.path_params["circuit_id"]
565
        metadata = get_json_or_400(request, self.controller.loop)
566 1
        if not isinstance(metadata, dict):
567 1
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
568 1
        try:
569 1
            evc = self.circuits[circuit_id]
570 1
        except KeyError as error:
571
            raise HTTPException(
572
                404,
573
                detail=f"circuit_id {circuit_id} not found."
574 1
            ) from error
575 1
576 1
        with evc.lock:
577 1
            evc.extend_metadata(metadata)
578
            evc.sync()
579
        return JSONResponse("Operation successful", status_code=201)
580
581 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
582 1
    @validate_openapi(spec)
583 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
584 1
        """Delete metada from a bulk of EVCs"""
585 1
        data = get_json_or_400(request, self.controller.loop)
586
        key = request.path_params["key"]
587 1
        circuit_ids = data.pop("circuit_ids")
588
        self.mongo_controller.update_evcs_metadata(
589
            circuit_ids, {key: ""}, "del"
590 1
        )
591
592 1
        fail_evcs = []
593
        for _id in circuit_ids:
594 1
            try:
595 1
                evc = self.circuits[_id]
596 1
                evc.remove_metadata(key)
597
            except KeyError:
598
                fail_evcs.append(_id)
599
600
        if fail_evcs:
601
            raise HTTPException(404, detail=fail_evcs)
602
        return JSONResponse("Operation successful")
603
604
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
605
    def delete_metadata(self, request: Request) -> JSONResponse:
606
        """Delete metadata from an EVC."""
607
        circuit_id = request.path_params["circuit_id"]
608
        key = request.path_params["key"]
609
        try:
610
            evc = self.circuits[circuit_id]
611
        except KeyError as error:
612 1
            raise HTTPException(
613 1
                404,
614 1
                detail=f"circuit_id {circuit_id} not found."
615 1
            ) from error
616
617
        with evc.lock:
618 1
            evc.remove_metadata(key)
619
            evc.sync()
620
        return JSONResponse("Operation successful")
621 1
622
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
623
    def redeploy(self, request: Request) -> JSONResponse:
624 1
        """Endpoint to force the redeployment of an EVC."""
625 1
        circuit_id = request.path_params["circuit_id"]
626 1
        try_avoid_same_s_vlan = request.query_params.get(
627 1
            "try_avoid_same_s_vlan", "true"
628
        )
629
        try_avoid_same_s_vlan = try_avoid_same_s_vlan.lower()
630 1
        if try_avoid_same_s_vlan not in {"true", "false"}:
631
            msg = "Parameter try_avoid_same_s_vlan has an invalid value."
632
            raise HTTPException(400, detail=msg)
633 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
634 1
        try:
635
            evc = self.circuits[circuit_id]
636
        except KeyError:
637 1
            raise HTTPException(
638
                404,
639
                detail=f"circuit_id {circuit_id} not found"
640 1
            ) from KeyError
641
        deployed = False
642
        with evc.lock:
643 1
            if evc.is_enabled():
644
                path_dict = evc.remove_current_flows(
645 1
                    sync=False,
646 1
                    return_path=try_avoid_same_s_vlan == "true"
647
                )
648 1
                evc.remove_failover_flows(sync=True)
649 1
                deployed = evc.deploy(path_dict)
650
        if deployed:
651 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
652 1
            status = 202
653 1
        else:
654
            result = {
655
                "response": f"Circuit {circuit_id} is disabled."
656
            }
657
            status = 409
658
659
        return JSONResponse(result, status_code=status)
660
661
    @rest("/v2/evc/schedule/", methods=["POST"])
662
    @validate_openapi(spec)
663
    def create_schedule(self, request: Request) -> JSONResponse:
664
        """
665
        Create a new schedule for a given circuit.
666 1
667 1
        This service do no check if there are conflicts with another schedule.
668 1
        Payload example:
669
            {
670
              "circuit_id":"aa:bb:cc",
671 1
              "schedule": {
672
                "date": "2019-08-07T14:52:10.967Z",
673
                "interval": "string",
674 1
                "frequency": "1 * * * *",
675 1
                "action": "create"
676 1
              }
677 1
            }
678
        """
679 1
        log.debug("create_schedule /v2/evc/schedule/")
680 1
        data = get_json_or_400(request, self.controller.loop)
681
        circuit_id = data["circuit_id"]
682 1
        schedule_data = data["schedule"]
683
684 1
        # Get EVC from circuits buffer
685
        circuits = self._get_circuits_buffer()
686
687 1
        # get the circuit
688
        evc = circuits.get(circuit_id)
689 1
690
        # get the circuit
691 1
        if not evc:
692
            result = f"circuit_id {circuit_id} not found"
693 1
            log.debug("create_schedule result %s %s", result, 404)
694 1
            raise HTTPException(404, detail=result)
695
696 1
        # new schedule from dict
697 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
698
699 1
        with evc.lock:
700 1
            # If there is no schedule, create the list
701
            if not evc.circuit_scheduler:
702
                evc.circuit_scheduler = []
703
704
            # Add the new schedule
705
            evc.circuit_scheduler.append(new_schedule)
706
707 1
            # Add schedule job
708 1
            self.sched.add_circuit_job(evc, new_schedule)
709 1
710
            # save circuit to mongodb
711
            evc.sync()
712 1
713 1
        result = new_schedule.as_dict()
714 1
        status = 201
715 1
716
        log.debug("create_schedule result %s %s", result, status)
717
        return JSONResponse(result, status_code=status)
718 1
719
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
720
    @validate_openapi(spec)
721 1
    def update_schedule(self, request: Request) -> JSONResponse:
722
        """Update a schedule.
723 1
724
        Change all attributes from the given schedule from a EVC circuit.
725 1
        The schedule ID is preserved as default.
726 1
        Payload example:
727
            {
728 1
              "date": "2019-08-07T14:52:10.967Z",
729 1
              "interval": "string",
730
              "frequency": "1 * * *",
731 1
              "action": "create"
732
            }
733
        """
734
        data = get_json_or_400(request, self.controller.loop)
735
        schedule_id = request.path_params["schedule_id"]
736
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
737
738
        # TODO: Potentially not thread safe
739
        # Try to find a circuit schedule
740
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
741
742
        # Can not modify circuits deleted and archived
743
        if not found_schedule:
744 1
            result = f"schedule_id {schedule_id} not found"
745 1
            log.debug("update_schedule result %s %s", result, 404)
746
            raise HTTPException(404, detail=result)
747 1
748
        new_schedule = CircuitSchedule.from_dict(data)
749 1
        new_schedule.id = found_schedule.id
750 1
751 1
        with evc.lock:
752 1
            # Remove the old schedule
753 1
            evc.circuit_scheduler.remove(found_schedule)
754 1
            # Append the modified schedule
755 1
            evc.circuit_scheduler.append(new_schedule)
756
757 1
            # Cancel all schedule jobs
758 1
            self.sched.cancel_job(found_schedule.id)
759
            # Add the new circuit schedule
760
            self.sched.add_circuit_job(evc, new_schedule)
761
            # Save EVC to mongodb
762 1
            evc.sync()
763
764 1
        result = new_schedule.as_dict()
765 1
        status = 200
766 1
767 1
        log.debug("update_schedule result %s %s", result, status)
768 1
        return JSONResponse(result, status_code=status)
769
770
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
771 1
    def delete_schedule(self, request: Request) -> JSONResponse:
772
        """Remove a circuit schedule.
773
774 1
        Remove the Schedule from EVC.
775
        Remove the Schedule from cron job.
776
        Save the EVC to the Storehouse.
777
        """
778
        schedule_id = request.path_params["schedule_id"]
779
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
780 1
        # TODO: This _find_evc_by_schedule, doesn't seem thread safe.
781
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
782
783
        # Can not modify circuits deleted and archived
784
        if not found_schedule:
785
            result = f"schedule_id {schedule_id} not found"
786
            log.debug("delete_schedule result %s %s", result, 404)
787
            raise HTTPException(404, detail=result)
788
789
        # TODO: Found_schedule could be removed by another thread,
790 1
        # before this lock is entered.
791 1
        with evc.lock:
792 1
            # Remove the old schedule
793
            evc.circuit_scheduler.remove(found_schedule)
794 1
795
            # Cancel all schedule jobs
796
            self.sched.cancel_job(found_schedule.id)
797
            # Save EVC to mongodb
798 1
            evc.sync()
799 1
800 1
        result = "Schedule removed"
801 1
        status = 200
802 1
803 1
        log.debug("delete_schedule result %s %s", result, status)
804 1
        return JSONResponse(result, status_code=status)
805 1
806 1
    def _check_no_tag_duplication(
807 1
        self,
808 1
        evc_id: str,
809 1
        uni_a: Optional[UNI] = None,
810 1
        uni_z: Optional[UNI] = None
811 1
    ):
812
        """Check if the given EVC has UNIs with no tag and if these are
813 1
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
814
        Args:
815
            evc (dict): EVC to be analyzed.
816
        """
817
818
        # No UNIs
819
        if not (uni_a or uni_z):
820
            return
821
822
        if (not (uni_a and not uni_a.user_tag) and
823
                not (uni_z and not uni_z.user_tag)):
824
            return
825 1
        for circuit in self.circuits.copy().values():
826
            if (not circuit.archived and circuit._id != evc_id):
827
                if uni_a and uni_a.user_tag is None:
828
                    circuit.check_no_tag_duplicate(uni_a)
829
                if uni_z and uni_z.user_tag is None:
830
                    circuit.check_no_tag_duplicate(uni_z)
831
832
    @listen_to("kytos/topology.link_up")
833
    def on_link_up(self, event):
834
        """Change circuit when link is up or end_maintenance."""
835
        self.handle_link_up(event)
836
837 1
    def handle_link_up(self, event):
838 1
        """Change circuit when link is up or end_maintenance."""
839
        log.info("Event handle_link_up %s", event.content["link"])
840
        for evc in self.get_evcs_by_svc_level():
841
            if evc.is_enabled() and not evc.archived:
842 1
                with evc.lock:
843
                    evc.handle_link_up(event.content["link"])
844
845
    # Possibly replace this with interruptions?
846
    @listen_to(
847
        '.*.switch.interface.(link_up|link_down|created|deleted)'
848
    )
849
    def on_interface_link_change(self, event: KytosEvent):
850
        """
851
        Handler for interface link_up and link_down events.
852
        """
853
        self.handle_on_interface_link_change(event)
854
855
    def handle_on_interface_link_change(self, event: KytosEvent):
856 1
        """
857
        Handler to sort interface events {link_(up, down), create, deleted}
858
859
        To avoid multiple database updated (link flap):
860
        Every interface is identfied and processed in parallel.
861
        Once an interface event is received a time is started.
862
        While time is running self._intf_events will be updated.
863 1
        After time has passed last received event will be processed.
864
        """
865
        iface = event.content.get("interface")
866
        with self._lock_interfaces[iface.id]:
867
            _now = event.timestamp
868 1
            # Return out of order events
869 1
            if (
870 1
                iface.id in self._intf_events
871 1
                and self._intf_events[iface.id]["event"].timestamp > _now
872
            ):
873 1
                return
874 1
            self._intf_events[iface.id].update({"event": event})
875 1
            if "last_acquired" in self._intf_events[iface.id]:
876 1
                return
877 1
            self._intf_events[iface.id].update({"last_acquired": now()})
878
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
879 1
        with self._lock_interfaces[iface.id]:
880
            event = self._intf_events[iface.id]["event"]
881 1
            self._intf_events[iface.id].pop('last_acquired', None)
882
            _, _, event_type = event.name.rpartition('.')
883 1
            if event_type in ('link_up', 'created'):
884 1
                self.handle_interface_link_up(iface)
885
            elif event_type in ('link_down', 'deleted'):
886
                self.handle_interface_link_down(iface)
887
888 1
    def handle_interface_link_up(self, interface):
889 1
        """
890 1
        Handler for interface link_up events
891 1
        """
892 1
        log.info("Event handle_interface_link_up %s", interface)
893
        for evc in self.get_evcs_by_svc_level():
894
            if _does_uni_affect_evc(evc, interface, "up"):
895
                with evc.lock:
896 1
                    evc.handle_interface_link_up(
897 1
                        interface
898 1
                    )
899 1
900
    def handle_interface_link_down(self, interface):
901 1
        """
902
        Handler for interface link_down events
903
        """
904
        log.info("Event handle_interface_link_down %s", interface)
905
        for evc in self.get_evcs_by_svc_level():
906
            if _does_uni_affect_evc(evc, interface, "down"):
907
                with evc.lock:
908
                    evc.handle_interface_link_down(
909
                        interface
910
                    )
911
912
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
913
    def on_link_down(self, event):
914
        """Change circuit when link is down or under_mantenance."""
915
        self.handle_link_down(event)
916
917 1
    def prepare_swap_to_failover_flow(self, evc: EVC):
918
        """Prepare an evc for switching to failover."""
919
        install_flows = {}
920
        try:
921
            install_flows = evc.get_failover_flows()
922
        # pylint: disable=broad-except
923
        except Exception:
924
            err = traceback.format_exc()
925 1
            log.error(
926
                "Ignore Failover path for "
927
                f"{evc} due to error: {err}"
928
            )
929
        return install_flows
930 1
931 1
    def prepare_swap_to_failover_event(self, evc: EVC, install_flow):
932 1
        """Prepare event contents for swap to failover."""
933 1
        return map_evc_event_content(
934
            evc,
935 1
            flows=deepcopy(install_flow)
936 1
        )
937 1
938 1
    def execute_swap_to_failover(
939 1
        self,
940
        evcs: list[EVC]
941 1
    ) -> tuple[list[EVC], list[EVC]]:
942
        """Process changes needed to commit a swap to failover."""
943 1
        event_contents = {}
944
        install_flows = {}
945 1
        swapped_evcs = list[EVC]()
946 1
        not_swapped_evcs = list[EVC]()
947
948
        for evc in evcs:
949
            install_flow = self.prepare_swap_to_failover_flow(evc)
950 1
            if install_flow:
951 1
                install_flows = merge_flow_dicts(install_flows, install_flow)
952 1
                event_contents[evc.id] =\
953 1
                    self.prepare_swap_to_failover_event(evc, install_flow)
954
                swapped_evcs.append(evc)
955
            else:
956
                not_swapped_evcs.append(evc)
957
958 1
        try:
959 1
            send_flow_mods_http(
960 1
                install_flows,
961 1
                "install"
962
            )
963 1
            for evc in swapped_evcs:
964
                temp_path = evc.current_path
965
                evc.current_path = evc.failover_path
966
                evc.failover_path = temp_path
967
            emit_event(
968
                self.controller, "failover_link_down",
969
                content=deepcopy(event_contents)
970
            )
971
            return swapped_evcs, not_swapped_evcs
972
        except FlowModException as exc:
973
            log.error(f"Fail to install failover flows for {evcs}: {exc}")
974
            return [], [*swapped_evcs, *not_swapped_evcs]
975
976
    def prepare_clear_failover_flow(self, evc: EVC):
977
        """Prepare an evc for clearing the old path."""
978
        del_flows = {}
979
        try:
980 1
            del_flows = prepare_delete_flow(
981
                merge_flow_dicts(
982 1
                    evc._prepare_uni_flows(evc.failover_path, skip_in=True),
983 1
                    evc._prepare_nni_flows(evc.failover_path)
984 1
                )
985
            )
986 1
        # pylint: disable=broad-except
987 1
        except Exception:
988 1
            err = traceback.format_exc()
989 1
            log.error(f"Fail to remove {evc} old_path: {err}")
990 1
        return del_flows
991
992 1
    def prepare_clear_failover_event(self, evc: EVC, delete_flow):
993
        """Prepare event contents for clearing failover."""
994 1
        return map_evc_event_content(
995 1
            evc,
996
            current_path=evc.current_path.as_dict(),
997
            removed_flows=deepcopy(delete_flow)
998
        )
999
1000 1
    def execute_clear_failover(
1001 1
        self,
1002 1
        evcs: list[EVC]
1003 1
    ) -> tuple[list[EVC], list[EVC]]:
1004 1
        """Process changes needed to commit clearing the failover path"""
1005 1
        event_contents = {}
1006 1
        delete_flows = {}
1007
        cleared_evcs = list[EVC]()
1008
        not_cleared_evcs = list[EVC]()
1009
1010
        for evc in evcs:
1011 1
            delete_flow = self.prepare_clear_failover_flow(evc)
1012 1
            if delete_flow:
1013 1
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
1014 1
                event_contents[evc.id] =\
1015
                    self.prepare_clear_failover_event(evc, delete_flow)
1016
                cleared_evcs.append(evc)
1017 1
            else:
1018
                not_cleared_evcs.append(evc)
1019 1
1020
        try:
1021 1
            send_flow_mods_http(
1022 1
                delete_flows,
1023
                'delete'
1024 1
            )
1025 1
            for evc in cleared_evcs:
1026 1
                evc.failover_path.make_vlans_available(self.controller)
1027 1
                evc.failover_path = Path([])
1028 1
            emit_event(
1029 1
                self.controller,
1030
                "failover_old_path",
1031 1
                content=event_contents
1032 1
            )
1033 1
            return cleared_evcs, not_cleared_evcs
1034 1
        except FlowModException as exc:
1035
            log.error(f"Failed to delete failover flows for {evcs}: {exc}")
1036
            return [], [*cleared_evcs, *not_cleared_evcs]
1037
1038
    def prepare_undeploy_flow(self, evc: EVC):
1039 1
        """Prepare an evc for undeploying."""
1040 1
        del_flows = {}
1041
        try:
1042
            del_flows = prepare_delete_flow(
1043
                merge_flow_dicts(
1044
                    evc._prepare_uni_flows(evc.current_path, skip_in=True),
1045 1
                    evc._prepare_nni_flows(evc.current_path),
1046 1
                    evc._prepare_nni_flows(evc.failover_path)
1047
                )
1048
            )
1049
        # pylint: disable=broad-except
1050
        except Exception:
1051 1
            err = traceback.format_exc()
1052
            log.error(f"Fail to undeploy {evc}: {err}")
1053
        return del_flows
1054
1055 1
    def execute_undeploy(self, evcs: list[EVC]):
1056
        """Process changes needed to commit an undeploy"""
1057
        delete_flows = {}
1058
        undeploy_evcs = list[EVC]()
1059 1
        not_undeploy_evcs = list[EVC]()
1060 1
1061
        for evc in evcs:
1062
            delete_flow = self.prepare_undeploy_flow(evc)
1063
            if delete_flow:
1064 1
                delete_flows = merge_flow_dicts(delete_flows, delete_flow)
1065
                undeploy_evcs.append(evc)
1066 1
            else:
1067
                not_undeploy_evcs.append(evc)
1068 1
1069
        try:
1070
            send_flow_mods_http(
1071
                delete_flows,
1072 1
                'delete'
1073 1
            )
1074
1075 1
            for evc in undeploy_evcs:
1076
                evc.current_path.make_vlans_available(self.controller)
1077 1
                evc.failover_path.make_vlans_available(self.controller)
1078
                evc.current_path = Path([])
1079
                evc.failover_path = Path([])
1080
                evc.deactivate()
1081 1
                emit_event(
1082 1
                    self.controller,
1083
                    "need_redeploy",
1084 1
                    content={"evc_id": evc.id}
1085
                )
1086 1
                log.info(f"{evc} scheduled for redeploy")
1087 1
            return undeploy_evcs, not_undeploy_evcs
1088
        except FlowModException as exc:
1089
            log.error(
1090
                f"Failed to delete flows before redeploy for {evcs}: {exc}"
1091 1
            )
1092 1
            return [], [*undeploy_evcs, *not_undeploy_evcs]
1093
1094
    def handle_link_down(self, event):
1095
        """Change circuit when link is down or under_mantenance."""
1096 1
        link = event.content["link"]
1097 1
        log.info("Event handle_link_down %s", link)
1098
1099
        with ExitStack() as exit_stack:
1100
            exit_stack.enter_context(self.multi_evc_lock)
1101 1
            swap_to_failover = list[EVC]()
1102
            undeploy = list[EVC]()
1103
            clear_failover = list[EVC]()
1104
            evcs_to_update = dict[str, EVC]()
1105
1106
            for evc in self.get_evcs_by_svc_level():
1107
                with ExitStack() as sub_stack:
1108
                    sub_stack.enter_context(evc.lock)
1109
                    if all((
1110
                        evc.is_affected_by_link(link),
1111
                        evc.failover_path,
1112
                        not evc.is_failover_path_affected_by_link(link)
1113
                    )):
1114
                        swap_to_failover.append(evc)
1115
                    elif all((
1116
                        evc.is_affected_by_link(link),
1117 1
                        not evc.failover_path or
1118 1
                        evc.is_failover_path_affected_by_link(link)
1119
                    )):
1120
                        undeploy.append(evc)
1121
                    elif all((
1122 1
                        not evc.is_affected_by_link(link),
1123
                        evc.failover_path,
1124 1
                        evc.is_failover_path_affected_by_link(link)
1125 1
                    )):
1126 1
                        clear_failover.append(evc)
1127 1
                    else:
1128 1
                        continue
1129 1
1130
                    exit_stack.push(sub_stack.pop_all())
1131 1
1132 1
            # Swap from current path to failover path
1133 1
1134 1
            if swap_to_failover:
1135 1
                success, failure = self.execute_swap_to_failover(
1136 1
                    swap_to_failover
1137
                )
1138
1139 1
                clear_failover.extend(success)
1140 1
1141
                evcs_to_update.update((evc.id, evc) for evc in success)
1142
1143
                undeploy.extend(failure)
1144 1
1145
            # Clear out failover path
1146
1147
            if clear_failover:
1148
                success, failure = self.execute_clear_failover(clear_failover)
1149
1150
                evcs_to_update.update((evc.id, evc) for evc in success)
1151 1
1152 1
                undeploy.extend(failure)
1153
1154
            # Undeploy the evc, schedule a redeploy
1155
1156 1
            if undeploy:
1157
                success, failure = self.execute_undeploy(undeploy)
1158 1
1159 1
                evcs_to_update.update((evc.id, evc) for evc in success)
1160 1
1161 1
                if failure:
1162 1
                    log.error(f"Failed to handle_link_down for {failure}")
1163
1164
            # Push update to DB
1165 1
1166
            if evcs_to_update:
1167 1
                evc_dicts = list[dict]()
1168 1
                update_time = now()
1169 1
                for evc in evcs_to_update.values():
1170 1
                    evc.updated_at = update_time
1171
                    evc_dicts.append(evc.as_dict())
1172
                self.mongo_controller.update_evcs(
1173 1
                    evc_dicts
1174 1
                )
1175 1
1176
    @listen_to("kytos/mef_eline.need_redeploy")
1177 1
    def on_evc_need_redeploy(self, event):
1178 1
        """Redeploy evcs that need to be redeployed."""
1179 1
        self.handle_evc_need_redeploy(event)
1180
1181 1
    def handle_evc_need_redeploy(self, event):
1182 1
        """Redeploy evcs that need to be redeployed."""
1183
        evc = self.circuits.get(event.content["evc_id"])
1184
        if evc is None:
1185
            return
1186 1
        with evc.lock:
1187
            if not evc.is_enabled() or evc.is_active():
1188 1
                return
1189 1
            result = evc.deploy()
1190 1
        event_name = "error_redeploy_link_down"
1191 1
        if result:
1192 1
            log.info(f"{evc} redeployed")
1193 1
            event_name = "redeployed_link_down"
1194 1
        emit_event(self.controller, event_name,
1195 1
                   content=map_evc_event_content(evc))
1196 1
1197 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
1198
    def on_evc_affected_by_link_down(self, event):
1199 1
        """Change circuit when link is down or under_mantenance."""
1200
        self.handle_evc_affected_by_link_down(event)
1201
1202
    def handle_evc_affected_by_link_down(self, event):
1203
        """Change circuit when link is down or under_mantenance."""
1204 1
        evc = self.circuits.get(event.content["evc_id"])
1205 1
        link = event.content['link']
1206
        if not evc:
1207
            return
1208 1
        with evc.lock:
1209 1
            if not evc.is_affected_by_link(link):
1210 1
                return
1211 1
            result = evc.handle_link_down()
1212 1
        event_name = "error_redeploy_link_down"
1213 1
        if result:
1214
            log.info(f"{evc} redeployed due to link down {link.id}")
1215 1
            event_name = "redeployed_link_down"
1216 1
        emit_event(self.controller, event_name,
1217 1
                   content=map_evc_event_content(evc))
1218 1
1219
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
1220
    def on_evc_deployed(self, event):
1221
        """Handle EVC deployed|redeployed_link_down."""
1222
        self.handle_evc_deployed(event)
1223
1224
    def handle_evc_deployed(self, event):
1225
        """Setup failover path on evc deployed."""
1226 1
        evc = self.circuits.get(event.content["evc_id"])
1227 1
        if not evc:
1228
            return
1229
        with evc.lock:
1230
            evc.try_setup_failover_path()
1231
1232
    @listen_to("kytos/topology.topology_loaded")
1233
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
1234 1
        """Load EVCs once the topology is available."""
1235
        self.load_all_evcs()
1236 1
1237
    def load_all_evcs(self):
1238
        """Try to load all EVCs on startup."""
1239
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
1240 1
        for circuit_id, circuit in circuits:
1241
            if circuit_id not in self.circuits:
1242 1
                self._load_evc(circuit)
1243 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
1244 1
                   timeout=1)
1245 1
1246
    def _load_evc(self, circuit_dict):
1247 1
        """Load one EVC from mongodb to memory."""
1248
        try:
1249 1
            evc = self._evc_from_dict(circuit_dict)
1250 1
        except (ValueError, KytosTagError) as exception:
1251
            log.error(
1252 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
1253 1
            )
1254 1
            return None
1255 1
        if evc.archived:
1256
            return None
1257
1258
        self.circuits.setdefault(evc.id, evc)
1259 1
        self.sched.add(evc)
1260 1
        return evc
1261 1
1262 1
    @listen_to("kytos/flow_manager.flow.error")
1263 1
    def on_flow_mod_error(self, event):
1264 1
        """Handle flow mod errors related to an EVC."""
1265 1
        self.handle_flow_mod_error(event)
1266 1
1267 1
    def handle_flow_mod_error(self, event):
1268 1
        """Handle flow mod errors related to an EVC."""
1269 1
        flow = event.content["flow"]
1270
        command = event.content.get("error_command")
1271 1
        if command != "add":
1272
            return
1273 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1274 1
        if not evc or evc.archived or not evc.is_enabled():
1275 1
            return
1276
        with evc.lock:
1277 1
            evc.remove_current_flows(sync=False)
1278
            evc.remove_failover_flows(sync=True)
1279 1
1280 1
    def _evc_dict_with_instances(self, evc_dict):
1281
        """Convert some dict values to instance of EVC classes.
1282 1
1283 1
        This method will convert: [UNI, Link]
1284 1
        """
1285 1
        data = evc_dict.copy()  # Do not modify the original dict
1286 1
        for attribute, value in data.items():
1287 1
            # Get multiple attributes.
1288
            # Ex: uni_a, uni_z
1289
            if "uni" in attribute:
1290
                try:
1291 1
                    data[attribute] = self._uni_from_dict(value)
1292 1
                except ValueError as exception:
1293 1
                    result = "Error creating UNI: Invalid value"
1294 1
                    raise ValueError(result) from exception
1295
1296 1
            if attribute == "circuit_scheduler":
1297 1
                data[attribute] = []
1298 1
                for schedule in value:
1299 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1300
1301
            # Get multiple attributes.
1302 1
            # Ex: primary_links,
1303 1
            #     backup_links,
1304
            #     current_links_cache,
1305 1
            #     primary_links_cache,
1306
            #     backup_links_cache
1307
            if "links" in attribute:
1308
                data[attribute] = [
1309
                    self._link_from_dict(link, attribute) for link in value
1310
                ]
1311
1312 1
            # Ex: current_path,
1313 1
            #     primary_path,
1314 1
            #     backup_path
1315
            if (attribute.endswith("path") and
1316
                    attribute != "dynamic_backup_path"):
1317 1
                data[attribute] = Path(
1318 1
                    [self._link_from_dict(link, attribute) for link in value]
1319 1
                )
1320 1
1321 1
        return data
1322 1
1323 1
    def _evc_from_dict(self, evc_dict):
1324 1
        data = self._evc_dict_with_instances(evc_dict)
1325 1
        data["table_group"] = self.table_group
1326
        return EVC(self.controller, **data)
1327 1
1328
    def _uni_from_dict(self, uni_dict):
1329
        """Return a UNI object from python dict."""
1330
        if uni_dict is None:
1331
            return False
1332
1333 1
        interface_id = uni_dict.get("interface_id")
1334
        interface = self.controller.get_interface_by_id(interface_id)
1335 1
        if interface is None:
1336 1
            result = (
1337 1
                "Error creating UNI:"
1338 1
                + f"Could not instantiate interface {interface_id}"
1339 1
            )
1340
            raise ValueError(result) from ValueError
1341
        tag_convert = {1: "vlan"}
1342 1
        tag_dict = uni_dict.get("tag", None)
1343 1
        if tag_dict:
1344
            tag_type = tag_dict.get("tag_type")
1345 1
            tag_type = tag_convert.get(tag_type, tag_type)
1346 1
            tag_value = tag_dict.get("value")
1347 1
            if isinstance(tag_value, list):
1348 1
                tag_value = get_tag_ranges(tag_value)
1349 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1350 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1351
            else:
1352
                tag = TAG(tag_type, tag_value)
1353 1
        else:
1354 1
            tag = None
1355 1
        uni = UNI(interface, tag)
1356 1
        return uni
1357 1
1358
    def _link_from_dict(self, link_dict: dict, attribute: str) -> Link:
1359
        """Return a Link object from python dict."""
1360
        id_a = link_dict.get("endpoint_a").get("id")
1361
        id_b = link_dict.get("endpoint_b").get("id")
1362
1363
        endpoint_a = self.controller.get_interface_by_id(id_a)
1364
        endpoint_b = self.controller.get_interface_by_id(id_b)
1365
        if not endpoint_a:
1366
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1367
            raise ValueError(error_msg)
1368
        if not endpoint_b:
1369
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1370
            raise ValueError(error_msg)
1371
1372
        link = Link(endpoint_a, endpoint_b)
1373
        allowed_paths = {"current_path", "failover_path"}
1374
        if "metadata" in link_dict and attribute in allowed_paths:
1375
            link.extend_metadata(link_dict.get("metadata"))
1376
1377
        s_vlan = link.get_metadata("s_vlan")
1378
        if s_vlan:
1379
            tag = TAG.from_dict(s_vlan)
1380
            if tag is False:
1381
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1382
                raise ValueError(error_msg)
1383
            link.update_metadata("s_vlan", tag)
1384
        return link
1385
1386
    def _find_evc_by_schedule_id(self, schedule_id):
1387
        """
1388
        Find an EVC and CircuitSchedule based on schedule_id.
1389
1390
        :param schedule_id: Schedule ID
1391
        :return: EVC and Schedule
1392
        """
1393
        circuits = self._get_circuits_buffer()
1394
        found_schedule = None
1395
        evc = None
1396
1397
        # pylint: disable=unused-variable
1398
        for c_id, circuit in circuits.items():
1399
            # TODO: circuit_scheduler not protected by lock.
1400
            # Could be modified during iteration.
1401
            for schedule in circuit.circuit_scheduler:
1402
                if schedule.id == schedule_id:
1403
                    found_schedule = schedule
1404
                    evc = circuit
1405
                    break
1406
            if found_schedule:
1407
                break
1408
        return evc, found_schedule
1409
1410
    def _get_circuits_buffer(self):
1411
        """
1412
        Return the circuit buffer.
1413
1414
        If the buffer is empty, try to load data from mongodb.
1415
        """
1416
        if not self.circuits:
1417
            # Load circuits from mongodb to buffer
1418
            circuits = self.mongo_controller.get_circuits()['circuits']
1419
            for c_id, circuit in circuits.items():
1420
                evc = self._evc_from_dict(circuit)
1421
                self.circuits[c_id] = evc
1422
        return self.circuits
1423
1424
    # pylint: disable=attribute-defined-outside-init
1425
    @alisten_to("kytos/of_multi_table.enable_table")
1426
    async def on_table_enabled(self, event):
1427
        """Handle a recently table enabled."""
1428
        table_group = event.content.get("mef_eline", None)
1429
        if not table_group:
1430
            return
1431
        for group in table_group:
1432
            if group not in settings.TABLE_GROUP_ALLOWED:
1433
                log.error(f'The table group "{group}" is not allowed for '
1434
                          f'mef_eline. Allowed table groups are '
1435
                          f'{settings.TABLE_GROUP_ALLOWED}')
1436
                return
1437
        self.table_group.update(table_group)
1438
        content = {"group_table": self.table_group}
1439
        name = "kytos/mef_eline.enable_table"
1440
        await aemit_event(self.controller, name, content)
1441