Test Failed
Pull Request — master (#320)
by Vinicius
04:00
created

build.main.Main.add_metadata()   A

Complexity

Conditions 3

Size

Total Lines 19
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 3.0884

Importance

Changes 0
Metric Value
cc 3
eloc 17
nop 2
dl 0
loc 19
rs 9.55
c 0
b 0
f 0
ccs 11
cts 14
cp 0.7856
crap 3.0884
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
from threading import Lock
9
10 1
from pydantic import ValidationError
11 1
12 1
from kytos.core import KytosNApp, log, rest
13
from kytos.core.helpers import listen_to, load_spec, validate_openapi
14
from kytos.core.interface import TAG, UNI
15
from kytos.core.link import Link
16 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
17 1
                                 get_json_or_400)
18 1
from napps.kytos.mef_eline import controllers, settings
19 1
from napps.kytos.mef_eline.exceptions import InvalidPath
20 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
21 1
                                          Path)
22 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
23
from napps.kytos.mef_eline.utils import emit_event, map_evc_event_content
24 1
25 1
26
# pylint: disable=too-many-public-methods
27
class Main(KytosNApp):
28
    """Main class of amlight/mef_eline NApp.
29 1
30
    This class is the entry point for this napp.
31
    """
32
33
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
34
35 1
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        The controller invokes this hook automatically once the application
        has been loaded, so every start-up routine belongs here.
        """
        # Scheduler that drives the circuits' scheduled events.
        self.sched = Scheduler()

        # Controller used to save and load circuits.
        eline_controller = self.get_eline_controller()
        self.mongo_controller = eline_controller
        eline_controller.bootstrap_indexes()

        # Tell the dynamic path manager which controller it works with.
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of the created EVCs, keyed by circuit id.
        # Every create/update/delete must also be synced to mongodb.
        self.circuits = {}

        # Lock taken by ``execute`` around each consistency round.
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
61
62 1
    def get_evcs_by_svc_level(self) -> list:
63
        """Get circuits sorted by desc service level and asc creation_time.
64 1
65
        In the future, as more ops are offloaded it should be get from the DB.
66
        """
67
        return sorted(self.circuits.values(),
68
                      key=lambda x: (-x.service_level, x.creation_time))
69 1
70
    @staticmethod
    def get_eline_controller():
        """Create and return a new ``ELineController`` instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
74
75
    def execute(self):
        """Run one round of the consistency routine.

        ``setup`` registers this method to run in a loop. If a previous round
        still holds ``self._lock``, the current round is skipped instead of
        queueing behind it.
        """
        # A non-blocking acquire makes "is a round in progress?" and "take
        # the lock" one atomic step. Checking ``locked()`` first and then
        # entering ``with self._lock`` would let a second caller slip in
        # between and block rather than skip.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
83 1
84 1
    @staticmethod
85
    def should_be_checked(circuit):
86 1
        "Verify if the circuit meets the necessary conditions to be checked"
87 1
        # pylint: disable=too-many-boolean-expressions
88
        if (
89
                circuit.is_enabled()
90 1
                and not circuit.is_active()
91
                and not circuit.lock.locked()
92
                and not circuit.has_recent_removed_flow()
93
                and not circuit.is_recent_updated()
94
                # if a inter-switch EVC does not have current_path, it does not
95
                # make sense to run sdntrace on it
96
                and (circuit.is_intra_switch() or circuit.current_path)
97
                ):
98
            return True
99
        return False
100 1
101
    def execute_consistency(self):
        """Execute the consistency routine.

        Circuits that pass ``should_be_checked`` are verified through
        ``EVCDeploy.check_list_traces``. A circuit reported as checked is
        activated and synced; any other one has ``execution_rounds``
        incremented and is deployed again once that counter exceeds
        ``settings.WAIT_FOR_OLD_PATH``. Finally, circuits returned by mongodb
        that are absent from the in-memory buffer are loaded.
        """
        not_loaded = self.mongo_controller.get_circuits()['circuits']
        candidates = []
        for circuit in self.get_evcs_by_svc_level():
            # Whatever remains in ``not_loaded`` after this loop was returned
            # by mongodb but is missing from the buffer.
            not_loaded.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                candidates.append(circuit)

        trace_results = EVCDeploy.check_list_traces(candidates)
        for circuit in candidates:
            if trace_results.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()

        for circuit_id, stored_circuit in not_loaded.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuit)
127 1
128 1
    def shutdown(self):
