Passed
Pull Request — master (#473)
by Vinicius
03:59
created

build.main.Main.list_schedules()   B

Complexity

Conditions 5

Size

Total Lines 31
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 5

Importance

Changes 0
Metric Value
cc 5
eloc 21
nop 2
dl 0
loc 31
ccs 18
cts 18
cp 1
crap 5
rs 8.9093
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from collections import defaultdict
10 1
from threading import Lock
11 1
from typing import Optional
12
13 1
from pydantic import ValidationError
14
15 1
from kytos.core import KytosNApp, log, rest
16 1
from kytos.core.events import KytosEvent
17 1
from kytos.core.exceptions import KytosTagError
18 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec, now,
19
                                validate_openapi)
20 1
from kytos.core.interface import TAG, UNI, TAGRange
21 1
from kytos.core.link import Link
22 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
23
                                 get_json_or_400)
24 1
from kytos.core.tag_ranges import get_tag_ranges
25 1
from napps.kytos.mef_eline import controllers, settings
26 1
from napps.kytos.mef_eline.exceptions import (DisabledSwitch,
27
                                              DuplicatedNoTagUNI, InvalidPath)
28 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
29
                                          Path)
30 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
31 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
32
                                         emit_event, get_vlan_tags_and_masks,
33
                                         map_evc_event_content)
34
35
36
# pylint: disable=too-many-public-methods
37 1
class Main(KytosNApp):
38
    """Main class of amlight/mef_eline NApp.
39
40
    This class is the entry point for this napp.
41
    """
42
43 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
44
45 1
    def setup(self):
46
        """Replace the '__init__' method for the KytosNApp subclass.
47
48
        The setup method is automatically called by the controller when your
49
        application is loaded.
50
51
        So, if you have any setup routine, insert it here.
52
        """
53
        # object used to scheduler circuit events
54 1
        self.sched = Scheduler()
55
56
        # object to save and load circuits
57 1
        self.mongo_controller = self.get_eline_controller()
58 1
        self.mongo_controller.bootstrap_indexes()
59
60
        # set the controller that will manager the dynamic paths
61 1
        DynamicPathManager.set_controller(self.controller)
62
63
        # dictionary of EVCs created. It acts as a circuit buffer.
64
        # Every create/update/delete must be synced to mongodb.
65 1
        self.circuits = {}
66
67 1
        self._intf_events = defaultdict(dict)
68 1
        self._lock_interfaces = defaultdict(Lock)
69 1
        self.table_group = {"epl": 0, "evpl": 0}
70 1
        self._lock = Lock()
71 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
72
73 1
        self.load_all_evcs()
74 1
        self._topology_updated_at = None
75
76 1
    def get_evcs_by_svc_level(self, enable_filter: bool = True) -> list:
77
        """Get circuits sorted by desc service level and asc creation_time.
78
79
        In the future, as more ops are offloaded it should be get from the DB.
80
        """
81 1
        if enable_filter:
82 1
            return sorted(
83
                          [circuit for circuit in self.circuits.values()
84
                           if circuit.is_enabled()],
85
                          key=lambda x: (-x.service_level, x.creation_time),
86
            )
87 1
        return sorted(self.circuits.values(),
88
                      key=lambda x: (-x.service_level, x.creation_time))
89
90 1
    @staticmethod
91 1
    def get_eline_controller():
92
        """Return the ELineController instance."""
93
        return controllers.ELineController()
94
95 1
    def execute(self):
96
        """Execute once when the napp is running."""
97 1
        if self._lock.locked():
98 1
            return
99 1
        log.debug("Starting consistency routine")
100 1
        with self._lock:
101 1
            self.execute_consistency()
102 1
        log.debug("Finished consistency routine")
103
104 1
    def should_be_checked(self, circuit):
105
        "Verify if the circuit meets the necessary conditions to be checked"
106
        # pylint: disable=too-many-boolean-expressions
107 1
        if (
108
                circuit.is_enabled()
109
                and not circuit.is_active()
110
                and not circuit.lock.locked()
111
                and not circuit.has_recent_removed_flow()
112
                and not circuit.is_recent_updated()
113
                and circuit.are_unis_active(self.controller.switches)
114
                # if a inter-switch EVC does not have current_path, it does not
115
                # make sense to run sdntrace on it
116
                and (circuit.is_intra_switch() or circuit.current_path)
117
                ):
118 1
            return True
119
        return False
120
121 1
    def execute_consistency(self):
122
        """Execute consistency routine."""
123 1
        circuits_to_check = []
124 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
125 1
        for circuit in self.get_evcs_by_svc_level(enable_filter=False):
126 1
            stored_circuits.pop(circuit.id, None)
127 1
            if self.should_be_checked(circuit):
128 1
                circuits_to_check.append(circuit)
129 1
            circuit.try_setup_failover_path()
130 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
131 1
        for circuit in circuits_to_check:
132 1
            is_checked = circuits_checked.get(circuit.id)
133 1
            if is_checked:
134 1
                circuit.execution_rounds = 0
135 1
                log.info(f"{circuit} enabled but inactive - activating")
136 1
                with circuit.lock:
137 1
                    circuit.activate()
138 1
                    circuit.sync()
139
            else:
140 1
                circuit.execution_rounds += 1
141 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
142 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
143 1
                    with circuit.lock:
144 1
                        circuit.deploy()
145 1
        for circuit_id in stored_circuits:
146 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
147 1
            self._load_evc(stored_circuits[circuit_id])
148
149 1
    def shutdown(self):
150
        """Execute when your napp is unloaded.
151
152
        If you have some cleanup procedure, insert it here.
153
        """
