Passed
Pull Request — master (#371)
by Vinicius
14:40 queued 10:44
created

build.main.Main.delete_schedule()   A

Complexity

Conditions 3

Size

Total Lines 36
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 3

Importance

Changes 0
Metric Value
cc 3
eloc 20
nop 2
dl 0
loc 36
ccs 20
cts 20
cp 1
crap 3
rs 9.4
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
15
                                validate_openapi)
16 1
from kytos.core.interface import TAG, UNI
17 1
from kytos.core.link import Link
18 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
19
                                 get_json_or_400)
20 1
from napps.kytos.mef_eline import controllers, settings
21 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
22 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
23
                                          Path)
24 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
25 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
26
                                         emit_event, map_evc_event_content)
27
28
29
# pylint: disable=too-many-public-methods
30 1
class Main(KytosNApp):
31
    """Main class of kytos/mef_eline NApp.
32
33
    This class is the entry point for this napp.
34
    """
35
36 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
37
38 1
    def setup(self):
        """Initialise the NApp state; stands in for '__init__'.

        The controller calls this method automatically once the application
        is loaded, so every start-up routine belongs here.
        """
        # Scheduler in charge of the circuits' scheduled events.
        self.sched = Scheduler()

        # Storage controller used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the controller over to the manager of dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of the EVCs created so far.
        # Every create/update/delete must also be synced to mongodb.
        self.circuits = {}

        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
66
67 1
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits ordered by priority.

        Circuits come first by descending service_level and, within the
        same level, by ascending creation_time. The list is built from the
        in-memory ``self.circuits`` buffer.
        """
        def priority(evc):
            # Negating the level turns the ascending sort into a
            # descending one for service_level only.
            return (-evc.service_level, evc.creation_time)

        ordered = list(self.circuits.values())
        ordered.sort(key=priority)
        return ordered
74
75 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ELineController."""
        eline_controller = controllers.ELineController()
        return eline_controller
79
80 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped when ``self._lock`` is already held; otherwise
        the routine runs while holding that lock.
        """
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
88
89 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit meets every condition to be checked.

        Returns True only for a circuit that is enabled, not active, not
        locked, without a recently removed flow, not recently updated, whose
        UNIs are active and that is either intra-switch or has a
        current_path. Returns False otherwise.
        """
        if not circuit.is_enabled():
            return False
        if circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow():
            return False
        if circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # If an inter-switch EVC does not have a current_path, it does not
        # make sense to run sdntrace on it.
        return bool(circuit.is_intra_switch() or circuit.current_path)
105
106 1
    def execute_consistency(self):
        """Execute the consistency routine.

        Buffered circuits that should be checked are traced in one batch.
        A circuit whose trace check succeeds is activated and synced. One
        that fails has its execution_rounds incremented and is redeployed
        once that counter exceeds settings.WAIT_FOR_OLD_PATH. Circuits
        returned by the storage controller but missing from the buffer are
        loaded at the end.
        """
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level():
            # Whatever remains in stored_circuits is not loaded in memory.
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)

        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if circuits_checked.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()

        for circuit_id, stored_circuit in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuit)
132
133 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        Nothing is cleaned up at the moment; any teardown procedure should
        be added here.
        """
138
139 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuits.

        The ``archived`` query argument (default "false") is lower-cased and
        forwarded to the storage controller. Every other query argument is
        forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        stored = self.mongo_controller.get_circuits(archived=archived,
                                                    metadata=metadata)
        return JSONResponse(stored['circuits'])
154
155 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        status = 200
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            return JSONResponse({}, status_code=status)

        result = []
        for circuit in circuits:
            # A circuit without schedules contributes no entries.
            schedules = circuit.get("circuit_scheduler") or []
            result.extend(
                {
                    "schedule_id": scheduler.get("id"),
                    "circuit_id": circuit.get("id"),
                    "schedule": scheduler,
                }
                for scheduler in schedules
            )

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
186
187 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a single circuit, looked up by its id.

        Raises HTTPException 404 when the storage controller returns no
        circuit for the id.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        detail = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", detail, 404)
        raise HTTPException(404, detail=detail)
200
201 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit from the request payload.

        Steps, in order:

        1. Build the EVC from the JSON body (400 on ValueError).
        2. Run check_disabled_component on both UNIs (409 on
           DisabledSwitch).
        3. Validate primary_path and backup_path when present (400 on
           InvalidPath).
        4. Reject an EVC duplicated with a buffered one (409).
        5. Require a primary or dynamic path and reserve the UNI tags
           (400 on ValueError).
        6. Save the EVC (400 on ValidationError), buffer it, add it to the
           scheduler and deploy it right away when it has no
           circuit_scheduler.
        7. Emit the "created" event and answer 201 with the circuit id.
        """
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        # Try to create the circuit object
        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        # Static paths are validated the same way, primary first.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            evc._validate_has_primary_or_dynamic()
            self._use_uni_tags(evc)
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        status = 201
        result = {"circuit_id": evc.id}
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
311
312 1
    @staticmethod
