Test Failed
Pull Request — master (#480)
by Aldo
04:19
created

build.main.Main.on_flow_mod_error()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nop 2
crap 1
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from threading import Lock
11 1
from typing import Optional
12
13 1
from pydantic import ValidationError
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.events import KytosEvent
17 1
from kytos.core.exceptions import KytosTagError
18 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
19
                                validate_openapi)
20 1
from kytos.core.interface import TAG, UNI, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
23
                                 get_json_or_400)
24 1
from kytos.core.tag_ranges import get_tag_ranges
25 1
from napps.kytos.mef_eline import controllers, settings
26 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
27
                                              DuplicatedNoTagUNI, InvalidPath)
28 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
29
                                          Path)
30 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
31 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
32
                                         emit_event, get_vlan_tags_and_masks,
33
                                         map_evc_event_content)
34
35
36
# pylint: disable=too-many-public-methods
37 1
class Main(KytosNApp):
38
    """Main class of amlight/mef_eline NApp.
39
40
    This class is the entry point for this napp.
41
    """
42
43 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
44
45 1
    def setup(self):
        """Replace the '__init__' method for the KytosNApp subclass.

        The setup method is automatically called by the controller when your
        application is loaded.

        So, if you have any setup routine, insert it here.

        This implementation creates the scheduler and the database
        controller, hands the Kytos controller to DynamicPathManager,
        initialises the in-memory circuit buffer and the locks, calls
        execute_as_loop() with settings.DEPLOY_EVCS_INTERVAL and finally
        calls load_all_evcs().
        """
        # object used to schedule circuit events
        self.sched = Scheduler()

        # object to save and load circuits
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # set the controller that will manage the dynamic paths
        DynamicPathManager.set_controller(self.controller)

        # dictionary of EVCs created. It acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        # last interface event received, keyed by interface id
        self._intf_events = defaultdict(dict)
        # one lock per interface id, created on first access
        self._lock_interfaces = defaultdict(Lock)
        # NOTE(review): presumably the flow table group per EVC kind
        # (epl/evpl) - confirm against the code that reads table_group
        self.table_group = {"epl": 0, "evpl": 0}
        # held by execute() while the consistency routine is running
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        # NOTE(review): assigned only after load_all_evcs(); confirm nothing
        # reads it earlier
        self._topology_updated_at = None
75
76 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
        """Return the buffered circuits ordered by priority.

        Circuits are ordered by descending ``service_level`` and, within the
        same level, by ascending ``creation_time``.

        Args:
            enable_filter: when True (default) only circuits whose
                ``is_enabled()`` is truthy are returned; when False every
                buffered circuit is returned.

        In the future, as more ops are offloaded it should be get from the DB.
        """
        def priority(evc):
            return (-evc.service_level, evc.creation_time)

        candidates = self.circuits.values()
        if enable_filter:
            candidates = [evc for evc in candidates if evc.is_enabled()]
        return sorted(candidates, key=priority)
89
90 1
    @staticmethod
    def get_eline_controller():
        """Create and return a new ``controllers.ELineController``."""
        eline_controller = controllers.ELineController()
        return eline_controller
94
95 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped when a previous one still holds ``self._lock``.
        The lock is taken with a non-blocking acquire so that the busy test
        and the acquisition are a single step: with a separate ``locked()``
        check two callers could both pass the test, leaving the second one
        blocked on the lock instead of skipped.
        """
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            # always release, even when the routine raises
            self._lock.release()
        log.debug("Finished consistency routine")
103
104 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit qualifies for the consistency check.

        A circuit qualifies when it is enabled, not active, its lock is
        free, it has no recently removed flow, it was not recently updated,
        its UNIs are active on the controller switches and it is either
        intra-switch or has a current path.

        Returns:
            True when every condition holds, False otherwise.
        """
        if not circuit.is_enabled():
            return False
        if circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow():
            return False
        if circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # if a inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it
        if circuit.is_intra_switch() or circuit.current_path:
            return True
        return False