154
155 1
    @rest("/v2/evc/", methods=["GET"])
156 1
    def list_circuits(self, request: Request) -> JSONResponse:
157
        """Endpoint to return circuits stored.
158
159
        archive query arg if defined (not null) will be filtered
160
        accordingly, by default only non archived evcs will be listed
161
        """
162 1
        log.debug("list_circuits /v2/evc")
163 1
        args = request.query_params
164 1
        archived = args.get("archived", "false").lower()
165 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
166 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
167
                                                      metadata=args)
168 1
        circuits = circuits['circuits']
169 1
        return JSONResponse(circuits)
170
171 1
    @rest("/v2/evc/schedule", methods=["GET"])
172 1
    def list_schedules(self, _request: Request) -> JSONResponse:
173
        """Endpoint to return all schedules stored for all circuits.
174
175
        Return a JSON with the following template:
176
        [{"schedule_id": <schedule_id>,
177
         "circuit_id": <circuit_id>,
178
         "schedule": <schedule object>}]
179
        """
180 1
        log.debug("list_schedules /v2/evc/schedule")
181 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
182 1
        if not circuits:
183 1
            result = {}
184 1
            status = 200
185 1
            return JSONResponse(result, status_code=status)
186
187 1
        result = []
188 1
        status = 200
189 1
        for circuit in circuits:
190 1
            circuit_scheduler = circuit.get("circuit_scheduler")
191 1
            if circuit_scheduler:
192 1
                for scheduler in circuit_scheduler:
193 1
                    value = {
194
                        "schedule_id": scheduler.get("id"),
195
                        "circuit_id": circuit.get("id"),
196
                        "schedule": scheduler,
197
                    }
198 1
                    result.append(value)
199
200 1
        log.debug("list_schedules result %s %s", result, status)
201 1
        return JSONResponse(result, status_code=status)
202
203 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
204 1
    def get_circuit(self, request: Request) -> JSONResponse:
205
        """Endpoint to return a circuit based on id."""
206 1
        circuit_id = request.path_params["circuit_id"]
207 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
208 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
209 1
        if not circuit:
210 1
            result = f"circuit_id {circuit_id} not found"
211 1
            log.debug("get_circuit result %s %s", result, 404)
212 1
            raise HTTPException(404, detail=result)
213 1
        status = 200
214 1
        log.debug("get_circuit result %s %s", circuit, status)
215 1
        return JSONResponse(circuit, status_code=status)
216
217
    # pylint: disable=too-many-branches, too-many-statements
218 1
    @rest("/v2/evc/", methods=["POST"])
219 1
    @validate_openapi(spec)
220 1
    def create_circuit(self, request: Request) -> JSONResponse:
221
        """Try to create a new circuit.
222
223
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
224
        UNI_Z's requested C-VID are available from the interfaces' pools. This
225
        is checked when creating the UNI object.
226
227
        Then, E-Line NApp requests a primary and a backup path to the
228
        Pathfinder NApp using the attributes primary_links and backup_links
229
        submitted via REST
230
231
        # For each link composing paths in #3:
232
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
233
        #  - Using the S-VID obtained, generate abstract flow entries to be
234
        #    sent to FlowManager
235
236
        Push abstract flow entries to FlowManager and FlowManager pushes
237
        OpenFlow entries to datapaths
238
239
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
240
        creation
241
242
        Finnaly, notify user of the status of its request.
243
        """
244
        # Try to create the circuit object
245 1
        log.debug("create_circuit /v2/evc/")
246 1
        data = get_json_or_400(request, self.controller.loop)
247
248 1
        try:
249 1
            evc = self._evc_from_dict(data)
250 1
        except (ValueError, KytosTagError) as exception:
251 1
            log.debug("create_circuit result %s %s", exception, 400)
252 1
            raise HTTPException(400, detail=str(exception)) from exception
253 1
        try:
254 1
            check_disabled_component(evc.uni_a, evc.uni_z)
255 1
        except DisabledSwitch as exception:
256 1
            log.debug("create_circuit result %s %s", exception, 409)
257 1
            raise HTTPException(
258
                    409,
259
                    detail=f"Path is not valid: {exception}"
260
                ) from exception
261
262 1
        if evc.primary_path:
263 1
            try:
264 1
                evc.primary_path.is_valid(
265
                    evc.uni_a.interface.switch,
266
                    evc.uni_z.interface.switch,
267
                    bool(evc.circuit_scheduler),
268
                )
269 1
            except InvalidPath as exception:
270 1
                raise HTTPException(
271
                    400,
272
                    detail=f"primary_path is not valid: {exception}"
273
                ) from exception
274 1
        if evc.backup_path:
275 1
            try:
276 1
                evc.backup_path.is_valid(
277
                    evc.uni_a.interface.switch,
278
                    evc.uni_z.interface.switch,
279
                    bool(evc.circuit_scheduler),
280
                )
281 1
            except InvalidPath as exception:
282 1
                raise HTTPException(
283
                    400,
284
                    detail=f"backup_path is not valid: {exception}"
285
                ) from exception
286
287 1
        if not evc._tag_lists_equal():
288 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
289 1
            raise HTTPException(400, detail=detail)
290
291 1
        try:
292 1
            evc._validate_has_primary_or_dynamic()
293 1
        except ValueError as exception:
294 1
            raise HTTPException(400, detail=str(exception)) from exception
295
296 1
        try:
297 1
            self._check_no_tag_duplication(evc.id, evc.uni_a, evc.uni_z)
298
        except DuplicatedNoTagUNI as exception:
299
            log.debug("create_circuit result %s %s", exception, 409)