313 1
    def _use_uni_tags(evc):
314 1
        uni_a = evc.uni_a
315 1
        try:
316 1
            evc._use_uni_vlan(uni_a)
317 1
        except ValueError as err:
318 1
            raise err
319 1
        try:
320 1
            uni_z = evc.uni_z
321 1
            evc._use_uni_vlan(uni_z)
322 1
        except ValueError as err:
323 1
            evc.make_uni_vlan_available(uni_a)
324 1
            raise err
325
326 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen for removed flows and delegate to handle_flow_delete."""
        self.handle_flow_delete(event)
330
331 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the cookie of the flow in the event
        content. Nothing happens when no buffered EVC matches it.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
338
339 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated, an active EVC is removed when the
        update disables it, or removed and deployed again when the update
        asks for a redeploy. An inactive EVC is deployed when the update
        enables it. An "updated" event is emitted at the end.

        Raises HTTPException 404 for an unknown circuit, 409 for an archived
        circuit or a DisabledSwitch, and 400 for invalid payload values.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance, not from the KeyError class,
            # so the original lookup error stays attached as the cause.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValidationError as exception:
            # Kept ahead of the ValueError handler on purpose.
            raise HTTPException(400, detail=str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()

        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
398
399 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        While holding the EVC lock, its current and failover flows are
        removed, the EVC is deactivated and disabled, dropped from the
        scheduler, archived, has its UNI tags released, and is synced. A
        "deleted" event is emitted afterwards.

        Raises HTTPException 404 when the circuit is unknown or already
        archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance, not from the KeyError class,
            # so the original lookup error stays attached as the cause.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
438
439 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of a buffered EVC.

        Raises HTTPException 404 when the circuit id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            metadata = self.circuits[circuit_id].metadata
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error
        return JSONResponse({"metadata": metadata})
452
453 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        The payload carries ``circuit_ids``; the remaining keys are the
        metadata to add. Storage is updated first, then every buffered EVC.
        Raises HTTPException 404 listing the ids that hit a KeyError.
        """
        metadata = get_json_or_400(request, self.controller.loop)
        circuit_ids = metadata.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, metadata, "add")

        missing = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(metadata)
            except KeyError:
                missing.append(circuit_id)

        if not missing:
            return JSONResponse("Operation successful", status_code=201)
        raise HTTPException(404, detail=missing)
473
474 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON body must be an object; its items extend the metadata of
        the buffered EVC, which is then synced.

        Raises HTTPException 400 when the body is not a dict and 404 when
        the circuit id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # The f-prefix was missing, so the client used to receive the
            # literal text "{metadata}" instead of the offending value.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
493
494 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete a metadata key from a bulk of EVCs.

        The payload carries ``circuit_ids``; the key comes from the path.
        Storage is updated first, then every buffered EVC. Raises
        HTTPException 404 listing the ids that hit a KeyError.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                missing.append(circuit_id)

        if not missing:
            return JSONResponse("Operation successful")
        raise HTTPException(404, detail=missing)
514
515 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync it.

        Raises HTTPException 404 when the circuit id is not in the buffer.
        """
        params = request.path_params
        circuit_id = params["circuit_id"]
        key = params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
531
532 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        while holding its lock; the answer is 202. A disabled EVC is left
        untouched and the answer is 409.

        Raises HTTPException 404 when the circuit id is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance, not from the KeyError class,
            # so the original lookup error stays attached as the cause.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
555
556 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises HTTPException 404 when the circuit is not found and 409 when
        it is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id = payload["circuit_id"]
        schedule_data = payload["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted and archived circuits can not be modified.
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the EVC has none yet.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then save the circuit to mongodb.
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        status = 201
        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
617
618 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace every attribute of the given schedule of an EVC with the
        payload values. The schedule id is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Raises HTTPException 404 when the schedule is not found and 409 when
        its circuit is archived.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find the circuit owning the schedule.
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted and archived circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(payload)
        new_schedule.id = old_schedule.id

        # Swap the old schedule for the modified one.
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(new_schedule)

        # Cancel the old job before registering the new one.
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)

        # Save EVC to mongodb
        evc.sync()

        status = 200
        result = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
669
670 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the schedule from the EVC.
        Cancel the schedule job.
        Save the EVC to mongodb.

        Raises HTTPException 404 when the schedule is not found and 409 when
        its circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted and archived circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Drop the schedule, cancel its job and save the EVC to mongodb.
        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)
        evc.sync()

        status = 200
        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
706
707 1
    def _is_duplicated_evc(self, evc):
708
        """Verify if the circuit given is duplicated with the stored evcs.