120
121 1
    def execute_consistency(self):
        """Run the consistency routine over every buffered circuit.

        For each circuit (priority order, disabled ones included) the
        failover path setup is attempted. Circuits accepted by
        should_be_checked() are then verified in one batch with
        EVCDeploy.check_list_traces(): the ones reported as checked are
        activated and synced, the others get their ``execution_rounds``
        increased and are redeployed once that counter exceeds
        settings.WAIT_FOR_OLD_PATH.
        """
        # NOTE(review): stored_circuits is fetched and pruned below but is
        # not read afterwards within this method.
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)
            circuit.try_setup_failover_path()

        trace_results = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if trace_results.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()
145 1
146 1
    def shutdown(self):
        """Execute when your napp is unloaded.

        If you have some cleanup procedure, insert it here.
        This implementation performs no cleanup.
        """
151
152
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to list the stored circuits.

        The ``archived`` query argument (default ``"false"``, lower-cased)
        is handed to the database controller as the archive filter; every
        other query argument is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata_filter = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(
            archived=archived, metadata=metadata_filter
        )
        return JSONResponse(found['circuits'])
167
168 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When the database returns no circuits at all, an empty JSON object
        is returned instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        status = 200
        if not circuits:
            return JSONResponse({}, status_code=status)

        # one entry per schedule of every circuit that has schedules
        result = [
            {
                "schedule_id": scheduler.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": scheduler,
            }
            for circuit in circuits
            for scheduler in circuit.get("circuit_scheduler") or []
        ]

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
199
200 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return one circuit, looked up by id in the database.

        Raises:
            HTTPException: 404 when the database has no such circuit.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            status = 200
            log.debug("get_circuit result %s %s", circuit, status)
            return JSONResponse(circuit, status_code=status)

        result = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", result, 404)
        raise HTTPException(404, detail=result)
213 1
214 1
    # pylint: disable=too-many-branches, too-many-statements
215 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Steps, in order:

        1. Build the EVC from the JSON payload (400 on ``ValueError`` or
           ``KytosTagError``).
        2. Answer 409 when ``check_disabled_component`` raises
           ``DisabledSwitch`` for the EVC UNIs.
        3. Validate ``primary_path`` and ``backup_path`` when given (400 on
           ``InvalidPath``).
        4. Require equal UNI tag lists and a primary or dynamic path (400).
        5. Check no-tag UNI duplication against the other circuits (409).
        6. Reserve the UNI tags (400 on ``KytosTagError``).
        7. Save the EVC (400 on ``ValidationError``); the tags reserved in
           the previous step are released before the error is reported, so
           a rejected request does not keep them in use.
        8. Buffer the EVC, add it to the scheduler and, when it has no
           ``circuit_scheduler``, deploy it right away.
        9. Emit the "created" event and, finally, notify the user with a 201
           response carrying the circuit id.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # primary_path is validated before backup_path
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
        except DuplicatedNoTagUNI as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # the circuit is not going to exist: give the UNI tags back
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
328 1
329
    @staticmethod
    def _use_uni_tags(evc):
        """Mark the tags of both EVC UNIs as used, all or nothing.

        The tag of ``uni_a`` is taken first. When taking the tag of
        ``uni_z`` raises ``KytosTagError`` the tag of ``uni_a`` is made
        available again and the error is re-raised.
        """
        first_uni = evc.uni_a
        evc._use_uni_vlan(first_uni)
        try:
            second_uni = evc.uni_z
            evc._use_uni_vlan(second_uni)
        except KytosTagError as err:
            # roll back the first reservation before reporting the failure
            evc.make_uni_vlan_available(first_uni)
            raise err
