Passed
Push — master ( 6c74fb...d0911c )
by Italo Valcy
02:49 queued 15s
created

build.main.Main._link_from_dict()   B

Complexity

Conditions 6

Size

Total Lines 26
Code Lines 22

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 6.2163

Importance

Changes 0
Metric Value
eloc 22
dl 0
loc 26
rs 8.4186
c 0
b 0
f 0
ccs 18
cts 22
cp 0.8182
cc 6
nop 2
crap 6.2163
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
15
                                validate_openapi)
16 1
from kytos.core.interface import TAG, UNI
17 1
from kytos.core.link import Link
18 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
19
                                 get_json_or_400)
20 1
from napps.kytos.mef_eline import controllers, settings
21 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
22 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
23
                                          Path)
24 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
25 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
26
                                         emit_event, map_evc_event_content)
27
28
29
# pylint: disable=too-many-public-methods
30 1
class Main(KytosNApp):
31
    """Main class of amlight/mef_eline NApp.
32
33
    This class is the entry point for this napp.
34
    """
35
36 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
37
38 1
    def setup(self):
        """Initialise the NApp state; used in place of ``__init__``.

        The controller calls this method automatically when the application
        is loaded, so every start-up routine belongs here.
        """
        # Scheduler object used for the circuits' events.
        self.sched = Scheduler()

        # Persistence layer used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the controller over to the dynamic path manager.
        DynamicPathManager.set_controller(self.controller)

        # Buffer of the EVCs created so far, keyed by circuit id. Every
        # create/update/delete must also be synced to mongodb.
        self.circuits = {}

        self.table_group = dict.fromkeys(("epl", "evpl"), 0)
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
66
67 1
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits ordered for processing.

        Circuits with a higher ``service_level`` come first; ties are broken
        by ascending ``creation_time``.

        In the future, as more ops are offloaded, this should come from the
        DB.
        """
        def priority(circuit):
            # Negated so that sorted() yields descending service level.
            return -circuit.service_level, circuit.creation_time

        return sorted(self.circuits.values(), key=priority)
74
75 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ELineController instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
79
80 1
    def execute(self):
        """Run the consistency routine unless the NApp lock is held.

        When ``self._lock`` is already locked this round is skipped
        entirely.
        """
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
88
89 1
    def should_be_checked(self, circuit):
        """Tell whether the circuit meets the conditions to be checked.

        Returns True only for a circuit that is enabled, inactive, not
        locked, without a recently removed flow, not recently updated, whose
        UNIs are active and that is either intra-switch or has a
        current_path.
        """
        if not circuit.is_enabled():
            return False
        if circuit.is_active() or circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow() or circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # If an inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it.
        return bool(circuit.is_intra_switch() or circuit.current_path)
105
106 1
    def execute_consistency(self):
        """Run one round of the consistency routine.

        Buffered circuits selected by ``should_be_checked`` are traced in a
        single ``EVCDeploy.check_list_traces`` call. A circuit whose check
        succeeds is activated and synced; one that keeps failing for more
        than ``settings.WAIT_FOR_OLD_PATH`` rounds is redeployed. Circuits
        found in mongodb but missing from the buffer are loaded at the end.
        """
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        circuits_to_check = []
        for evc in self.get_evcs_by_svc_level():
            # Whatever is left in stored_circuits afterwards is not loaded.
            stored_circuits.pop(evc.id, None)
            if self.should_be_checked(evc):
                circuits_to_check.append(evc)

        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if circuits_checked.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds <= settings.WAIT_FOR_OLD_PATH:
                continue
            log.info(f"{circuit} enabled but inactive - redeploy")
            with circuit.lock:
                circuit.deploy()

        for circuit_id, circuit_data in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(circuit_data)
132
133 1
    def shutdown(self):
        """Run when the NApp is unloaded.

        Nothing is cleaned up at the moment; any teardown procedure should
        be added here.
        """
138
139 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuits.

        The ``archived`` query arg selects which EVCs are listed and
        defaults to "false", so only non archived EVCs are returned unless
        requested otherwise. Every other query arg is forwarded to the
        database query as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        stored = self.mongo_controller.get_circuits(
            archived=archived, metadata=metadata
        )
        return JSONResponse(stored['circuits'])
154
155 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return JSONResponse({}, status_code=200)

        status = 200
        result = []
        for circuit in stored:
            # A missing or empty "circuit_scheduler" contributes nothing.
            for scheduler in circuit.get("circuit_scheduler") or ():
                result.append(
                    {
                        "schedule_id": scheduler.get("id"),
                        "circuit_id": circuit.get("id"),
                        "schedule": scheduler,
                    }
                )

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
186
187 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a single circuit looked up by its id.

        Raises:
            HTTPException: 404 when the database has no such circuit.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        result = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", result, 404)
        raise HTTPException(404, detail=result)
200
201 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit from the request payload.

        Steps, in order:

        1. Build the EVC from the JSON body (400 on ``ValueError``).
        2. Run ``check_disabled_component`` on both UNIs (409 on
           ``DisabledSwitch``).
        3. Validate ``primary_path`` and ``backup_path`` when they are set
           (400 on ``InvalidPath``).
        4. Refuse an EVC that duplicates an existing one (409).
        5. Run ``_validate_has_primary_or_dynamic`` (400 on ``ValueError``).
        6. Save the EVC (400 on ``ValidationError``), keep it in the
           circuits buffer and add it to the scheduler.
        7. Deploy immediately when the circuit has no schedule.
        8. Emit the "created" event and answer 201 with the circuit id.
        """
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        # Try to create the circuit object.
        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                409, detail=f"Path is not valid: {exception}"
            ) from exception

        # User supplied paths are only validated when present.
        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400, detail=f"primary_path is not valid: {exception}"
                ) from exception
        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400, detail=f"backup_path is not valid: {exception}"
                ) from exception

        # Verify duplicated evc.
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # Save the circuit.
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # Store the circuit in the buffer and schedule its deploy.
        self.circuits[evc.id] = evc
        self.sched.add(evc)

        # Circuit has no schedule, deploy now.
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users.
        status = 201
        result = {"circuit_id": evc.id}
        log.debug("create_circuit result %s %s", result, status)
        emit_event(
            self.controller,
            name="created",
            content=map_evc_event_content(evc),
        )
        return JSONResponse(result, status_code=status)