129
        """Execute when your napp is unloaded.
130 1
131
        If you have some cleanup procedure, insert it here.
132
        """
133
134
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuits.

        The ``archived`` query arg (``"false"`` when absent) is lower-cased
        and forwarded as the ``archived`` filter, so by default only
        non-archived EVCs are listed. Every other query arg is forwarded as a
        metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived_flag = query.get("archived", "false").lower()
        metadata_filter = {
            key: value for key, value in query.items() if key != "archived"
        }
        stored = self.mongo_controller.get_circuits(
            archived=archived_flag, metadata=metadata_filter
        )
        return JSONResponse(stored['circuits'])
149 1
150
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        NOTE: when no circuit is stored at all, the body is an empty JSON
        object (``{}``) rather than an empty list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return JSONResponse({}, status_code=200)

        status = 200
        schedules = []
        for evc_data in stored:
            # Circuits without a (non-empty) circuit_scheduler add nothing.
            schedules.extend(
                {
                    "schedule_id": schedule.get("id"),
                    "circuit_id": evc_data.get("id"),
                    "schedule": schedule,
                }
                for schedule in evc_data.get("circuit_scheduler") or ()
            )

        log.debug("list_schedules result %s %s", schedules, status)
        return JSONResponse(schedules, status_code=status)
181
182
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a single circuit looked up by its id.

        Raises:
            HTTPException: 404 when the controller returns no circuit for
                the given id.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        stored = self.mongo_controller.get_circuit(circuit_id)
        if stored:
            log.debug("get_circuit result %s %s", stored, 200)
            return JSONResponse(stored, status_code=200)

        result = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", result, 404)
        raise HTTPException(404, detail=result)
195 1
196 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST.

        For each link composing those paths:
          - E-Line NApp requests a S-VID available from the link VLAN pool.
          - Using the S-VID obtained, generate abstract flow entries to be
            sent to FlowManager.

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths.

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation.

        Finally, notify the user of the status of the request.

        Responses produced by this handler:
            201 with ``{"circuit_id": <id>}`` on success;
            400 when the payload cannot be turned into an EVC, a given
            primary/backup path is invalid, the EVC has neither a primary
            nor a dynamic path, or syncing fails validation;
            409 when the EVC already exists.
        """
        log.debug("create_circuit /v2/evc/")
        payload = get_json_or_400(request)

        # Try to create the circuit object.
        try:
            evc = self._evc_from_dict(payload)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        # Static paths, when given, must connect both UNI switches.
        if evc.primary_path:
            try:
                evc.primary_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                detail = f"primary_path is not valid: {exception}"
                raise HTTPException(400, detail=detail) from exception

        if evc.backup_path:
            try:
                evc.backup_path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                detail = f"backup_path is not valid: {exception}"
                raise HTTPException(400, detail=detail) from exception

        # Refuse an EVC that duplicates an existing one.
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # Save the circuit.
        try:
            evc.sync()
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # Keep the circuit in the in-memory buffer and schedule its deploy.
        self.circuits[evc.id] = evc
        self.sched.add(evc)

        # A circuit without a schedule is deployed right away.
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users.
        status = 201
        result = {"circuit_id": evc.id}
        log.debug("create_circuit result %s %s", result, status)
        emit_event(
            self.controller,
            name="created",
            content=map_evc_event_content(evc),
        )
        return JSONResponse(result, status_code=status)
292 1
293 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen for flow removals and delegate to ``handle_flow_delete``."""
        self.handle_flow_delete(event)
297
298 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the removed flow's cookie; the event is
        ignored when no buffered circuit matches that id.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