339 1
340 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Capture delete messages to keep track when flows got removed.

        Listener for 'kytos/flow_manager.flow.removed'; it only forwards the
        event to handle_flow_delete().
        """
        self.handle_flow_delete(event)
344 1
345
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the cookie of ``event.content["flow"]``;
        nothing happens when no buffered circuit has that id.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
352 1
353 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the EVC is updated, its flows are removed and/or deployed
        according to the ``enable`` and ``redeploy`` values returned by
        ``evc.update`` and to whether the EVC is currently active.

        Raises:
            HTTPException: 404 when the circuit is not buffered; 400 on
                ``ValueError``, ``KytosTagError`` or ``ValidationError``;
                409 on ``DuplicatedNoTagUNI`` or ``DisabledSwitch``.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # chain from the caught instance, not from the KeyError class
            raise HTTPException(404, detail=result) from error

        try:
            updated_data = self._evc_dict_with_instances(data)
            self._check_no_tag_duplication(
                circuit_id, updated_data.get("uni_a"),
                updated_data.get("uni_z")
            )
            enable, redeploy = evc.update(**updated_data)
        except (ValueError, KytosTagError, ValidationError) as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DuplicatedNoTagUNI as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(409, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
            elif evc.is_enabled() and redeploy:
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
414 1
415
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        The circuit is taken out of the buffer. Unless it is already
        archived, its current and failover flows are removed, it is
        deactivated, disabled, removed from the scheduler, archived, its UNI
        tags are released, it is synced and the "deleted" event is emitted.

        Raises:
            HTTPException: 404 when the circuit is not buffered.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits.pop(circuit_id)
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # chain from the caught instance, not from the KeyError class
            raise HTTPException(404, detail=result) from error
        log.info("Removing %s", evc)

        with evc.lock:
            if not evc.archived:
                evc.remove_current_flows()
                evc.remove_failover_flows(sync=False)
                evc.deactivate()
                evc.disable()
                self.sched.remove(evc)
                evc.archive()
                evc.remove_uni_tags()
                evc.sync()
                emit_event(self.controller, "deleted",
                           content=map_evc_event_content(evc))

        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200
        log.debug("delete_circuit result %s %s", result, status)

        return JSONResponse(result, status_code=status)
451 1
452
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of a buffered EVC as {"metadata": ...}.

        Raises:
            HTTPException: 404 when a ``KeyError`` is raised while building
                the response, i.e. the circuit is not buffered.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            evc = self.circuits[circuit_id]
            payload = {"metadata": evc.metadata}
            return JSONResponse(payload)
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error
465
466 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add the same metadata to several EVCs at once.

        ``circuit_ids`` is taken out of the payload and the remaining keys
        are the metadata. The database is updated first for all ids, then
        each buffered EVC is extended.

        Raises:
            HTTPException: 404 listing the ids that raised ``KeyError``
                while being updated in the buffer.
        """
        payload = get_json_or_400(request, self.controller.loop)
        circuit_ids = payload.pop("circuit_ids")

        self.mongo_controller.update_evcs_metadata(
            circuit_ids, payload, "add"
        )

        missing = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].extend_metadata(payload)
            except KeyError:
                missing.append(evc_id)

        if not missing:
            return JSONResponse("Operation successful", status_code=201)
        raise HTTPException(404, detail=missing)
486 1
487
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON payload must be an object; its items extend the metadata of
        the buffered EVC, which is then synced.

        Raises:
            HTTPException: 400 when the payload is not a dict (the message
                includes the rejected value); 404 when the circuit is not
                buffered.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # f-string, so the offending value is actually interpolated
            raise HTTPException(400, f"Invalid metadata value: {metadata}")
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
506 1
507 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from several EVCs at once.

        The key comes from the URL and ``circuit_ids`` from the payload. The
        database is updated first for all ids, then the key is removed from
        each buffered EVC.

        Raises:
            HTTPException: 404 listing the ids that raised ``KeyError``
                while being updated in the buffer.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs_metadata(
            circuit_ids, {key: ""}, "del"
        )

        missing = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].remove_metadata(key)
            except KeyError:
                missing.append(evc_id)

        if not missing:
            return JSONResponse("Operation successful")
        raise HTTPException(404, detail=missing)