300
            raise HTTPException(409, detail=str(exception)) from exception
301
302 1
        try:
303 1
            self._use_uni_tags(evc)
304 1
        except KytosTagError as exception:
305 1
            raise HTTPException(400, detail=str(exception)) from exception
306
307
        # save circuit
308 1
        try:
309 1
            evc.sync()
310
        except ValidationError as exception:
311
            raise HTTPException(400, detail=str(exception)) from exception
312
313
        # store circuit in dictionary
314 1
        self.circuits[evc.id] = evc
315
316
        # Schedule the circuit deploy
317 1
        self.sched.add(evc)
318
319
        # Circuit has no schedule, deploy now
320 1
        if not evc.circuit_scheduler:
321 1
            with evc.lock:
322 1
                evc.deploy()
323
324
        # Notify users
325 1
        result = {"circuit_id": evc.id}
326 1
        status = 201
327 1
        log.debug("create_circuit result %s %s", result, status)
328 1
        emit_event(self.controller, name="created",
329
                   content=map_evc_event_content(evc))
330 1
        return JSONResponse(result, status_code=status)
331
332 1
    @staticmethod
333 1
    def _use_uni_tags(evc):
334 1
        uni_a = evc.uni_a
335 1
        evc._use_uni_vlan(uni_a)
336 1
        try:
337 1
            uni_z = evc.uni_z
338 1
            evc._use_uni_vlan(uni_z)
339 1
        except KytosTagError as err:
340 1
            evc.make_uni_vlan_available(uni_a)
341 1
            raise err
342
343 1
    @listen_to('kytos/flow_manager.flow.removed')
344 1
    def on_flow_delete(self, event):
345
        """Capture delete messages to keep track when flows got removed."""
346
        self.handle_flow_delete(event)
347
348 1
    def handle_flow_delete(self, event):
349
        """Keep track when the EVC got flows removed by deriving its cookie."""
350 1
        flow = event.content["flow"]
351 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
352 1
        if evc:
353 1
            log.debug("Flow removed in EVC %s", evc.id)
354 1
            evc.set_flow_removed_at()
355
356 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
357 1
    @validate_openapi(spec)
358 1
    def update(self, request: Request) -> JSONResponse:
359
        """Update a circuit based on payload.
360
361
        The EVC attributes (creation_time, active, current_path,
362
        failover_path, _id, archived) can't be updated.
363
        """
364 1
        data = get_json_or_400(request, self.controller.loop)
365 1
        circuit_id = request.path_params["circuit_id"]
366 1
        log.debug("update /v2/evc/%s", circuit_id)
367 1
        try:
368 1
            evc = self.circuits[circuit_id]
369 1
        except KeyError:
370 1
            result = f"circuit_id {circuit_id} not found"
371 1
            log.debug("update result %s %s", result, 404)
372 1
            raise HTTPException(404, detail=result) from KeyError
373
374 1
        try:
375 1
            updated_data = self._evc_dict_with_instances(data)
376 1
            self._check_no_tag_duplication(
377
                circuit_id, updated_data.get("uni_a"),
378
                updated_data.get("uni_z")
379
            )
380 1
            enable, redeploy = evc.update(**updated_data)
381 1
        except (ValueError, KytosTagError, ValidationError) as exception:
382 1
            log.debug("update result %s %s", exception, 400)
383 1
            raise HTTPException(400, detail=str(exception)) from exception
384 1
        except DuplicatedNoTagUNI as exception:
385
            log.debug("update result %s %s", exception, 409)
386
            raise HTTPException(409, detail=str(exception)) from exception
387 1
        except DisabledSwitch as exception:
388 1
            log.debug("update result %s %s", exception, 409)
389 1
            raise HTTPException(
390
                    409,
391
                    detail=f"Path is not valid: {exception}"
392
                ) from exception
393
394 1
        if evc.is_active():
395
            if enable is False:  # disable if active
396
                with evc.lock:
397
                    evc.remove()
398
            elif redeploy is not None:  # redeploy if active
399
                with evc.lock:
400
                    evc.remove()
401
                    evc.deploy()
402
        else:
403 1
            if enable is True:  # enable if inactive
404 1
                with evc.lock:
405 1
                    evc.deploy()
406 1
            elif evc.is_enabled() and redeploy:
407 1
                with evc.lock:
408 1
                    evc.remove()
409 1
                    evc.deploy()
410 1
        result = {evc.id: evc.as_dict()}
411 1
        status = 200
412
413 1
        log.debug("update result %s %s", result, status)
414 1
        emit_event(self.controller, "updated",
415
                   content=map_evc_event_content(evc, **data))
416 1
        return JSONResponse(result, status_code=status)
417
418 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
419 1
    def delete_circuit(self, request: Request) -> JSONResponse:
420
        """Remove a circuit.
421
422
        First, the flows are removed from the switches, and then the EVC is
423
        disabled.
424
        """
425 1
        circuit_id = request.path_params["circuit_id"]
426 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
427 1
        try:
428 1
            evc = self.circuits.pop(circuit_id)
429 1
        except KeyError:
430 1
            result = f"circuit_id {circuit_id} not found"
431 1
            log.debug("delete_circuit result %s %s", result, 404)
432 1
            raise HTTPException(404, detail=result) from KeyError
433
434 1
        log.info("Removing %s", evc)
435 1
        with evc.lock:
436 1
            evc.remove_current_flows()
437 1
            evc.remove_failover_flows(sync=False)
438 1
            evc.deactivate()
439 1
            evc.disable()
440 1
            self.sched.remove(evc)
441 1
            evc.archive()