305
306
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated, an active circuit is removed when
        it was disabled, or removed and deployed again when a redeploy is
        needed; an inactive circuit is deployed when it was enabled.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer, 409
                when it is archived, 400 when the payload is rejected.
        """
        data = get_json_or_400(request)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance (not the KeyError class) so the
            # missing key stays visible in the traceback.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()

        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
359 1
360
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        While holding the EVC lock: the current and failover flows are
        removed, the EVC is deactivated and disabled, it is removed from the
        scheduler, then archived and synced. A "deleted" event is emitted
        afterwards.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer or has
                already been archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance (not the KeyError class) so the
            # missing key stays visible in the traceback.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
398 1
399 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of a buffered EVC.

        Raises:
            HTTPException: 404 when a ``KeyError`` is raised while looking
                the circuit up.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            evc_metadata = self.circuits[circuit_id].metadata
            return JSONResponse({"metadata": evc_metadata})
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error
412
413 1 View Code Duplication
    @rest("v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        ``circuit_ids`` is popped from the payload; the rest of the payload
        is the metadata. The update is first sent to the mongo controller and
        then applied to each buffered EVC.

        Raises:
            HTTPException: 404 listing the ids for which a ``KeyError`` was
                raised (e.g. ids missing from the buffer).
        """
        data = get_json_or_400(request)
        circuit_ids = data.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, data, "add")

        missing_ids = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].extend_metadata(data)
            except KeyError:
                missing_ids.append(evc_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful", status_code=201)
433 1
434
    @rest("v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON payload must be an object; its items extend the metadata of
        the EVC, which is then synced.

        Raises:
            HTTPException: 400 when the payload is not a dict, 404 when the
                circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request)
        if not isinstance(metadata, dict):
            # f-string so the offending value is interpolated; without the
            # prefix the client got the literal text "{metadata}".
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
453 1
454 1 View Code Duplication
    @rest("v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete a metadata key from a bulk of EVCs.

        ``circuit_ids`` is popped from the payload and ``key`` comes from the
        URL. The removal is first sent to the mongo controller and then
        applied to each buffered EVC.

        Raises:
            HTTPException: 404 listing the ids for which a ``KeyError`` was
                raised (e.g. ids missing from the buffer).
        """
        data = get_json_or_400(request)
        key = request.path_params["key"]
        circuit_ids = data.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing_ids = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].remove_metadata(key)
            except KeyError:
                missing_ids.append(evc_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful")
474 1
475 1
    @rest("v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync the EVC.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        path_params = request.path_params
        circuit_id = path_params["circuit_id"]
        key = path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
491 1
492 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        (202). A disabled EVC is left untouched (409).

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance (not the KeyError class) so the
            # missing key stays visible in the traceback.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
515
516
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found, 409 when it is
                archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request)
        circuit_id = payload["circuit_id"]
        schedule_data = payload["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted (archived) circuits can not be modified.
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Build the schedule object from the payload.
        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Create the schedule list on first use, then register the schedule.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Add the schedule job.
        self.sched.add_circuit_job(evc, new_schedule)

        # Save the circuit to mongodb.
        evc.sync()

        status = 201
        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
577
578
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Change all attributes from the given schedule from a EVC circuit.
        The schedule ID is preserved as default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when no schedule has the given id, 409 when
                the owning circuit is archived.
        """
        payload = get_json_or_400(request)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find the circuit schedule and the EVC that owns it.
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted (archived) circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # The replacement keeps the id of the schedule it supersedes.
        new_schedule = CircuitSchedule.from_dict(payload)
        new_schedule.id = old_schedule.id

        # Swap the old schedule for the modified one.
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(new_schedule)

        # Cancel the old job and register the new circuit schedule.
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)

        # Save the EVC to mongodb.
        evc.sync()

        status = 200
        result = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
629 1
630 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the schedule from the EVC.
        Cancel the schedule's job.
        Save the EVC (``evc.sync()``).

        Raises:
            HTTPException: 404 when no schedule has the given id, 409 when
                the owning circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Deleted (archived) circuits can not be modified.
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Drop the schedule from the EVC and cancel its job.
        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)

        # Save the EVC to mongodb.
        evc.sync()

        status = 200
        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
666 1
667
    def _is_duplicated_evc(self, evc):
668 1
        """Verify if the circuit given is duplicated with the stored evcs.
669 1
670
        Args:
671
            evc (EVC): circuit to be analysed.
672
673 1
        Returns:
674
            boolean: True if the circuit is duplicated, otherwise False.
675 1
676 1
        """
677 1
        for circuit in tuple(self.circuits.values()):
678 1
            if not circuit.archived and circuit.shares_uni(evc):
679 1
                return True
680
        return False
681 1
682 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen for link up events and delegate to ``handle_link_up``."""
        self.handle_link_up(event)
686 1
687
    def handle_link_up(self, event):
        """Let every enabled, non-archived EVC react to a link coming up.

        EVCs are visited in the order given by ``get_evcs_by_svc_level`` and
        each one handles the link while holding its own lock.
        """
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(event.content["link"])
694 1
695 1
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen for link down events and delegate to ``handle_link_down``."""
        self.handle_link_down(event)