529 1
530
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from a buffered EVC and sync the EVC.

        Raises:
            HTTPException: 404 when the circuit is not buffered.
        """
        path_params = request.path_params
        circuit_id = path_params["circuit_id"]
        key = path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
546 1
547
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its failover and current flows removed and is
        deployed again (202). A disabled EVC is left untouched (409).

        Raises:
            HTTPException: 404 when the circuit is not buffered.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # chain from the caught instance, not from the KeyError class
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_failover_flows(sync=False)
                evc.remove_current_flows(sync=True)
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
571 1
572
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        The new schedule is appended to the EVC, registered as a scheduler
        job and the EVC is synced; the response (201) is the schedule as a
        dict. A 404 is raised when the circuit is not found in the buffer.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id, schedule_data = payload["circuit_id"], payload["schedule"]

        # look the EVC up in the circuits buffer
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # make sure there is a list to append to
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # register the job, then save the circuit to mongodb
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        status = 201
        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
628 1
629
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace every attribute of the given schedule of an EVC with the
        payload values, keeping the schedule id.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        The old scheduler job is cancelled, a job for the new schedule is
        added and the EVC is synced. A 404 is raised when no schedule with
        that id is found.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Can not modify circuits deleted and archived
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        replacement = CircuitSchedule.from_dict(payload)
        replacement.id = old_schedule.id

        # swap the schedule inside the EVC
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(replacement)

        # swap the scheduler job, then save the EVC to mongodb
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, replacement)
        evc.sync()

        status = 200
        result = replacement.as_dict()
        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
676 1
677
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        The schedule is removed from its EVC, its scheduler job is cancelled
        and the EVC is synced. A 404 is raised when no schedule with that id
        is found.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)

        # Can not modify circuits deleted and archived
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # drop the schedule, cancel its job, then save the EVC to mongodb
        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)
        evc.sync()

        status = 200
        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
708 1
709
    def _check_no_tag_duplication(
        self,
        evc_id: str,
        uni_a: Optional[UNI] = None,
        uni_z: Optional[UNI] = None
    ):
        """Check the given untagged UNIs against every other circuit.

        Nothing is checked when no UNI is given or when every given UNI has
        a truthy ``user_tag``. Otherwise each non-archived circuit whose
        ``_id`` differs from ``evc_id`` is asked, through
        ``check_no_tag_duplicate``, about each given UNI whose ``user_tag``
        is None.

        Args:
            evc_id: id of the EVC that owns the UNIs; it is skipped.
            uni_a: first UNI, if any.
            uni_z: second UNI, if any.

        Raises:
            DuplicatedNoTagUNI: presumably raised by
                ``check_no_tag_duplicate`` on duplication (callers catch it).
        """
        # No UNIs
        if not (uni_a or uni_z):
            return

        # every given UNI carries a tag: nothing to compare
        if not ((uni_a and not uni_a.user_tag) or
                (uni_z and not uni_z.user_tag)):
            return

        # iterate over a copy of the circuit buffer
        for circuit in self.circuits.copy().values():
            if circuit.archived or circuit._id == evc_id:
                continue
            if uni_a and uni_a.user_tag is None:
                circuit.check_no_tag_duplicate(uni_a)
            if uni_z and uni_z.user_tag is None:
                circuit.check_no_tag_duplicate(uni_z)
734 1
735
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Change circuit when link is up or end_maintenance.

        Listener for "kytos/topology.link_up"; it only forwards the event to
        handle_link_up().
        """
        self.handle_link_up(event)