442 1
            evc.remove_uni_tags()
443 1
            evc.sync()
444 1
        log.info("EVC removed. %s", evc)
445 1
        result = {"response": f"Circuit {circuit_id} removed"}
446 1
        status = 200
447
448 1
        log.debug("delete_circuit result %s %s", result, status)
449 1
        emit_event(self.controller, "deleted",
450
                   content=map_evc_event_content(evc))
451 1
        return JSONResponse(result, status_code=status)
452
453 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
454 1
    def get_metadata(self, request: Request) -> JSONResponse:
455
        """Get metadata from an EVC."""
456 1
        circuit_id = request.path_params["circuit_id"]
457 1
        try:
458 1
            return (
459
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
460
            )
461
        except KeyError as error:
462
            raise HTTPException(
463
                404,
464
                detail=f"circuit_id {circuit_id} not found."
465
            ) from error
466
467 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
468 1
    @validate_openapi(spec)
469 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
470
        """Add metadata to a bulk of EVCs."""
471 1
        data = get_json_or_400(request, self.controller.loop)
472 1
        circuit_ids = data.pop("circuit_ids")
473
474 1
        self.mongo_controller.update_evcs_metadata(circuit_ids, data, "add")
475
476 1
        fail_evcs = []
477 1
        for _id in circuit_ids:
478 1
            try:
479 1
                evc = self.circuits[_id]
480 1
                evc.extend_metadata(data)
481 1
            except KeyError:
482 1
                fail_evcs.append(_id)
483
484 1
        if fail_evcs:
485 1
            raise HTTPException(404, detail=fail_evcs)
486 1
        return JSONResponse("Operation successful", status_code=201)
487
488 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
489 1
    @validate_openapi(spec)
490 1
    def add_metadata(self, request: Request) -> JSONResponse:
491
        """Add metadata to an EVC."""
492 1
        circuit_id = request.path_params["circuit_id"]
493 1
        metadata = get_json_or_400(request, self.controller.loop)
494 1
        if not isinstance(metadata, dict):
495
            raise HTTPException(400, "Invalid metadata value: {metadata}")
496 1
        try:
497 1
            evc = self.circuits[circuit_id]
498 1
        except KeyError as error:
499 1
            raise HTTPException(
500
                404,
501
                detail=f"circuit_id {circuit_id} not found."
502
            ) from error
503
504 1
        evc.extend_metadata(metadata)
505 1
        evc.sync()
506 1
        return JSONResponse("Operation successful", status_code=201)
507
508 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
509 1
    @validate_openapi(spec)
510 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
511
        """Delete metada from a bulk of EVCs"""
512 1
        data = get_json_or_400(request, self.controller.loop)
513 1
        key = request.path_params["key"]
514 1
        circuit_ids = data.pop("circuit_ids")
515 1
        self.mongo_controller.update_evcs_metadata(
516
            circuit_ids, {key: ""}, "del"
517
        )
518
519 1
        fail_evcs = []
520 1
        for _id in circuit_ids:
521 1
            try:
522 1
                evc = self.circuits[_id]
523 1
                evc.remove_metadata(key)
524 1
            except KeyError:
525 1
                fail_evcs.append(_id)
526
527 1
        if fail_evcs:
528 1
            raise HTTPException(404, detail=fail_evcs)
529 1
        return JSONResponse("Operation successful")
530
531 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
532 1
    def delete_metadata(self, request: Request) -> JSONResponse:
533
        """Delete metadata from an EVC."""
534 1
        circuit_id = request.path_params["circuit_id"]
535 1
        key = request.path_params["key"]
536 1
        try:
537 1
            evc = self.circuits[circuit_id]
538 1
        except KeyError as error:
539 1
            raise HTTPException(
540
                404,
541
                detail=f"circuit_id {circuit_id} not found."
542
            ) from error
543
544 1
        evc.remove_metadata(key)
545 1
        evc.sync()
546 1
        return JSONResponse("Operation successful")
547
548 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
549 1
    def redeploy(self, request: Request) -> JSONResponse:
550
        """Endpoint to force the redeployment of an EVC."""
551 1
        circuit_id = request.path_params["circuit_id"]
552 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
553 1
        try:
554 1
            evc = self.circuits[circuit_id]
555 1
        except KeyError:
556 1
            raise HTTPException(
557
                404,
558
                detail=f"circuit_id {circuit_id} not found"
559
            ) from KeyError
560 1
        if evc.is_enabled():
561 1
            with evc.lock:
562 1
                evc.remove_failover_flows(sync=False)
563 1
                evc.remove_current_flows(sync=True)
564 1
                evc.deploy()
565 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
566 1
            status = 202
567
        else:
568 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
569 1
            status = 409
570
571 1
        return JSONResponse(result, status_code=status)
572
573 1
    @rest("/v2/evc/schedule/", methods=["POST"])
574 1
    @validate_openapi(spec)
575 1
    def create_schedule(self, request: Request) -> JSONResponse:
576
        """
577
        Create a new schedule for a given circuit.
578
579
        This service do no check if there are conflicts with another schedule.
580
        Payload example:
581
            {
582
              "circuit_id":"aa:bb:cc",
583
              "schedule": {
584
                "date": "2019-08-07T14:52:10.967Z",
585
                "interval": "string",
586
                "frequency": "1 * * * *",
587
                "action": "create"
588
              }
589
            }
590
        """
591 1
        log.debug("create_schedule /v2/evc/schedule/")
592 1
        data = get_json_or_400(request, self.controller.loop)
593 1
        circuit_id = data["circuit_id"]
594 1
        schedule_data = data["schedule"]