306
307 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Forward flow removed events to ``handle_flow_delete``."""
        self.handle_flow_delete(event)
311
312 1
    def handle_flow_delete(self, event):
        """Flag the EVC owning a removed flow via ``set_flow_removed_at``.

        The EVC id is derived from the flow cookie; a flow that matches no
        buffered EVC is ignored.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
319
320 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on the request payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After ``evc.update`` returns ``(enable, redeploy)``:

        - an active EVC is removed when ``enable`` is False, or removed and
          deployed again when ``redeploy`` is not None;
        - an inactive EVC is deployed when ``enable`` is True.

        Raises:
            HTTPException: 404 when ``circuit_id`` is not in the circuits
                buffer; 409 when the EVC is archived or ``evc.update``
                raises ``DisabledSwitch``; 400 when ``evc.update`` raises
                ``ValidationError`` or ``ValueError``.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught exception instead of the KeyError class
            # so the original lookup failure is kept as the cause.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        # NOTE(review): ValidationError is kept ahead of ValueError,
        # presumably because it may be a ValueError subclass - confirm.
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                409, detail=f"Path is not valid: {exception}"
            ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()

        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
379
380 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        While holding the EVC lock, the current and failover flows are
        removed, then the EVC is deactivated, disabled, taken out of the
        scheduler, archived and synced. A "deleted" event is emitted
        afterwards.

        Raises:
            HTTPException: 404 when ``circuit_id`` is not in the circuits
                buffer or the EVC is already archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught exception instead of the KeyError class
            # so the original lookup failure is kept as the cause.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