739
740
    def handle_link_up(self, event):
        """Notify every enabled and non archived EVC that a link is up.

        The link is read from ``event.content["link"]`` and each EVC is
        notified while its own lock is held.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for circuit in self.get_evcs_by_svc_level():
            if not circuit.is_enabled() or circuit.archived:
                continue
            with circuit.lock:
                circuit.handle_link_up(link)
747 1
748
    # Possibly replace this with interruptions?
749
    @listen_to('.*.switch.interface.(link_up|link_down|created|deleted)')
    def on_interface_link_change(self, event: KytosEvent):
        """Forward interface link_up, link_down, created and deleted events.

        The event is handed over to handle_on_interface_link_change.
        """
        self.handle_on_interface_link_change(event)
757
758
    def handle_on_interface_link_change(self, event: KytosEvent):
        """Process interface events {link_(up, down), created, deleted}.

        To avoid handling every event of a flapping interface:
        Each interface is guarded by its own lock.
        An event older than the one already stored is discarded.
        The first event of a round stores "last_acquired" and waits
        settings.UNI_STATE_CHANGE_DELAY; events arriving meanwhile only
        replace the stored event and return.
        After the wait, the stored event decides which handler is called.
        """
        iface = event.content.get("interface")
        iface_id = iface.id
        with self._lock_interfaces[iface_id]:
            # Return out of order events
            if iface_id in self._intf_events:
                stored = self._intf_events[iface_id]["event"]
                if stored.timestamp > event.timestamp:
                    return
            record = self._intf_events[iface_id]
            record["event"] = event
            # Another call is already waiting for this interface
            if "last_acquired" in record:
                return
            record["last_acquired"] = now()

        time.sleep(settings.UNI_STATE_CHANGE_DELAY)

        with self._lock_interfaces[iface_id]:
            record = self._intf_events[iface_id]
            latest = record["event"]
            record.pop('last_acquired', None)
            event_type = latest.name.rpartition('.')[2]
            if event_type in ('link_up', 'created'):
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)
790 1
791
    def handle_interface_link_up(self, interface):
        """Notify every EVC that an interface is up.

        The event is logged once, before the loop, in the same way
        handle_interface_link_down does it, instead of once per EVC
        while the EVC lock is held.

        Args:
            interface: interface passed to each EVC's
                ``handle_interface_link_up``.
        """
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                evc.handle_interface_link_up(interface)
801
802
    def handle_interface_link_down(self, interface):
        """Notify every EVC, under its lock, that an interface is down."""
        log.info("Event handle_interface_link_down %s", interface)
        for circuit in self.get_evcs_by_svc_level():
            with circuit.lock:
                circuit.handle_interface_link_down(interface)
812
813
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
    def on_link_down(self, event):
        """Forward a kytos/topology.link_down event to handle_link_down."""
        self.handle_link_down(event)
817
818
    # pylint: disable=too-many-branches
819
    # pylint: disable=too-many-locals
820
    def handle_link_down(self, event):
        """Change circuits when a link goes down.

        Every EVC is inspected under its own lock:

        - EVCs affected by the link that have a failover path not affected
          by it are swapped to the failover path and their failover flows
          are collected to be installed.
        - EVCs affected by the link without a usable failover path (or
          whose failover flows could not be built) are handled through an
          "evc_affected_by_link_down" event each.
        - EVCs whose failover path alone is affected get it cleared.

        Afterwards a "failover_link_down" event is emitted, the collected
        flows are sent to flow_manager in batches of settings.BATCH_SIZE
        with settings.BATCH_INTERVAL between rounds, the changed EVCs are
        saved through mongo_controller and a "cleanup_evcs_old_path"
        event is emitted.

        Args:
            event: event whose content has the "link" that went down.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        failover_event_contents = {}

        for evc in self.get_evcs_by_svc_level():
            with evc.lock:
                if evc.is_affected_by_link(link):
                    evc.affected_by_link_at = event.timestamp
                    # if there is no failover path, handles link down the
                    # traditional way
                    if (
                        not evc.failover_path or
                        evc.is_failover_path_affected_by_link(link)
                    ):
                        evcs_normal.append(evc)
                        continue
                    try:
                        dpid_flows = evc.get_failover_flows()
                        evc.old_path = evc.current_path
                        evc.current_path = evc.failover_path
                        evc.failover_path = Path([])
                    # pylint: disable=broad-except
                    except Exception:
                        err = traceback.format_exc().replace("\n", ", ")
                        log.error(
                            "Ignore Failover path for "
                            f"{evc} due to error: {err}"
                        )
                        evcs_normal.append(evc)
                        continue
                    for dpid, flows in dpid_flows.items():
                        switch_flows.setdefault(dpid, []).extend(flows)
                    evcs_with_failover.append(evc)
                    # Report only the flows of this EVC. The accumulated
                    # switch_flows also holds the flows of the EVCs
                    # processed before this one.
                    failover_event_contents[evc.id] = map_evc_event_content(
                        evc,
                        flows={k: list(v) for k, v in dpid_flows.items()}
                    )
                elif evc.is_failover_path_affected_by_link(link):
                    evc.old_path = evc.failover_path
                    evc.failover_path = Path([])
                    check_failover.append(evc)

        if failover_event_contents:
            emit_event(self.controller, "failover_link_down",
                       content=failover_event_contents)

        # Install the failover flows, at most BATCH_SIZE per switch in each
        # round. A falsy BATCH_SIZE sends everything in a single round.
        while switch_flows:
            offset = settings.BATCH_SIZE or None
            switches = list(switch_flows.keys())
            for dpid in switches:
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
                    }
                )
                if offset is None or offset >= len(switch_flows[dpid]):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = switch_flows[dpid][offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link": link} | map_evc_event_content(evc)
            )

        evcs_to_update = []
        for evc in evcs_with_failover:
            evcs_to_update.append(evc.as_dict())
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )
        for evc in check_failover:
            evcs_to_update.append(evc.as_dict())

        self.mongo_controller.update_evcs(evcs_to_update)

        emit_event(
            self.controller,
            "cleanup_evcs_old_path",
            content={"evcs": evcs_with_failover + check_failover}
        )