709
710
        Args:
711
            evc (EVC): circuit to be analysed.
712
713
        Returns:
714
            boolean: True if the circuit is duplicated, otherwise False.
715
716
        """
717 1
        for circuit in tuple(self.circuits.values()):
718 1
            if not circuit.archived and circuit.shares_uni(evc):
719 1
                return True
720 1
        return False
721
722 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen for link up events and delegate to handle_link_up."""
        self.handle_link_up(event)
726
727 1
    def handle_link_up(self, event):
        """Let every enabled, non-archived EVC react to a link going up.

        EVCs are visited in service-level order, each under its own lock.
        """
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(event.content["link"])
734
735 1
    @listen_to("kytos/topology.updated")
    def on_topology_update(self, event):
        """Listen for topology updates; delegate to handle_topology_update."""
        self.handle_topology_update(event)
739
740 1
    def handle_topology_update(self, event):
        """Propagate a topology update to the enabled, non-archived EVCs.

        Runs while holding ``self._lock``. An event older than the last one
        processed is dropped; otherwise its timestamp is recorded and each
        eligible EVC handles the topology switches under its own lock.
        """
        with self._lock:
            last_update = self._topology_updated_at
            if last_update and last_update > event.timestamp:
                return
            self._topology_updated_at = event.timestamp
            for evc in self.get_evcs_by_svc_level():
                if not evc.is_enabled() or evc.archived:
                    continue
                with evc.lock:
                    evc.handle_topology_update(
                        event.content["topology"].switches
                    )
755
756 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen to 'kytos/topology.link_down' and delegate the event.

        Covers a link going down or under maintenance; all the work is done
        by ``handle_link_down``.
        """
        self.handle_link_down(event)
760
761 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuits when a link is down or under maintenance.

        The EVCs returned by ``get_evcs_by_svc_level`` are split in three:

        - affected EVCs with a usable failover path: their failover flows
          are sent to flow_manager in batches and the current and failover
          paths are swapped;
        - affected EVCs without a usable failover path (or whose failover
          flows could not be computed): an ``evc_affected_by_link_down``
          event is emitted for each of them;
        - unaffected EVCs: if only their failover path uses the link, a new
          failover path is set up.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)

        pending_flows = {}
        failover_evcs = []
        regular_evcs = []
        unaffected_evcs = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                unaffected_evcs.append(evc)
                continue

            # Without a failover path, or with one that is also hit by this
            # link, the EVC is handled the traditional way.
            failover_usable = (
                getattr(evc, 'failover_path', None)
                and not evc.is_failover_path_affected_by_link(link)
            )
            if not failover_usable:
                regular_evcs.append(evc)
                continue

            try:
                dpid_flows = evc.get_failover_flows()
            # pylint: disable=broad-except
            except Exception:
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                regular_evcs.append(evc)
                continue

            for dpid, flows in dpid_flows.items():
                pending_flows.setdefault(dpid, []).extend(flows)
            failover_evcs.append(evc)

        # Send the failover flows, at most BATCH_SIZE per switch per round
        # (everything at once when BATCH_SIZE is falsy), sleeping
        # BATCH_INTERVAL after each round.
        while pending_flows:
            batch_size = settings.BATCH_SIZE or None
            for dpid in list(pending_flows):
                queued = pending_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": queued[:batch_size]},
                    }
                )
                if batch_size is not None and batch_size < len(queued):
                    pending_flows[dpid] = queued[batch_size:]
                else:
                    del pending_flows[dpid]
            time.sleep(settings.BATCH_INTERVAL)

        # Swap current and failover paths and persist the EVC.
        for evc in failover_evcs:
            with evc.lock:
                previous_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = previous_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in regular_evcs:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id, **map_evc_event_content(evc)}
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, only the unaffected EVCs need to be checked here.
        for evc in unaffected_evcs:
            if not evc.is_failover_path_affected_by_link(link):
                continue
            with evc.lock:
                evc.setup_failover_path()
842
843 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen to 'kytos/mef_eline.evc_affected_by_link_down'.

        All the work is done by ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
847
848 1
    def handle_evc_affected_by_link_down(self, event):
        """Run the link down handling of one EVC and report the outcome.

        The EVC is looked up in ``self.circuits`` by
        ``event.content["evc_id"]``; nothing happens when it is not found.
        Otherwise ``evc.handle_link_down()`` runs under the EVC lock and
        either ``redeployed_link_down`` (truthy result) or
        ``error_redeploy_link_down`` (falsy result) is emitted.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return

        with evc.lock:
            redeployed = evc.handle_link_down()

        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            outcome = "redeployed_link_down"
        else:
            outcome = "error_redeploy_link_down"
        emit_event(self.controller, outcome,
                   content=map_evc_event_content(evc))