595
596
        # Get EVC from circuits buffer
597 1
        circuits = self._get_circuits_buffer()
598
599
        # get the circuit
600 1
        evc = circuits.get(circuit_id)
601
602
        # get the circuit
603 1
        if not evc:
604 1
            result = f"circuit_id {circuit_id} not found"
605 1
            log.debug("create_schedule result %s %s", result, 404)
606 1
            raise HTTPException(404, detail=result)
607
608
        # new schedule from dict
609 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
610
611
        # If there is no schedule, create the list
612 1
        if not evc.circuit_scheduler:
613 1
            evc.circuit_scheduler = []
614
615
        # Add the new schedule
616 1
        evc.circuit_scheduler.append(new_schedule)
617
618
        # Add schedule job
619 1
        self.sched.add_circuit_job(evc, new_schedule)
620
621
        # save circuit to mongodb
622 1
        evc.sync()
623
624 1
        result = new_schedule.as_dict()
625 1
        status = 201
626
627 1
        log.debug("create_schedule result %s %s", result, status)
628 1
        return JSONResponse(result, status_code=status)
629
630 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
631 1
    @validate_openapi(spec)
632 1
    def update_schedule(self, request: Request) -> JSONResponse:
633
        """Update a schedule.
634
635
        Change all attributes from the given schedule from a EVC circuit.
636
        The schedule ID is preserved as default.
637
        Payload example:
638
            {
639
              "date": "2019-08-07T14:52:10.967Z",
640
              "interval": "string",
641
              "frequency": "1 * * *",
642
              "action": "create"
643
            }
644
        """
645 1
        data = get_json_or_400(request, self.controller.loop)
646 1
        schedule_id = request.path_params["schedule_id"]
647 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
648
649
        # Try to find a circuit schedule
650 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
651
652
        # Can not modify circuits deleted and archived
653 1
        if not found_schedule:
654 1
            result = f"schedule_id {schedule_id} not found"
655 1
            log.debug("update_schedule result %s %s", result, 404)
656 1
            raise HTTPException(404, detail=result)
657
658 1
        new_schedule = CircuitSchedule.from_dict(data)
659 1
        new_schedule.id = found_schedule.id
660
        # Remove the old schedule
661 1
        evc.circuit_scheduler.remove(found_schedule)
662
        # Append the modified schedule
663 1
        evc.circuit_scheduler.append(new_schedule)
664
665
        # Cancel all schedule jobs
666 1
        self.sched.cancel_job(found_schedule.id)
667
        # Add the new circuit schedule
668 1
        self.sched.add_circuit_job(evc, new_schedule)
669
        # Save EVC to mongodb
670 1
        evc.sync()
671
672 1
        result = new_schedule.as_dict()
673 1
        status = 200
674
675 1
        log.debug("update_schedule result %s %s", result, status)
676 1
        return JSONResponse(result, status_code=status)
677
678 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
679 1
    def delete_schedule(self, request: Request) -> JSONResponse:
680
        """Remove a circuit schedule.
681
682
        Remove the Schedule from EVC.
683
        Remove the Schedule from cron job.
684
        Save the EVC to the Storehouse.
685
        """
686 1
        schedule_id = request.path_params["schedule_id"]
687 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
688 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
689
690
        # Can not modify circuits deleted and archived
691 1
        if not found_schedule:
692 1
            result = f"schedule_id {schedule_id} not found"
693 1
            log.debug("delete_schedule result %s %s", result, 404)
694 1
            raise HTTPException(404, detail=result)
695
696
        # Remove the old schedule
697 1
        evc.circuit_scheduler.remove(found_schedule)
698
699
        # Cancel all schedule jobs
700 1
        self.sched.cancel_job(found_schedule.id)
701
        # Save EVC to mongodb
702 1
        evc.sync()
703
704 1
        result = "Schedule removed"
705 1
        status = 200
706
707 1
        log.debug("delete_schedule result %s %s", result, status)
708 1
        return JSONResponse(result, status_code=status)
709
710 1
    def _check_no_tag_duplication(
711
        self,
712
        evc_id: str,
713
        uni_a: Optional[UNI] = None,
714
        uni_z: Optional[UNI] = None
715
    ):
716
        """Check if the given EVC has UNIs with no tag and if these are
717
         duplicated. Raise DuplicatedNoTagUNI if duplication is found.
718
        Args:
719
            evc (dict): EVC to be analyzed.
720
        """
721
722
        # No UNIs
723 1
        if not (uni_a or uni_z):
724 1
            return
725
726 1
        if (not (uni_a and not uni_a.user_tag) and
727
                not (uni_z and not uni_z.user_tag)):
728 1
            return
729 1
        for circuit in self.circuits.copy().values():
730 1
            if (not circuit.archived and circuit._id != evc_id):
731 1
                if uni_a and uni_a.user_tag is None:
732 1
                    circuit.check_no_tag_duplicate(uni_a)
733 1
                if uni_z and uni_z.user_tag is None:
734 1
                    circuit.check_no_tag_duplicate(uni_z)
735
736 1
    @listen_to("kytos/topology.link_up")
737 1
    def on_link_up(self, event):
738
        """Change circuit when link is up or end_maintenance."""
739
        self.handle_link_up(event)
740
741 1
    def handle_link_up(self, event):
742
        """Change circuit when link is up or end_maintenance."""
743 1
        log.info("Event handle_link_up %s", event.content["link"])
744 1
        for evc in self.get_evcs_by_svc_level():
745 1
            if evc.is_enabled() and not evc.archived:
746 1
                with evc.lock:
747 1
                    evc.handle_link_up(event.content["link"])