915
916
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Forward the event to handle_evc_affected_by_link_down."""
        self.handle_evc_affected_by_link_down(event)
920
921
    def handle_evc_affected_by_link_down(self, event):
        """Handle the link down of a single EVC and report the outcome.

        The EVC is looked up by ``event.content["evc_id"]``. Nothing is
        done when it is unknown or no longer affected by the link. After
        ``evc.handle_link_down()`` runs under the EVC lock, the event
        "redeployed_link_down" is emitted when it returned a truthy
        value, otherwise "error_redeploy_link_down".
        """
        content = event.content
        circuit = self.circuits.get(content["evc_id"])
        link = content['link']
        if not circuit:
            return

        with circuit.lock:
            if not circuit.is_affected_by_link(link):
                return
            redeployed = circuit.handle_link_down()

        if redeployed:
            log.info(f"{circuit} redeployed due to link down {link.id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(
            self.controller,
            event_name,
            content=map_evc_event_content(circuit),
        )
937
938
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Forward deployed and redeployed_link_(up|down) events.

        The event is handed over to handle_evc_deployed.
        """
        self.handle_evc_deployed(event)
942
943
    def handle_evc_deployed(self, event):
        """Try to set up the failover path of the EVC named in the event.

        Nothing is done when ``event.content["evc_id"]`` is not a known
        circuit.
        """
        circuit = self.circuits.get(event.content["evc_id"])
        if circuit:
            circuit.try_setup_failover_path()
949
950
    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
    def on_cleanup_evcs_old_path(self, event):
        """Forward the event to handle_cleanup_evcs_old_path."""
        self.handle_cleanup_evcs_old_path(event)
954
955
    def handle_cleanup_evcs_old_path(self, event):
        """Remove the old path flows of the EVCs listed in the event.

        EVCs with an empty ``old_path`` are skipped. For the others the
        flows of ``old_path`` are removed, ``old_path`` is reset to an
        empty Path and, when at least one EVC was cleaned up, a single
        "failover_old_path" event keyed by EVC id is emitted.
        """
        contents_by_evc = {}
        for circuit in event.content.get("evcs", []):
            if not circuit.old_path:
                continue
            removed = circuit.remove_path_flows(circuit.old_path)
            contents_by_evc[circuit.id] = map_evc_event_content(
                circuit,
                removed_flows=removed,
                current_path=circuit.current_path.as_dict(),
            )
            circuit.old_path = Path([])

        if not contents_by_evc:
            return
        emit_event(self.controller, "failover_old_path",
                   content=contents_by_evc)