862
863 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen to EVC deployed, redeployed_link_up, redeployed_link_down.

        All the work is done by ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
867
868 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of an EVC that was just (re)deployed.

        The EVC is looked up in ``self.circuits`` by
        ``event.content["evc_id"]``; unknown ids are silently ignored. The
        setup runs while holding the EVC lock.
        """
        evc_id = event.content["evc_id"]
        evc = self.circuits.get(evc_id)
        if evc:
            with evc.lock:
                evc.setup_failover_path()
875
876 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Listen to 'kytos/topology.topology_loaded' and load the EVCs.

        The event content is not used; ``load_all_evcs`` does the work.
        """
        self.load_all_evcs()
880
881 1
    def load_all_evcs(self):
        """Load into memory the stored EVCs that are not loaded yet.

        Circuits are read from ``self.mongo_controller.get_circuits()`` and
        each one whose id is missing from ``self.circuits`` is handed to
        ``_load_evc``.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
887
888 1
    def _load_evc(self, circuit_dict):
889
        """Load one EVC from mongodb to memory."""
890 1
        try:
891 1
            evc = self._evc_from_dict(circuit_dict)
892 1
        except ValueError as exception:
893 1
            log.error(
894
                f"Could not load EVC: dict={circuit_dict} error={exception}"
895
            )
896 1
            return None
897
898 1
        if evc.archived:
899 1
            return None
900 1
        evc.deactivate()
901 1
        evc.sync()
902 1
        self.circuits.setdefault(evc.id, evc)
903 1
        self.sched.add(evc)
904 1
        return evc
905
906 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen to 'kytos/flow_manager.flow.error' and delegate the event.

        All the work is done by ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
910
911 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC hit by a flow 'add' error.

        Only events whose ``error_command`` is "add" are handled. The EVC
        is resolved from the cookie of ``event.content["flow"]``; when it
        is found in ``self.circuits`` its current flows are removed while
        holding the EVC lock.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return

        evc_id = EVC.get_id_from_cookie(flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
921
922 1
    def _evc_dict_with_instances(self, evc_dict):
923
        """Convert some dict values to instance of EVC classes.
924
925
        This method will convert: [UNI, Link]
926
        """
927 1
        data = evc_dict.copy()  # Do not modify the original dict
928 1
        for attribute, value in data.items():
929
            # Get multiple attributes.
930
            # Ex: uni_a, uni_z
931 1
            if "uni" in attribute:
932 1
                try:
933 1
                    data[attribute] = self._uni_from_dict(value)
934 1
                except ValueError as exception:
935 1
                    result = "Error creating UNI: Invalid value"
936 1
                    raise ValueError(result) from exception
937
938 1
            if attribute == "circuit_scheduler":
939 1
                data[attribute] = []
940 1
                for schedule in value:
941 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
942
943
            # Get multiple attributes.
944
            # Ex: primary_links,
945
            #     backup_links,
946
            #     current_links_cache,
947
            #     primary_links_cache,
948
            #     backup_links_cache
949 1
            if "links" in attribute:
950 1
                data[attribute] = [
951
                    self._link_from_dict(link) for link in value
952
                ]
953
954
            # Ex: current_path,
955
            #     primary_path,
956
            #     backup_path
957 1
            if "path" in attribute and attribute != "dynamic_backup_path":
958 1
                data[attribute] = Path(
959
                    [self._link_from_dict(link) for link in value]
960
                )
961
962 1
        return data
963
964 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC instance built from a python dict.

        The dict values are first converted by ``_evc_dict_with_instances``
        and ``table_group`` is set to ``self.table_group`` before the EVC
        is instantiated with ``self.controller``.
        """
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        evc_kwargs["table_group"] = self.table_group
        return EVC(self.controller, **evc_kwargs)