748
749
    # Possibly replace this with interruptions?
750 1
    @listen_to(
751
        '.*.switch.interface.(link_up|link_down|created|deleted)'
752
    )
753 1
    def on_interface_link_change(self, event: KytosEvent):
754
        """
755
        Handler for interface link_up and link_down events.
756
        """
757
        self.handle_on_interface_link_change(event)
758
759 1
    def handle_on_interface_link_change(self, event: KytosEvent):
760
        """
761
        Handler to sort interface events {link_(up, down), create, deleted}
762
763
        To avoid multiple database updated (link flap):
764
        Every interface is identfied and processed in parallel.
765
        Once an interface event is received a time is started.
766
        While time is running self._intf_events will be updated.
767
        After time has passed last received event will be processed.
768
        """
769 1
        iface = event.content.get("interface")
770 1
        with self._lock_interfaces[iface.id]:
771 1
            _now = event.timestamp
772
            # Return out of order events
773 1
            if (
774
                iface.id in self._intf_events
775
                and self._intf_events[iface.id]["event"].timestamp > _now
776
            ):
777 1
                return
778 1
            self._intf_events[iface.id].update({"event": event})
779 1
            if "last_acquired" in self._intf_events[iface.id]:
780 1
                return
781 1
            self._intf_events[iface.id].update({"last_acquired": now()})
782 1
        time.sleep(settings.UNI_STATE_CHANGE_DELAY)
783 1
        with self._lock_interfaces[iface.id]:
784 1
            event = self._intf_events[iface.id]["event"]
785 1
            self._intf_events[iface.id].pop('last_acquired', None)
786 1
            _, _, event_type = event.name.rpartition('.')
787 1
            if event_type in ('link_up', 'created'):
788 1
                self.handle_interface_link_up(iface)
789 1
            elif event_type in ('link_down', 'deleted'):
790 1
                self.handle_interface_link_down(iface)
791
792 1
    def handle_interface_link_up(self, interface):
793
        """
794
        Handler for interface link_up events
795
        """
796
        for evc in self.get_evcs_by_svc_level():
797
            with evc.lock:
798
                log.info("Event handle_interface_link_up %s", interface)
799
                evc.handle_interface_link_up(
800
                    interface
801
                )
802
803 1
    def handle_interface_link_down(self, interface):
804
        """
805
        Handler for interface link_down events
806
        """
807
        log.info("Event handle_interface_link_down %s", interface)
808
        for evc in self.get_evcs_by_svc_level():
809
            with evc.lock:
810
                evc.handle_interface_link_down(
811
                    interface
812
                )
813
814 1
    @listen_to("kytos/topology.link_down", pool="dynamic_single")
815 1
    def on_link_down(self, event):
816
        """Change circuit when link is down or under_mantenance."""
817
        self.handle_link_down(event)
818
819
    # pylint: disable=too-many-branches
820
    # pylint: disable=too-many-locals
821 1
    def handle_link_down(self, event):
822
        """Change circuit when link is down or under_mantenance."""
823 1
        link = event.content["link"]
824 1
        log.info("Event handle_link_down %s", link)
825 1
        switch_flows = {}
826 1
        evcs_with_failover = []
827 1
        evcs_normal = []
828 1
        check_failover = []
829 1
        failover_event_contents = {}
830
831 1
        for evc in self.get_evcs_by_svc_level():
832 1
            with evc.lock:
833 1
                if evc.is_affected_by_link(link):
834 1
                    evc.affected_by_link_at = event.timestamp
835
                    # if there is no failover path, handles link down the
836
                    # tradditional way
837 1
                    if (
838
                        not evc.failover_path or
839
                        evc.is_failover_path_affected_by_link(link)
840
                    ):
841 1
                        evcs_normal.append(evc)
842 1
                        continue
843 1
                    try:
844 1
                        dpid_flows = evc.get_failover_flows()
845 1
                        evc.old_path = evc.current_path
846 1
                        evc.current_path = evc.failover_path
847 1
                        evc.failover_path = Path([])
848
                    # pylint: disable=broad-except
849 1
                    except Exception:
850 1
                        err = traceback.format_exc().replace("\n", ", ")
851 1
                        log.error(
852
                            "Ignore Failover path for "
853
                            f"{evc} due to error: {err}"
854
                        )
855 1
                        evcs_normal.append(evc)
856 1
                        continue
857 1
                    for dpid, flows in dpid_flows.items():
858 1
                        switch_flows.setdefault(dpid, [])
859 1
                        switch_flows[dpid].extend(flows)
860 1
                    evcs_with_failover.append(evc)
861 1
                    failover_event_contents[evc.id] = map_evc_event_content(
862
                        evc,
863
                        flows={k: v.copy() for k, v in switch_flows.items()}
864
                    )
865 1
                elif evc.is_failover_path_affected_by_link(link):
866 1
                    evc.old_path = evc.failover_path
867 1
                    evc.failover_path = Path([])
868 1
                    check_failover.append(evc)
869
870 1
        if failover_event_contents:
871 1
            emit_event(self.controller, "failover_link_down",
872
                       content=failover_event_contents)
873
874 1
        while switch_flows:
875 1
            offset = settings.BATCH_SIZE or None
876 1
            switches = list(switch_flows.keys())
877 1
            for dpid in switches:
878 1
                emit_event(
879
                    self.controller,
880
                    context="kytos.flow_manager",
881
                    name="flows.install",
882
                    content={
883
                        "dpid": dpid,
884
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
885
                    }
886
                )
887 1
                if offset is None or offset >= len(switch_flows[dpid]):
888 1
                    del switch_flows[dpid]