973
974
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Call load_all_evcs when topology_loaded is received."""
        self.load_all_evcs()
978
979
    def load_all_evcs(self):
        """Load the stored EVCs that are not in memory yet.

        Once done, "evcs_loaded" is emitted with every stored circuit as
        content, including the ones that were already loaded.
        """
        stored = self.mongo_controller.get_circuits()['circuits'].items()
        missing = (
            circuit for circuit_id, circuit in stored
            if circuit_id not in self.circuits
        )
        for circuit in missing:
            self._load_evc(circuit)
        emit_event(
            self.controller,
            "evcs_loaded",
            content=dict(stored),
            timeout=1,
        )
987
988
    def _load_evc(self, circuit_dict):
        """Build one EVC from its stored dict and register it in memory.

        Returns:
            The EVC built, or None when it could not be built
            (ValueError or KytosTagError, which is logged) or when it is
            archived. A built EVC is stored in ``self.circuits`` only if
            its id is not there yet, and is always added to the
            scheduler.
        """
        try:
            loaded = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as err:
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={err}"
            )
            return None

        if loaded.archived:
            return None

        self.circuits.setdefault(loaded.id, loaded)
        self.sched.add(loaded)
        return loaded
1003 1
1004
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Forward a flow_manager flow.error event to handle_flow_mod_error."""
        self.handle_flow_mod_error(event)
1008
1009
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow add failed.

        Only events whose "error_command" is "add" are considered. The
        EVC is found through the cookie of ``event.content["flow"]``;
        unknown EVCs are ignored.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return
        circuit = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not circuit:
            return
        with circuit.lock:
            circuit.remove_current_flows()
1019 1
1020
    def _evc_dict_with_instances(self, evc_dict):
1021 1
        """Convert some dict values to instance of EVC classes.
1022
1023
        This method will convert: [UNI, Link]
1024
        """
1025
        data = evc_dict.copy()  # Do not modify the original dict
1026 1
        for attribute, value in data.items():
1027 1
            # Get multiple attributes.
1028
            # Ex: uni_a, uni_z
1029
            if "uni" in attribute:
1030 1
                try:
1031 1
                    data[attribute] = self._uni_from_dict(value)
1032 1
                except ValueError as exception:
1033 1
                    result = "Error creating UNI: Invalid value"
1034 1
                    raise ValueError(result) from exception
1035 1
1036
            if attribute == "circuit_scheduler":
1037 1
                data[attribute] = []
1038 1
                for schedule in value:
1039 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1040 1
1041
            # Get multiple attributes.
1042
            # Ex: primary_links,
1043
            #     backup_links,
1044
            #     current_links_cache,
1045
            #     primary_links_cache,
1046
            #     backup_links_cache
1047
            if "links" in attribute:
1048 1
                data[attribute] = [
1049 1
                    self._link_from_dict(link) for link in value
1050
                ]
1051
1052
            # Ex: current_path,
1053
            #     primary_path,
1054
            #     backup_path
1055
            if "path" in attribute and attribute != "dynamic_backup_path":
1056 1
                data[attribute] = Path(
1057 1
                    [self._link_from_dict(link) for link in value]
1058
                )
1059
1060
        return data