968
969 1
    def _uni_from_dict(self, uni_dict):
970
        """Return a UNI object from python dict."""
971 1
        if uni_dict is None:
972 1
            return False
973
974 1
        interface_id = uni_dict.get("interface_id")
975 1
        interface = self.controller.get_interface_by_id(interface_id)
976 1
        if interface is None:
977 1
            result = (
978
                "Error creating UNI:"
979
                + f"Could not instantiate interface {interface_id}"
980
            )
981 1
            raise ValueError(result) from ValueError
982 1
        tag_convert = {1: "vlan"}
983 1
        tag_dict = uni_dict.get("tag", None)
984 1
        if tag_dict:
985 1
            tag_type = tag_dict.get("tag_type")
986 1
            tag_type = tag_convert.get(tag_type, tag_type)
987 1
            tag_value = tag_dict.get("value")
988 1
            tag = TAG(tag_type, tag_value)
989
        else:
990 1
            tag = None
991 1
        uni = UNI(interface, tag)
992
993 1
        return uni
994
995 1
    def _link_from_dict(self, link_dict):
996
        """Return a Link object from python dict."""
997 1
        id_a = link_dict.get("endpoint_a").get("id")
998 1
        id_b = link_dict.get("endpoint_b").get("id")
999
1000 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1001 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1002 1
        if not endpoint_a:
1003 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1004 1
            raise ValueError(error_msg)
1005 1
        if not endpoint_b:
1006
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1007
            raise ValueError(error_msg)
1008
1009 1
        link = Link(endpoint_a, endpoint_b)
1010 1
        if "metadata" in link_dict:
1011 1
            link.extend_metadata(link_dict.get("metadata"))
1012
1013 1
        s_vlan = link.get_metadata("s_vlan")
1014 1
        if s_vlan:
1015 1
            tag = TAG.from_dict(s_vlan)
1016 1
            if tag is False:
1017
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1018
                raise ValueError(error_msg)
1019 1
            link.update_metadata("s_vlan", tag)
1020 1
        return link
1021
1022 1
    def _find_evc_by_schedule_id(self, schedule_id):
1023
        """
1024
        Find an EVC and CircuitSchedule based on schedule_id.
1025
1026
        :param schedule_id: Schedule ID
1027
        :return: EVC and Schedule
1028
        """
1029 1
        circuits = self._get_circuits_buffer()
1030 1
        found_schedule = None
1031 1
        evc = None
1032
1033
        # pylint: disable=unused-variable
1034 1
        for c_id, circuit in circuits.items():
1035 1
            for schedule in circuit.circuit_scheduler:
1036 1
                if schedule.id == schedule_id:
1037 1
                    found_schedule = schedule
1038 1
                    evc = circuit
1039 1
                    break
1040 1
            if found_schedule:
1041 1
                break
1042 1
        return evc, found_schedule
1043
1044 1
    def _get_circuits_buffer(self):
1045
        """
1046
        Return the circuit buffer.
1047
1048
        If the buffer is empty, try to load data from mongodb.
1049
        """
1050 1
        if not self.circuits:
1051
            # Load circuits from mongodb to buffer
1052 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1053 1
            for c_id, circuit in circuits.items():
1054 1
                evc = self._evc_from_dict(circuit)
1055 1
                self.circuits[c_id] = evc
1056 1
        return self.circuits
1057
1058
    # pylint: disable=attribute-defined-outside-init
1059 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle a recently table enabled.

        The "mef_eline" entry of the event content is merged into
        ``self.table_group`` and a 'kytos/mef_eline.enable_table' event is
        emitted with the resulting table group. Nothing is done when the
        entry is missing or empty, or when any of its groups is not listed
        in ``settings.TABLE_GROUP_ALLOWED`` (an error is logged).
        """
        requested = event.content.get("mef_eline", None)
        if not requested:
            return

        allowed = settings.TABLE_GROUP_ALLOWED
        for group in requested:
            if group in allowed:
                continue
            # The whole request is rejected on the first invalid group.
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{allowed}')
            return

        self.table_group.update(requested)
        content = {"group_table": self.table_group}
        name = "kytos/mef_eline.enable_table"
        await aemit_event(self.controller, name, content)
1075