889 1
                    continue
890 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
891 1
            time.sleep(settings.BATCH_INTERVAL)
892
893 1
        for evc in evcs_normal:
894 1
            emit_event(
895
                self.controller,
896
                "evc_affected_by_link_down",
897
                content={"link": link} | map_evc_event_content(evc)
898
            )
899
900 1
        evcs_to_update = []
901 1
        for evc in evcs_with_failover:
902 1
            evcs_to_update.append(evc.as_dict())
903 1
            log.info(
904
                f"{evc} redeployed with failover due to link down {link.id}"
905
            )
906 1
        for evc in check_failover:
907 1
            evcs_to_update.append(evc.as_dict())
908
909 1
        self.mongo_controller.update_evcs(evcs_to_update)
910
911 1
        emit_event(
912
            self.controller,
913
            "cleanup_evcs_old_path",
914
            content={"evcs": evcs_with_failover + check_failover}
915
        )
916
917 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
918 1
    def on_evc_affected_by_link_down(self, event):
919
        """Change circuit when link is down or under_mantenance."""
920
        self.handle_evc_affected_by_link_down(event)
921
922 1
    def handle_evc_affected_by_link_down(self, event):
923
        """Change circuit when link is down or under_mantenance."""
924 1
        evc = self.circuits.get(event.content["evc_id"])
925 1
        link = event.content['link']
926 1
        if not evc:
927 1
            return
928 1
        with evc.lock:
929 1
            if not evc.is_affected_by_link(link):
930
                return
931 1
            result = evc.handle_link_down()
932 1
        event_name = "error_redeploy_link_down"
933 1
        if result:
934 1
            log.info(f"{evc} redeployed due to link down {link.id}")
935 1
            event_name = "redeployed_link_down"
936 1
        emit_event(self.controller, event_name,
937
                   content=map_evc_event_content(evc))
938
939 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
940 1
    def on_evc_deployed(self, event):
941
        """Handle EVC deployed|redeployed_link_down."""
942
        self.handle_evc_deployed(event)
943
944 1
    def handle_evc_deployed(self, event):
945
        """Setup failover path on evc deployed."""
946
        evc = self.circuits.get(event.content["evc_id"])
947
        if not evc:
948
            return
949
        evc.try_setup_failover_path()
950
951 1
    @listen_to("kytos/mef_eline.cleanup_evcs_old_path")
952 1
    def on_cleanup_evcs_old_path(self, event):
953
        """Handle cleanup evcs old path."""
954
        self.handle_cleanup_evcs_old_path(event)
955
956 1
    def handle_cleanup_evcs_old_path(self, event):
957
        """Handle cleanup evcs old path."""
958 1
        evcs = event.content.get("evcs", [])
959 1
        event_contents: dict[str, list[dict]] = {}
960 1
        for evc in evcs:
961 1
            if not evc.old_path:
962 1
                continue
963 1
            removed_flows = evc.remove_path_flows(evc.old_path)
964 1
            content = map_evc_event_content(
965
                evc,
966
                removed_flows=removed_flows,
967
                current_path=evc.current_path.as_dict(),
968
            )
969 1
            event_contents[evc.id] = content
970 1
            evc.old_path = Path([])
971 1
        if event_contents:
972 1
            emit_event(self.controller, "failover_old_path",
973
                       content=event_contents)
974
975 1
    @listen_to("kytos/topology.topology_loaded")
976 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
977
        """Load EVCs once the topology is available."""
978
        self.load_all_evcs()
979
980 1
    def load_all_evcs(self):
981
        """Try to load all EVCs on startup."""
982 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
983 1
        for circuit_id, circuit in circuits:
984 1
            if circuit_id not in self.circuits:
985 1
                self._load_evc(circuit)
986 1
        emit_event(self.controller, "evcs_loaded", content=dict(circuits),
987
                   timeout=1)
988
989 1
    def _load_evc(self, circuit_dict):
990
        """Load one EVC from mongodb to memory."""
991 1
        try:
992 1
            evc = self._evc_from_dict(circuit_dict)
993 1
        except (ValueError, KytosTagError) as exception:
994 1
            log.error(
995
                f"Could not load EVC: dict={circuit_dict} error={exception}"
996
            )
997 1
            return None
998 1
        if evc.archived:
999 1
            return None
1000
1001 1
        self.circuits.setdefault(evc.id, evc)
1002 1
        self.sched.add(evc)
1003 1
        return evc
1004
1005 1
    @listen_to("kytos/flow_manager.flow.error")
1006 1
    def on_flow_mod_error(self, event):
1007
        """Handle flow mod errors related to an EVC."""
1008
        self.handle_flow_mod_error(event)
1009
1010 1
    def handle_flow_mod_error(self, event):
1011
        """Handle flow mod errors related to an EVC."""
1012 1
        flow = event.content["flow"]
1013 1
        command = event.content.get("error_command")
1014 1
        if command != "add":
1015
            return
1016 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
1017 1
        if evc:
1018 1
            with evc.lock:
1019 1
                evc.remove_current_flows()
1020
1021 1
    def _evc_dict_with_instances(self, evc_dict):
1022
        """Convert some dict values to instance of EVC classes.
1023
1024
        This method will convert: [UNI, Link]
1025
        """
1026 1
        data = evc_dict.copy()  # Do not modify the original dict
1027 1
        for attribute, value in data.items():
1028
            # Get multiple attributes.
1029
            # Ex: uni_a, uni_z
1030 1
            if "uni" in attribute:
1031 1
                try:
1032 1
                    data[attribute] = self._uni_from_dict(value)