699
700
    def handle_link_down(self, event):
        """Change circuits when a link is down or under maintenance.

        EVCs affected by the link are split in two groups:

        - EVCs with a failover path that the link does not affect: their
          failover flows are sent to flow_manager in batches and then the
          current and failover paths are swapped;
        - the remaining affected EVCs: an ``evc_affected_by_link_down``
          event is emitted for each of them.

        EVCs whose current path is not affected but whose failover path is
        get a new failover path set up.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                check_failover.append(evc)
                continue
            # If there is no usable failover path, handle the link down the
            # traditional way.
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                evcs_normal.append(evc)
                continue
            for dpid, flows in evc.get_failover_flows().items():
                switch_flows.setdefault(dpid, []).extend(flows)
            evcs_with_failover.append(evc)

        # Each round sends at most ``batch_size`` flows per switch. The size
        # must stay constant between rounds: the pending lists are already
        # truncated after every round, so accumulating an offset would make
        # the batches grow (N, 2N, 3N, ...). A non-positive BATCH_SIZE
        # disables batching and sends everything in a single round.
        batch_size = settings.BATCH_SIZE if settings.BATCH_SIZE > 0 else None
        while switch_flows:
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:batch_size]},
                    }
                )
                if batch_size is None or batch_size >= len(pending):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = pending[batch_size:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                # The failover path becomes the current one and vice versa.
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                evc.setup_failover_path()
771 1
772 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen for evc_affected_by_link_down and delegate its handling.

        The event is handed over unchanged to
        handle_evc_affected_by_link_down.
        """
        self.handle_evc_affected_by_link_down(event)
776
777
    def handle_evc_affected_by_link_down(self, event):
        """Run the link-down handling of one EVC and report the outcome.

        The EVC is looked up in self.circuits by event.content["evc_id"];
        nothing happens when it is not found. Otherwise evc.handle_link_down()
        runs under the EVC lock and either "redeployed_link_down" (truthy
        result) or "error_redeploy_link_down" is emitted.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return

        with evc.lock:
            redeployed = evc.handle_link_down()

        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
        event_name = (
            "redeployed_link_down" if redeployed
            else "error_redeploy_link_down"
        )
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
791 1
792 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Listen for deployed, redeployed_link_up and redeployed_link_down.

        The event is handed over unchanged to handle_evc_deployed.
        """
        self.handle_evc_deployed(event)
796 1
797
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        The EVC is looked up in self.circuits by event.content["evc_id"];
        unknown ids are ignored. The setup runs while holding the EVC lock.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
804
805 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load every stored EVC when the topology_loaded event arrives.

        The event itself is not inspected; it only triggers load_all_evcs.
        """
        self.load_all_evcs()
809
810
    def load_all_evcs(self):
        """Load into memory the stored circuits that are not there yet.

        Circuits are read from mongo_controller.get_circuits(); ids already
        present in self.circuits are skipped, the others go to _load_evc.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
816 1
817 1
    def _load_evc(self, circuit_dict):
818 1
        """Load one EVC from mongodb to memory."""
819 1
        try:
820
            evc = self._evc_from_dict(circuit_dict)
821 1
        except ValueError as exception:
822 1
            log.error(
823
                f"Could not load EVC: dict={circuit_dict} error={exception}"
824
            )
825
            return None
826 1
827
        if evc.archived:
828 1
            return None
829 1
        evc.deactivate()
830 1
        evc.sync()
831
        self.circuits.setdefault(evc.id, evc)
832 1
        self.sched.add(evc)
833 1
        return evc
834 1
835
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen for flow_manager flow.error and delegate its handling.

        The event is handed over unchanged to handle_flow_mod_error.
        """
        self.handle_flow_mod_error(event)
839
840
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow add failed.

        Only events whose "error_command" is "add" are acted upon. The EVC
        id is derived from the cookie of event.content["flow"]; ids not
        present in self.circuits are ignored.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") == "add":
            evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
            if evc:
                evc.remove_current_flows()
849 1
850 1
    def _evc_dict_with_instances(self, evc_dict):
851
        """Convert some dict values to instance of EVC classes.
852 1
853 1
        This method will convert: [UNI, Link]