418
419 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of an EVC from the circuits buffer.

        Raises:
            HTTPException: 404 when a ``KeyError`` is raised while looking
                up the circuit.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            evc = self.circuits[circuit_id]
            return JSONResponse({"metadata": evc.metadata})
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error
432
433 1 View Code Duplication
    @rest("v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        The payload carries the target ids under "circuit_ids"; everything
        else in it is the metadata to add. The database is updated first and
        the buffered EVCs afterwards.

        Raises:
            HTTPException: 404 listing the ids for which a ``KeyError`` was
                raised while updating the circuits buffer.
        """
        payload = get_json_or_400(request, self.controller.loop)
        circuit_ids = payload.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, payload, "add")

        missing_ids = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].extend_metadata(payload)
            except KeyError:
                missing_ids.append(evc_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful", status_code=201)
453
454 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC and sync it.

        Raises:
            HTTPException: 400 when the JSON payload is not a dict; 404 when
                ``circuit_id`` is not in the circuits buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # The message needs the f prefix, otherwise the client receives
            # the literal "{metadata}" placeholder.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
473
474 1 View Code Duplication
    @rest("v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete a metadata key from a bulk of EVCs.

        The payload carries the target ids under "circuit_ids". The database
        is updated first and the buffered EVCs afterwards.

        Raises:
            HTTPException: 404 listing the ids for which a ``KeyError`` was
                raised while updating the circuits buffer.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing_ids = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].remove_metadata(key)
            except KeyError:
                missing_ids.append(evc_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful")
494
495 1
    @rest("v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete a metadata key from an EVC and sync it.

        Raises:
            HTTPException: 404 when ``circuit_id`` is not in the circuits
                buffer.
        """
        path_params = request.path_params
        circuit_id = path_params["circuit_id"]
        key = path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
511
512 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        under its lock (202). A disabled EVC is left untouched (409).

        Raises:
            HTTPException: 404 when ``circuit_id`` is not in the circuits
                buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught exception instead of the KeyError class
            # so the original lookup failure is kept as the cause.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
535
536 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found in the circuits
                buffer; 409 when the circuit is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived circuits can not be modified.
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start a fresh list when the EVC has no schedule yet.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then save the circuit to mongodb.
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        status = 201
        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
597
598 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace all attributes of the given schedule of an EVC. The schedule
        id is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when no schedule has the given id; 409 when
                the owning circuit is archived.
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find the schedule and the circuit that owns it.
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id

        # Swap the old schedule for the modified one.
        evc.circuit_scheduler.remove(found_schedule)
        evc.circuit_scheduler.append(new_schedule)

        # Cancel the old job before adding the one for the new schedule.
        self.sched.cancel_job(found_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)

        # Save the EVC to mongodb.
        evc.sync()

        status = 200
        result = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
649
650 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the schedule from the EVC.
        Cancel the schedule's job.
        Save the EVC to mongodb.

        Raises:
            HTTPException: 404 when no schedule has the given id; 409 when
                the owning circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Archived circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Drop the schedule, cancel its job and save the EVC to mongodb.
        evc.circuit_scheduler.remove(found_schedule)
        self.sched.cancel_job(found_schedule.id)
        evc.sync()

        status = 200
        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
686
687 1
    def _is_duplicated_evc(self, evc):
        """Verify if the given circuit duplicates one of the stored EVCs.

        Archived circuits are never considered duplicates.

        Args:
            evc (EVC): circuit to be analysed.

        Returns:
            boolean: True if a non archived circuit shares a UNI with
                ``evc``, otherwise False.

        """
        # Iterate over a snapshot of the circuits buffer.
        snapshot = tuple(self.circuits.values())
        return any(
            not stored.archived and stored.shares_uni(evc)
            for stored in snapshot
        )
701
702 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Forward link up events to ``handle_link_up``."""
        self.handle_link_up(event)
706
707 1
    def handle_link_up(self, event):
        """Let every enabled, non archived EVC react to a link going up.

        EVCs are visited in the order given by ``get_evcs_by_svc_level`` and
        each one handles the link while holding its own lock.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
714
715 1
    @listen_to("kytos/topology.updated")
    def on_topology_update(self, event):
        """Forward topology updated events to ``handle_topology_update``."""
        self.handle_topology_update(event)
719
720 1
    def handle_topology_update(self, event):
        """Propagate a topology update to the enabled, non archived EVCs.

        The whole routine runs under ``self._lock``. An event whose
        timestamp is older than the last one processed is discarded.
        """
        with self._lock:
            last_update = self._topology_updated_at
            if last_update and last_update > event.timestamp:
                return
            self._topology_updated_at = event.timestamp
            for evc in self.get_evcs_by_svc_level():
                if not evc.is_enabled() or evc.archived:
                    continue
                with evc.lock:
                    switches = event.content["topology"].switches
                    evc.handle_topology_update(switches)
735
736 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen to ``kytos/topology.link_down`` and delegate the event.

        All the work is done by ``handle_link_down``.
        """
        self.handle_link_down(event)
740
741 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """React to a link going down for every known circuit.

        Args:
            event (KytosEvent): event whose ``content["link"]`` is the
                link that went down.

        Steps, in order:

        1. Classify circuits: affected ones with a usable failover path,
           affected ones without it (or whose failover flows could not be
           computed), and circuits not affected by the link.
        2. Send the collected failover flows to ``kytos.flow_manager``
           (``flows.install``), in batches of ``settings.BATCH_SIZE`` per
           switch, sleeping ``settings.BATCH_INTERVAL`` after each round.
        3. Swap current and failover paths of the first group and emit
           ``redeployed_link_down`` for each circuit.
        4. Emit ``evc_affected_by_link_down`` for the second group.
        5. For unaffected circuits whose failover path uses the link, set
           up a new failover path.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        failover_ready = []
        needs_redeploy = []
        unaffected = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                unaffected.append(evc)
                continue
            # if there is no failover path, handles link down the
            # traditional way
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                needs_redeploy.append(evc)
                continue
            try:
                dpid_flows = evc.get_failover_flows()
            # pylint: disable=broad-except
            except Exception:
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                needs_redeploy.append(evc)
                continue
            for dpid, flows in dpid_flows.items():
                switch_flows.setdefault(dpid, []).extend(flows)
            failover_ready.append(evc)

        while switch_flows:
            # A falsy BATCH_SIZE means "no batching": the slice takes all.
            offset = settings.BATCH_SIZE or None
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:offset]},
                    }
                )
                if offset is None or offset >= len(pending):
                    # Everything for this switch was sent.
                    del switch_flows[dpid]
                else:
                    switch_flows[dpid] = pending[offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in failover_ready:
            with evc.lock:
                # The failover path becomes the current path and the old
                # current path is kept as the failover one.
                previous_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = previous_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in needs_redeploy:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the unaffected circuits.
        for evc in unaffected:
            if evc.is_failover_path_affected_by_link(link):
                with evc.lock:
                    evc.setup_failover_path()
822
823 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen to ``kytos/mef_eline.evc_affected_by_link_down``.

        All the work is done by ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
827
828 1
    def handle_evc_affected_by_link_down(self, event):
        """Run the link down handling of one circuit and report the result.

        Args:
            event (KytosEvent): event whose content has ``evc_id`` and
                ``link_id``.

        Nothing is done when the circuit is not in ``self.circuits``.
        Otherwise ``evc.handle_link_down()`` runs under the circuit lock
        and either ``redeployed_link_down`` (truthy result) or
        ``error_redeploy_link_down`` is emitted.
        """
        evc = self.circuits.get(event.content["evc_id"])
        link_id = event.content['link_id']
        if not evc:
            return
        with evc.lock:
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
842
843 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen to EVC deployed, redeployed_link_up and redeployed_link_down.

        All the work is done by ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
847
848 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the circuit named by the event.

        Args:
            event (KytosEvent): event whose ``content["evc_id"]`` is the
                circuit id.

        Circuits that are not in ``self.circuits`` are ignored. The setup
        runs while holding the circuit lock.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
855
856 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load the EVCs when ``kytos/topology.topology_loaded`` arrives.

        The event itself is not inspected; ``load_all_evcs`` does the work.
        """
        self.load_all_evcs()
860
861 1
    def load_all_evcs(self):
        """Load into memory the stored EVCs that are not loaded yet.

        Every circuit returned by ``mongo_controller.get_circuits()`` whose
        id is missing from ``self.circuits`` is passed to ``_load_evc``.
        """
        stored = self.mongo_controller.get_circuits()['circuits'].items()
        for circuit_id, circuit_dict in stored:
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit_dict)
867
868 1
    def _load_evc(self, circuit_dict):
869
        """Load one EVC from mongodb to memory."""
870 1
        try:
871 1
            evc = self._evc_from_dict(circuit_dict)
872 1
        except ValueError as exception:
873 1
            log.error(
874
                f"Could not load EVC: dict={circuit_dict} error={exception}"
875
            )
876 1
            return None
877
878 1
        if evc.archived:
879 1
            return None
880 1
        evc.deactivate()
881 1
        evc.sync()
882 1
        self.circuits.setdefault(evc.id, evc)
883 1
        self.sched.add(evc)
884 1
        return evc
885
886 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen to ``kytos/flow_manager.flow.error`` and delegate the event.

        All the work is done by ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
890
891 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC related to a failed flow add.

        Args:
            event (KytosEvent): event whose content has ``flow`` and,
                optionally, ``error_command``.

        Only errors whose ``error_command`` is "add" are handled. The
        circuit is looked up by the id derived from the flow cookie and
        its flows are removed while holding the circuit lock.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
901
902 1
    def _evc_dict_with_instances(self, evc_dict):
903
        """Convert some dict values to instance of EVC classes.
904
905
        This method will convert: [UNI, Link]
906
        """
907 1
        data = evc_dict.copy()  # Do not modify the original dict
908 1
        for attribute, value in data.items():
909
            # Get multiple attributes.
910
            # Ex: uni_a, uni_z
911 1
            if "uni" in attribute:
912 1
                try:
913 1
                    data[attribute] = self._uni_from_dict(value)
914 1
                except ValueError as exception:
915 1
                    result = "Error creating UNI: Invalid value"
916 1
                    raise ValueError(result) from exception
917
918 1
            if attribute == "circuit_scheduler":
919 1
                data[attribute] = []
920 1
                for schedule in value:
921 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
922
923
            # Get multiple attributes.
924
            # Ex: primary_links,
925
            #     backup_links,
926
            #     current_links_cache,
927
            #     primary_links_cache,
928
            #     backup_links_cache
929 1
            if "links" in attribute:
930 1
                data[attribute] = [
931
                    self._link_from_dict(link) for link in value
932
                ]
933
934
            # Ex: current_path,
935
            #     primary_path,
936
            #     backup_path
937 1
            if "path" in attribute and attribute != "dynamic_backup_path":
938 1
                data[attribute] = Path(
939
                    [self._link_from_dict(link) for link in value]
940
                )
941
942 1
        return data
943
944 1
    def _evc_from_dict(self, evc_dict):
        """Return an EVC object from python dict.

        The dict values are first converted by ``_evc_dict_with_instances``
        and the NApp ``table_group`` is passed along to the EVC.
        """
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        evc_kwargs["table_group"] = self.table_group
        return EVC(self.controller, **evc_kwargs)
948
949 1
    def _uni_from_dict(self, uni_dict):
950
        """Return a UNI object from python dict."""
951 1
        if uni_dict is None:
952 1
            return False
953
954 1
        interface_id = uni_dict.get("interface_id")
955 1
        interface = self.controller.get_interface_by_id(interface_id)
956 1
        if interface is None:
957 1
            result = (
958
                "Error creating UNI:"
959
                + f"Could not instantiate interface {interface_id}"
960
            )
961 1
            raise ValueError(result) from ValueError
962
963 1
        tag_dict = uni_dict.get("tag", None)
964 1
        if tag_dict:
965 1
            tag = TAG.from_dict(tag_dict)
966
        else:
967 1
            tag = None
968 1
        uni = UNI(interface, tag)
969
970 1
        return uni
971
972 1
    def _link_from_dict(self, link_dict):
973
        """Return a Link object from python dict."""
974 1
        id_a = link_dict.get("endpoint_a").get("id")
975 1
        id_b = link_dict.get("endpoint_b").get("id")
976
977 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
978 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
979 1
        if not endpoint_a:
980 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
981 1
            raise ValueError(error_msg)
982 1
        if not endpoint_b:
983
            error_msg = f"Could not get interface endpoint_b id {id_b}"
984
            raise ValueError(error_msg)
985
986 1
        link = Link(endpoint_a, endpoint_b)
987 1
        if "metadata" in link_dict:
988 1
            link.extend_metadata(link_dict.get("metadata"))
989
990 1
        s_vlan = link.get_metadata("s_vlan")
991 1
        if s_vlan:
992 1
            tag = TAG.from_dict(s_vlan)
993 1
            if tag is False:
994
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
995
                raise ValueError(error_msg)
996 1
            link.update_metadata("s_vlan", tag)
997 1
        return link
998
999 1
    def _find_evc_by_schedule_id(self, schedule_id):
1000
        """
1001
        Find an EVC and CircuitSchedule based on schedule_id.
1002
1003
        :param schedule_id: Schedule ID
1004
        :return: EVC and Schedule
1005
        """
1006 1
        circuits = self._get_circuits_buffer()
1007 1
        found_schedule = None
1008 1
        evc = None
1009
1010
        # pylint: disable=unused-variable
1011 1
        for c_id, circuit in circuits.items():
1012 1
            for schedule in circuit.circuit_scheduler:
1013 1
                if schedule.id == schedule_id:
1014 1
                    found_schedule = schedule
1015 1
                    evc = circuit
1016 1
                    break
1017 1
            if found_schedule:
1018 1
                break
1019 1
        return evc, found_schedule
1020
1021 1
    def _get_circuits_buffer(self):
1022
        """
1023
        Return the circuit buffer.
1024
1025
        If the buffer is empty, try to load data from mongodb.
1026
        """
1027 1
        if not self.circuits:
1028
            # Load circuits from mongodb to buffer
1029 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1030 1
            for c_id, circuit in circuits.items():
1031 1
                evc = self._evc_from_dict(circuit)
1032 1
                self.circuits[c_id] = evc
1033 1
        return self.circuits
1034
1035
    # pylint: disable=attribute-defined-outside-init
1036 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle a recently enabled table.

        The ``mef_eline`` entry of the event content, when present and not
        empty, is merged into ``self.table_group`` and the result is
        published as ``kytos/mef_eline.enable_table``. If any group is not
        in ``settings.TABLE_GROUP_ALLOWED`` an error is logged and nothing
        is updated.
        """
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return
        for group in table_group:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return
        self.table_group.update(table_group)
        content = {"group_table": self.table_group}
        await aemit_event(
            self.controller, "kytos/mef_eline.enable_table", content
        )
1052