1033 1
                except ValueError as exception:
1034 1
                    result = "Error creating UNI: Invalid value"
1035 1
                    raise ValueError(result) from exception
1036
1037 1
            if attribute == "circuit_scheduler":
1038 1
                data[attribute] = []
1039 1
                for schedule in value:
1040 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
1041
1042
            # Get multiple attributes.
1043
            # Ex: primary_links,
1044
            #     backup_links,
1045
            #     current_links_cache,
1046
            #     primary_links_cache,
1047
            #     backup_links_cache
1048 1
            if "links" in attribute:
1049 1
                data[attribute] = [
1050
                    self._link_from_dict(link) for link in value
1051
                ]
1052
1053
            # Ex: current_path,
1054
            #     primary_path,
1055
            #     backup_path
1056 1
            if "path" in attribute and attribute != "dynamic_backup_path":
1057 1
                data[attribute] = Path(
1058
                    [self._link_from_dict(link) for link in value]
1059
                )
1060
1061 1
        return data
1062
1063 1
    def _evc_from_dict(self, evc_dict):
1064 1
        data = self._evc_dict_with_instances(evc_dict)
1065 1
        data["table_group"] = self.table_group
1066 1
        return EVC(self.controller, **data)
1067
1068 1
    def _uni_from_dict(self, uni_dict):
1069
        """Return a UNI object from python dict."""
1070 1
        if uni_dict is None:
1071 1
            return False
1072
1073 1
        interface_id = uni_dict.get("interface_id")
1074 1
        interface = self.controller.get_interface_by_id(interface_id)
1075 1
        if interface is None:
1076 1
            result = (
1077
                "Error creating UNI:"
1078
                + f"Could not instantiate interface {interface_id}"
1079
            )
1080 1
            raise ValueError(result) from ValueError
1081 1
        tag_convert = {1: "vlan"}
1082 1
        tag_dict = uni_dict.get("tag", None)
1083 1
        if tag_dict:
1084 1
            tag_type = tag_dict.get("tag_type")
1085 1
            tag_type = tag_convert.get(tag_type, tag_type)
1086 1
            tag_value = tag_dict.get("value")
1087 1
            if isinstance(tag_value, list):
1088 1
                tag_value = get_tag_ranges(tag_value)
1089 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1090 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1091
            else:
1092 1
                tag = TAG(tag_type, tag_value)
1093
        else:
1094 1
            tag = None
1095 1
        uni = UNI(interface, tag)
1096 1
        return uni
1097
1098 1
    def _link_from_dict(self, link_dict):
1099
        """Return a Link object from python dict."""
1100 1
        id_a = link_dict.get("endpoint_a").get("id")
1101 1
        id_b = link_dict.get("endpoint_b").get("id")
1102
1103 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1104 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1105 1
        if not endpoint_a:
1106 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1107 1
            raise ValueError(error_msg)
1108 1
        if not endpoint_b:
1109
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1110
            raise ValueError(error_msg)
1111
1112 1
        link = Link(endpoint_a, endpoint_b)
1113 1
        if "metadata" in link_dict:
1114 1
            link.extend_metadata(link_dict.get("metadata"))
1115
1116 1
        s_vlan = link.get_metadata("s_vlan")
1117 1
        if s_vlan:
1118 1
            tag = TAG.from_dict(s_vlan)
1119 1
            if tag is False:
1120
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1121
                raise ValueError(error_msg)
1122 1
            link.update_metadata("s_vlan", tag)
1123 1
        return link
1124
1125 1
    def _find_evc_by_schedule_id(self, schedule_id):
1126
        """
1127
        Find an EVC and CircuitSchedule based on schedule_id.
1128
1129
        :param schedule_id: Schedule ID
1130
        :return: EVC and Schedule
1131
        """
1132 1
        circuits = self._get_circuits_buffer()
1133 1
        found_schedule = None
1134 1
        evc = None
1135
1136
        # pylint: disable=unused-variable
1137 1
        for c_id, circuit in circuits.items():
1138 1
            for schedule in circuit.circuit_scheduler:
1139 1
                if schedule.id == schedule_id:
1140 1
                    found_schedule = schedule
1141 1
                    evc = circuit
1142 1
                    break
1143 1
            if found_schedule:
1144 1
                break
1145 1
        return evc, found_schedule
1146
1147 1
    def _get_circuits_buffer(self):
1148
        """
1149
        Return the circuit buffer.
1150
1151
        If the buffer is empty, try to load data from mongodb.
1152
        """
1153 1
        if not self.circuits:
1154
            # Load circuits from mongodb to buffer
1155 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1156 1
            for c_id, circuit in circuits.items():
1157 1
                evc = self._evc_from_dict(circuit)
1158 1
                self.circuits[c_id] = evc
1159 1
        return self.circuits
1160
1161
    # pylint: disable=attribute-defined-outside-init
1162 1
    @alisten_to("kytos/of_multi_table.enable_table")
1163 1
    async def on_table_enabled(self, event):
1164
        """Handle a recently table enabled."""
1165 1
        table_group = event.content.get("mef_eline", None)
1166 1
        if not table_group:
1167 1
            return
1168 1
        for group in table_group:
1169 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1170 1
                log.error(f'The table group "{group}" is not allowed for '
1171
                          f'mef_eline. Allowed table groups are '
1172
                          f'{settings.TABLE_GROUP_ALLOWED}')
1173 1
                return
1174 1
        self.table_group.update(table_group)
1175 1
        content = {"group_table": self.table_group}
1176 1
        name = "kytos/mef_eline.enable_table"
1177
        await aemit_event(self.controller, name, content)
1178