1061 1
1062
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from a dict.

        The dict values are converted by _evc_dict_with_instances and
        "table_group" is always set to ``self.table_group``.
        """
        kwargs = self._evc_dict_with_instances(evc_dict)
        kwargs["table_group"] = self.table_group
        return EVC(self.controller, **kwargs)
1066 1
1067
    def _uni_from_dict(self, uni_dict):
1068 1
        """Return a UNI object from python dict."""
1069
        if uni_dict is None:
1070 1
            return False
1071 1
1072
        interface_id = uni_dict.get("interface_id")
1073 1
        interface = self.controller.get_interface_by_id(interface_id)
1074 1
        if interface is None:
1075 1
            result = (
1076 1
                "Error creating UNI:"
1077
                + f"Could not instantiate interface {interface_id}"
1078
            )
1079
            raise ValueError(result) from ValueError
1080 1
        tag_convert = {1: "vlan"}
1081 1
        tag_dict = uni_dict.get("tag", None)
1082 1
        if tag_dict:
1083 1
            tag_type = tag_dict.get("tag_type")
1084 1
            tag_type = tag_convert.get(tag_type, tag_type)
1085 1
            tag_value = tag_dict.get("value")
1086 1
            if isinstance(tag_value, list):
1087 1
                tag_value = get_tag_ranges(tag_value)
1088 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1089 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1090 1
            else:
1091
                tag = TAG(tag_type, tag_value)
1092 1
        else:
1093
            tag = None
1094 1
        uni = UNI(interface, tag)
1095 1
        return uni
1096 1
1097
    def _link_from_dict(self, link_dict):
1098 1
        """Return a Link object from python dict."""
1099
        id_a = link_dict.get("endpoint_a").get("id")
1100 1
        id_b = link_dict.get("endpoint_b").get("id")
1101 1
1102
        endpoint_a = self.controller.get_interface_by_id(id_a)
1103 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1104 1
        if not endpoint_a:
1105 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1106 1
            raise ValueError(error_msg)
1107 1
        if not endpoint_b:
1108 1
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1109
            raise ValueError(error_msg)
1110
1111
        link = Link(endpoint_a, endpoint_b)
1112 1
        if "metadata" in link_dict:
1113 1
            link.extend_metadata(link_dict.get("metadata"))
1114 1
1115
        s_vlan = link.get_metadata("s_vlan")
1116 1
        if s_vlan:
1117 1
            tag = TAG.from_dict(s_vlan)
1118 1
            if tag is False:
1119 1
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1120
                raise ValueError(error_msg)
1121
            link.update_metadata("s_vlan", tag)
1122 1
        return link
1123 1
1124
    def _find_evc_by_schedule_id(self, schedule_id):
1125 1
        """
1126
        Find an EVC and CircuitSchedule based on schedule_id.
1127
1128
        :param schedule_id: Schedule ID
1129
        :return: EVC and Schedule
1130
        """
1131
        circuits = self._get_circuits_buffer()
1132 1
        found_schedule = None
1133 1
        evc = None
1134 1
1135
        # pylint: disable=unused-variable
1136
        for c_id, circuit in circuits.items():
1137 1
            for schedule in circuit.circuit_scheduler:
1138 1
                if schedule.id == schedule_id:
1139 1
                    found_schedule = schedule
1140 1
                    evc = circuit
1141 1
                    break
1142 1
            if found_schedule:
1143 1
                break
1144 1
        return evc, found_schedule
1145 1
1146
    def _get_circuits_buffer(self):
1147 1
        """
1148
        Return the circuit buffer.
1149
1150
        If the buffer is empty, try to load data from mongodb.
1151
        """
1152
        if not self.circuits:
1153 1
            # Load circuits from mongodb to buffer
1154
            circuits = self.mongo_controller.get_circuits()['circuits']
1155 1
            for c_id, circuit in circuits.items():
1156 1
                evc = self._evc_from_dict(circuit)
1157 1
                self.circuits[c_id] = evc
1158 1
        return self.circuits
1159 1
1160
    # pylint: disable=attribute-defined-outside-init
1161
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Update the table group from an enable_table event.

        The groups come from ``event.content["mef_eline"]``. Nothing is
        changed when it is missing or empty, or when any group is not in
        settings.TABLE_GROUP_ALLOWED (an error is logged). Otherwise
        ``self.table_group`` is updated and "kytos/mef_eline.enable_table"
        is emitted with the resulting table group.
        """
        requested = event.content.get("mef_eline", None)
        if not requested:
            return

        for group in requested:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return

        self.table_group.update(requested)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1177