854 1
        """
855 1
        data = evc_dict.copy()  # Do not modify the original dict
856
        for attribute, value in data.items():
857
            # Get multiple attributes.
858
            # Ex: uni_a, uni_z
859
            if "uni" in attribute:
860
                try:
861
                    data[attribute] = self._uni_from_dict(value)
862
                except ValueError as exception:
863 1
                    result = "Error creating UNI: Invalid value"
864 1
                    raise ValueError(result) from exception
865
866
            if attribute == "circuit_scheduler":
867
                data[attribute] = []
868
                for schedule in value:
869
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
870
871 1
            # Get multiple attributes.
872 1
            # Ex: primary_links,
873
            #     backup_links,
874
            #     current_links_cache,
875
            #     primary_links_cache,
876 1
            #     backup_links_cache
877
            if "links" in attribute:
878 1
                data[attribute] = [
879 1
                    self._link_from_dict(link) for link in value
880 1
                ]
881
882 1
            # Ex: current_path,
883
            #     primary_path,
884 1
            #     backup_path
885 1
            if "path" in attribute and attribute != "dynamic_backup_path":
886
                data[attribute] = Path(
887 1
                    [self._link_from_dict(link) for link in value]
888 1
                )
889 1
890 1
        return data
891
892
    def _evc_from_dict(self, evc_dict):
        """Return an EVC built from evc_dict.

        The dict values are first converted by _evc_dict_with_instances and
        then passed as keyword arguments to EVC, after the controller.
        """
        return EVC(self.controller, **self._evc_dict_with_instances(evc_dict))
895
896 1
    def _uni_from_dict(self, uni_dict):
897 1
        """Return a UNI object from python dict."""
898 1
        if uni_dict is None:
899
            return False
900 1
901 1
        interface_id = uni_dict.get("interface_id")
902
        interface = self.controller.get_interface_by_id(interface_id)
903 1
        if interface is None:
904
            result = (
905 1
                "Error creating UNI:"
906
                + f"Could not instantiate interface {interface_id}"
907 1
            )
908 1
            raise ValueError(result) from ValueError
909
910 1
        tag_dict = uni_dict.get("tag", None)
911 1
        if tag_dict:
912 1
            tag = TAG.from_dict(tag_dict)
913 1
        else:
914 1
            tag = None
915 1
        uni = UNI(interface, tag)
916
917
        return uni
918
919 1
    def _link_from_dict(self, link_dict):
920 1
        """Return a Link object from python dict."""
921 1
        id_a = link_dict.get("endpoint_a").get("id")
922
        id_b = link_dict.get("endpoint_b").get("id")
923 1
924 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
925 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
926 1
        if not endpoint_a:
927
            error_msg = f"Could not get interface endpoint_a id {id_a}"
928
            raise ValueError(error_msg)
929 1
        if not endpoint_b:
930 1
            error_msg = f"Could not get interface endpoint_b id {id_b}"
931
            raise ValueError(error_msg)
932 1
933
        link = Link(endpoint_a, endpoint_b)
934
        if "metadata" in link_dict:
935
            link.extend_metadata(link_dict.get("metadata"))
936
937
        s_vlan = link.get_metadata("s_vlan")
938
        if s_vlan:
939 1
            tag = TAG.from_dict(s_vlan)
940 1
            if tag is False:
941 1
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
942
                raise ValueError(error_msg)
943
            link.update_metadata("s_vlan", tag)
944 1
        return link
945 1
946 1
    def _find_evc_by_schedule_id(self, schedule_id):
947 1
        """
948 1
        Find an EVC and CircuitSchedule based on schedule_id.
949 1
950 1
        :param schedule_id: Schedule ID
951 1
        :return: EVC and Schedule
952 1
        """
953
        circuits = self._get_circuits_buffer()
954 1
        found_schedule = None
955
        evc = None
956
957
        # pylint: disable=unused-variable
958
        for c_id, circuit in circuits.items():
959
            for schedule in circuit.circuit_scheduler:
960 1
                if schedule.id == schedule_id:
961
                    found_schedule = schedule
962 1
                    evc = circuit
963 1
                    break
964 1
            if found_schedule:
965 1
                break
966 1
        return evc, found_schedule
967
968
    def _get_circuits_buffer(self):
969
        """
970
        Return the circuit buffer.
971
972
        If the buffer is empty, try to load data from mongodb.
973
        """
974
        if not self.circuits:
975
            # Load circuits from mongodb to buffer
976
            circuits = self.mongo_controller.get_circuits()['circuits']
977
            for c_id, circuit in circuits.items():
978
                evc = self._evc_from_dict(circuit)
979
                self.circuits[c_id] = evc
980
        return self.circuits
981