Passed
Pull Request — master (#320)
by Vinicius
07:35 queued 03:38
created

build.main.Main.add_metadata()   A

Complexity

Conditions 3

Size

Total Lines 19
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 3.0032

Importance

Changes 0
Metric Value
cc 3
eloc 17
nop 2
dl 0
loc 19
ccs 13
cts 14
cp 0.9286
crap 3.0032
rs 9.55
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
from threading import Lock
9
10 1
from pydantic import ValidationError
11
12 1
from kytos.core import KytosNApp, log, rest
13 1
from kytos.core.helpers import listen_to, load_spec, validate_openapi
14 1
from kytos.core.interface import TAG, UNI
15 1
from kytos.core.link import Link
16 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
17
                                 get_json_or_400)
18 1
from napps.kytos.mef_eline import controllers, settings
19 1
from napps.kytos.mef_eline.exceptions import InvalidPath
20 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
21
                                          Path)
22 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
23 1
from napps.kytos.mef_eline.utils import emit_event, map_evc_event_content
24
25
26
# pylint: disable=too-many-public-methods
27 1
class Main(KytosNApp):
28
    """Main class of amlight/mef_eline NApp.
29
30
    This class is the entry point for this napp.
31
    """
32
33 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
34
35 1
    def setup(self):
36
        """Replace the '__init__' method for the KytosNApp subclass.
37
38
        The setup method is automatically called by the controller when your
39
        application is loaded.
40
41
        So, if you have any setup routine, insert it here.
42
        """
43
        # object used to scheduler circuit events
44 1
        self.sched = Scheduler()
45
46
        # object to save and load circuits
47 1
        self.mongo_controller = self.get_eline_controller()
48 1
        self.mongo_controller.bootstrap_indexes()
49
50
        # set the controller that will manager the dynamic paths
51 1
        DynamicPathManager.set_controller(self.controller)
52
53
        # dictionary of EVCs created. It acts as a circuit buffer.
54
        # Every create/update/delete must be synced to mongodb.
55 1
        self.circuits = {}
56
57 1
        self._lock = Lock()
58 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
59
60 1
        self.load_all_evcs()
61
62 1
    def get_evcs_by_svc_level(self) -> list:
63
        """Get circuits sorted by desc service level and asc creation_time.
64
65
        In the future, as more ops are offloaded it should be get from the DB.
66
        """
67 1
        return sorted(self.circuits.values(),
68
                      key=lambda x: (-x.service_level, x.creation_time))
69
70 1
    @staticmethod
71 1
    def get_eline_controller():
72
        """Return the ELineController instance."""
73
        return controllers.ELineController()
74
75 1
    def execute(self):
76
        """Execute once when the napp is running."""
77 1
        if self._lock.locked():
78 1
            return
79 1
        log.debug("Starting consistency routine")
80 1
        with self._lock:
81 1
            self.execute_consistency()
82 1
        log.debug("Finished consistency routine")
83
84 1
    @staticmethod
85 1
    def should_be_checked(circuit):
86
        "Verify if the circuit meets the necessary conditions to be checked"
87
        # pylint: disable=too-many-boolean-expressions
88 1
        if (
89
                circuit.is_enabled()
90
                and not circuit.is_active()
91
                and not circuit.lock.locked()
92
                and not circuit.has_recent_removed_flow()
93
                and not circuit.is_recent_updated()
94
                # if a inter-switch EVC does not have current_path, it does not
95
                # make sense to run sdntrace on it
96
                and (circuit.is_intra_switch() or circuit.current_path)
97
                ):
98 1
            return True
99
        return False
100
101 1
    def execute_consistency(self):
102
        """Execute consistency routine."""
103 1
        circuits_to_check = []
104 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
105 1
        for circuit in self.get_evcs_by_svc_level():
106 1
            stored_circuits.pop(circuit.id, None)
107 1
            if self.should_be_checked(circuit):
108 1
                circuits_to_check.append(circuit)
109 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
110 1
        for circuit in circuits_to_check:
111 1
            is_checked = circuits_checked.get(circuit.id)
112 1
            if is_checked:
113 1
                circuit.execution_rounds = 0
114 1
                log.info(f"{circuit} enabled but inactive - activating")
115 1
                with circuit.lock:
116 1
                    circuit.activate()
117 1
                    circuit.sync()
118
            else:
119 1
                circuit.execution_rounds += 1
120 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
121 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
122 1
                    with circuit.lock:
123 1
                        circuit.deploy()
124 1
        for circuit_id in stored_circuits:
125 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
126 1
            self._load_evc(stored_circuits[circuit_id])
127
128 1
    def shutdown(self):
129
        """Execute when your napp is unloaded.
130
131
        If you have some cleanup procedure, insert it here.
132
        """
133
134 1
    @rest("/v2/evc/", methods=["GET"])
135 1
    def list_circuits(self, request: Request) -> JSONResponse:
136
        """Endpoint to return circuits stored.
137
138
        archive query arg if defined (not null) will be filtered
139
        accordingly, by default only non archived evcs will be listed
140
        """
141 1
        log.debug("list_circuits /v2/evc")
142 1
        args = request.query_params
143 1
        archived = args.get("archived", "false").lower()
144 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
145 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
146
                                                      metadata=args)
147 1
        circuits = circuits['circuits']
148 1
        return JSONResponse(circuits)
149
150 1
    @rest("/v2/evc/schedule", methods=["GET"])
151 1
    def list_schedules(self, _request: Request) -> JSONResponse:
152
        """Endpoint to return all schedules stored for all circuits.
153
154
        Return a JSON with the following template:
155
        [{"schedule_id": <schedule_id>,
156
         "circuit_id": <circuit_id>,
157
         "schedule": <schedule object>}]
158
        """
159 1
        log.debug("list_schedules /v2/evc/schedule")
160 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
161 1
        if not circuits:
162 1
            result = {}
163 1
            status = 200
164 1
            return JSONResponse(result, status_code=status)
165
166 1
        result = []
167 1
        status = 200
168 1
        for circuit in circuits:
169 1
            circuit_scheduler = circuit.get("circuit_scheduler")
170 1
            if circuit_scheduler:
171 1
                for scheduler in circuit_scheduler:
172 1
                    value = {
173
                        "schedule_id": scheduler.get("id"),
174
                        "circuit_id": circuit.get("id"),
175
                        "schedule": scheduler,
176
                    }
177 1
                    result.append(value)
178
179 1
        log.debug("list_schedules result %s %s", result, status)
180 1
        return JSONResponse(result, status_code=status)
181
182 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
183 1
    def get_circuit(self, request: Request) -> JSONResponse:
184
        """Endpoint to return a circuit based on id."""
185 1
        circuit_id = request.path_params["circuit_id"]
186 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
187 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
188 1
        if not circuit:
189 1
            result = f"circuit_id {circuit_id} not found"
190 1
            log.debug("get_circuit result %s %s", result, 404)
191 1
            raise HTTPException(404, detail=result)
192 1
        status = 200
193 1
        log.debug("get_circuit result %s %s", circuit, status)
194 1
        return JSONResponse(circuit, status_code=status)
195
196 1
    @rest("/v2/evc/", methods=["POST"])
197 1
    @validate_openapi(spec)
198 1
    def create_circuit(self, request: Request) -> JSONResponse:
199
        """Try to create a new circuit.
200
201
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
202
        UNI_Z's requested C-VID are available from the interfaces' pools. This
203
        is checked when creating the UNI object.
204
205
        Then, E-Line NApp requests a primary and a backup path to the
206
        Pathfinder NApp using the attributes primary_links and backup_links
207
        submitted via REST
208
209
        # For each link composing paths in #3:
210
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
211
        #  - Using the S-VID obtained, generate abstract flow entries to be
212
        #    sent to FlowManager
213
214
        Push abstract flow entries to FlowManager and FlowManager pushes
215
        OpenFlow entries to datapaths
216
217
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
218
        creation
219
220
        Finnaly, notify user of the status of its request.
221
        """
222
        # Try to create the circuit object
223 1
        log.debug("create_circuit /v2/evc/")
224 1
        data = get_json_or_400(request)
225
226 1
        try:
227 1
            evc = self._evc_from_dict(data)
228 1
        except ValueError as exception:
229 1
            log.debug("create_circuit result %s %s", exception, 400)
230 1
            raise HTTPException(400, detail=str(exception)) from exception
231
232 1
        if evc.primary_path:
233
            try:
234
                evc.primary_path.is_valid(
235
                    evc.uni_a.interface.switch,
236
                    evc.uni_z.interface.switch,
237
                    bool(evc.circuit_scheduler),
238
                )
239
            except InvalidPath as exception:
240
                raise HTTPException(
241
                    400,
242
                    detail=f"primary_path is not valid: {exception}"
243
                ) from exception
244 1
        if evc.backup_path:
245
            try:
246
                evc.backup_path.is_valid(
247
                    evc.uni_a.interface.switch,
248
                    evc.uni_z.interface.switch,
249
                    bool(evc.circuit_scheduler),
250
                )
251
            except InvalidPath as exception:
252
                raise HTTPException(
253
                    400,
254
                    detail=f"backup_path is not valid: {exception}"
255
                ) from exception
256
257
        # verify duplicated evc
258 1
        if self._is_duplicated_evc(evc):
259 1
            result = "The EVC already exists."
260 1
            log.debug("create_circuit result %s %s", result, 409)
261 1
            raise HTTPException(409, detail=result)
262
263 1
        try:
264 1
            evc._validate_has_primary_or_dynamic()
265 1
        except ValueError as exception:
266 1
            raise HTTPException(400, detail=str(exception)) from exception
267
268
        # save circuit
269 1
        try:
270 1
            evc.sync()
271
        except ValidationError as exception:
272
            raise HTTPException(400, detail=str(exception)) from exception
273
274
        # store circuit in dictionary
275 1
        self.circuits[evc.id] = evc
276
277
        # Schedule the circuit deploy
278 1
        self.sched.add(evc)
279
280
        # Circuit has no schedule, deploy now
281 1
        if not evc.circuit_scheduler:
282 1
            with evc.lock:
283 1
                evc.deploy()
284
285
        # Notify users
286 1
        result = {"circuit_id": evc.id}
287 1
        status = 201
288 1
        log.debug("create_circuit result %s %s", result, status)
289 1
        emit_event(self.controller, name="created",
290
                   content=map_evc_event_content(evc))
291 1
        return JSONResponse(result, status_code=status)
292
293 1
    @listen_to('kytos/flow_manager.flow.removed')
294 1
    def on_flow_delete(self, event):
295
        """Capture delete messages to keep track when flows got removed."""
296
        self.handle_flow_delete(event)
297
298 1
    def handle_flow_delete(self, event):
299
        """Keep track when the EVC got flows removed by deriving its cookie."""
300 1
        flow = event.content["flow"]
301 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
302 1
        if evc:
303 1
            log.debug("Flow removed in EVC %s", evc.id)
304 1
            evc.set_flow_removed_at()
305
306 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
307 1
    @validate_openapi(spec)
308 1
    def update(self, request: Request) -> JSONResponse:
309
        """Update a circuit based on payload.
310
311
        The EVC attributes (creation_time, active, current_path,
312
        failover_path, _id, archived) can't be updated.
313
        """
314 1
        data = get_json_or_400(request)
315 1
        circuit_id = request.path_params["circuit_id"]
316 1
        log.debug("update /v2/evc/%s", circuit_id)
317 1
        try:
318 1
            evc = self.circuits[circuit_id]
319 1
        except KeyError:
320 1
            result = f"circuit_id {circuit_id} not found"
321 1
            log.debug("update result %s %s", result, 404)
322 1
            raise HTTPException(404, detail=result) from KeyError
323
324 1
        if evc.archived:
325 1
            result = "Can't update archived EVC"
326 1
            log.debug("update result %s %s", result, 409)
327 1
            raise HTTPException(409, detail=result)
328
329 1
        try:
330 1
            enable, redeploy = evc.update(
331
                **self._evc_dict_with_instances(data)
332
            )
333 1
        except ValidationError as exception:
334
            raise HTTPException(400, detail=str(exception)) from exception
335 1
        except ValueError as exception:
336 1
            log.error(exception)
337 1
            log.debug("update result %s %s", exception, 400)
338 1
            raise HTTPException(400, detail=str(exception)) from exception
339
340 1
        if evc.is_active():
341
            if enable is False:  # disable if active
342
                with evc.lock:
343
                    evc.remove()
344
            elif redeploy is not None:  # redeploy if active
345
                with evc.lock:
346
                    evc.remove()
347
                    evc.deploy()
348
        else:
349 1
            if enable is True:  # enable if inactive
350 1
                with evc.lock:
351 1
                    evc.deploy()
352 1
        result = {evc.id: evc.as_dict()}
353 1
        status = 200
354
355 1
        log.debug("update result %s %s", result, status)
356 1
        emit_event(self.controller, "updated",
357
                   content=map_evc_event_content(evc, **data))
358 1
        return JSONResponse(result, status_code=status)
359
360 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
361 1
    def delete_circuit(self, request: Request) -> JSONResponse:
362
        """Remove a circuit.
363
364
        First, the flows are removed from the switches, and then the EVC is
365
        disabled.
366
        """
367 1
        circuit_id = request.path_params["circuit_id"]
368 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
369 1
        try:
370 1
            evc = self.circuits[circuit_id]
371 1
        except KeyError:
372 1
            result = f"circuit_id {circuit_id} not found"
373 1
            log.debug("delete_circuit result %s %s", result, 404)
374 1
            raise HTTPException(404, detail=result) from KeyError
375
376 1
        if evc.archived:
377 1
            result = f"Circuit {circuit_id} already removed"
378 1
            log.debug("delete_circuit result %s %s", result, 404)
379 1
            raise HTTPException(404, detail=result)
380
381 1
        log.info("Removing %s", evc)
382 1
        with evc.lock:
383 1
            evc.remove_current_flows()
384 1
            evc.remove_failover_flows(sync=False)
385 1
            evc.deactivate()
386 1
            evc.disable()
387 1
            self.sched.remove(evc)
388 1
            evc.archive()
389 1
            evc.sync()
390 1
        log.info("EVC removed. %s", evc)
391 1
        result = {"response": f"Circuit {circuit_id} removed"}
392 1
        status = 200
393
394 1
        log.debug("delete_circuit result %s %s", result, status)
395 1
        emit_event(self.controller, "deleted",
396
                   content=map_evc_event_content(evc))
397 1
        return JSONResponse(result, status_code=status)
398
399 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["GET"])
400 1
    def get_metadata(self, request: Request) -> JSONResponse:
401
        """Get metadata from an EVC."""
402 1
        circuit_id = request.path_params["circuit_id"]
403 1
        try:
404 1
            return (
405
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
406
            )
407
        except KeyError as error:
408
            raise HTTPException(
409
                404,
410
                detail=f"circuit_id {circuit_id} not found."
411
            ) from error
412
413 1 View Code Duplication
    @rest("v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
414 1
    @validate_openapi(spec)
415 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
416
        """Add metadata to a bulk of EVCs."""
417 1
        data = get_json_or_400(request)
418 1
        circuit_ids = data.pop("circuit_ids")
419
420 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
421
422 1
        fail_evcs = []
423 1
        for _id in circuit_ids:
424 1
            try:
425 1
                evc = self.circuits[_id]
426 1
                evc.extend_metadata(data)
427 1
            except KeyError:
428 1
                fail_evcs.append(_id)
429
430 1
        if fail_evcs:
431 1
            raise HTTPException(404, detail=fail_evcs)
432 1
        return JSONResponse("Operation successful", status_code=201)
433
434 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["POST"])
435 1
    @validate_openapi(spec)
436 1
    def add_metadata(self, request: Request) -> JSONResponse:
437
        """Add metadata to an EVC."""
438 1
        circuit_id = request.path_params["circuit_id"]
439 1
        metadata = get_json_or_400(request)
440 1
        if not isinstance(metadata, dict):
441
            raise HTTPException(400, "Invalid metadata value: {metadata}")
442 1
        try:
443 1
            evc = self.circuits[circuit_id]
444 1
        except KeyError as error:
445 1
            raise HTTPException(
446
                404,
447
                detail=f"circuit_id {circuit_id} not found."
448
            ) from error
449
450 1
        evc.extend_metadata(metadata)
451 1
        evc.sync()
452 1
        return JSONResponse("Operation successful", status_code=201)
453
454 1 View Code Duplication
    @rest("v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
455 1
    @validate_openapi(spec)
456 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
457
        """Delete metada from a bulk of EVCs"""
458 1
        data = get_json_or_400(request)
459 1
        key = request.path_params["key"]
460 1
        circuit_ids = data.pop("circuit_ids")
461 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
462
463 1
        fail_evcs = []
464 1
        for _id in circuit_ids:
465 1
            try:
466 1
                evc = self.circuits[_id]
467 1
                evc.remove_metadata(key)
468
            except KeyError:
469
                fail_evcs.append(_id)
470
471 1
        if fail_evcs:
472
            raise HTTPException(404, detail=fail_evcs)
473 1
        return JSONResponse("Operation successful")
474
475 1
    @rest("v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
476 1
    def delete_metadata(self, request: Request) -> JSONResponse:
477
        """Delete metadata from an EVC."""
478 1
        circuit_id = request.path_params["circuit_id"]
479 1
        key = request.path_params["key"]
480 1
        try:
481 1
            evc = self.circuits[circuit_id]
482 1
        except KeyError as error:
483 1
            raise HTTPException(
484
                404,
485
                detail=f"circuit_id {circuit_id} not found."
486
            ) from error
487
488 1
        evc.remove_metadata(key)
489 1
        evc.sync()
490 1
        return JSONResponse("Operation successful")
491
492 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
493 1
    def redeploy(self, request: Request) -> JSONResponse:
494
        """Endpoint to force the redeployment of an EVC."""
495 1
        circuit_id = request.path_params["circuit_id"]
496 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
497 1
        try:
498 1
            evc = self.circuits[circuit_id]
499 1
        except KeyError:
500 1
            raise HTTPException(
501
                404,
502
                detail=f"circuit_id {circuit_id} not found"
503
            ) from KeyError
504 1
        if evc.is_enabled():
505 1
            with evc.lock:
506 1
                evc.remove_current_flows()
507 1
                evc.deploy()
508 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
509 1
            status = 202
510
        else:
511 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
512 1
            status = 409
513
514 1
        return JSONResponse(result, status_code=status)
515
516 1
    @rest("/v2/evc/schedule/", methods=["POST"])
517 1
    @validate_openapi(spec)
518 1
    def create_schedule(self, request: Request) -> JSONResponse:
519
        """
520
        Create a new schedule for a given circuit.
521
522
        This service do no check if there are conflicts with another schedule.
523
        Payload example:
524
            {
525
              "circuit_id":"aa:bb:cc",
526
              "schedule": {
527
                "date": "2019-08-07T14:52:10.967Z",
528
                "interval": "string",
529
                "frequency": "1 * * * *",
530
                "action": "create"
531
              }
532
            }
533
        """
534 1
        log.debug("create_schedule /v2/evc/schedule/")
535 1
        data = get_json_or_400(request)
536 1
        circuit_id = data["circuit_id"]
537 1
        schedule_data = data["schedule"]
538
539
        # Get EVC from circuits buffer
540 1
        circuits = self._get_circuits_buffer()
541
542
        # get the circuit
543 1
        evc = circuits.get(circuit_id)
544
545
        # get the circuit
546 1
        if not evc:
547 1
            result = f"circuit_id {circuit_id} not found"
548 1
            log.debug("create_schedule result %s %s", result, 404)
549 1
            raise HTTPException(404, detail=result)
550
        # Can not modify circuits deleted and archived
551 1
        if evc.archived:
552 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
553 1
            log.debug("create_schedule result %s %s", result, 409)
554 1
            raise HTTPException(409, detail=result)
555
556
        # new schedule from dict
557 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
558
559
        # If there is no schedule, create the list
560 1
        if not evc.circuit_scheduler:
561 1
            evc.circuit_scheduler = []
562
563
        # Add the new schedule
564 1
        evc.circuit_scheduler.append(new_schedule)
565
566
        # Add schedule job
567 1
        self.sched.add_circuit_job(evc, new_schedule)
568
569
        # save circuit to mongodb
570 1
        evc.sync()
571
572 1
        result = new_schedule.as_dict()
573 1
        status = 201
574
575 1
        log.debug("create_schedule result %s %s", result, status)
576 1
        return JSONResponse(result, status_code=status)
577
578 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
579 1
    @validate_openapi(spec)
580 1
    def update_schedule(self, request: Request) -> JSONResponse:
581
        """Update a schedule.
582
583
        Change all attributes from the given schedule from a EVC circuit.
584
        The schedule ID is preserved as default.
585
        Payload example:
586
            {
587
              "date": "2019-08-07T14:52:10.967Z",
588
              "interval": "string",
589
              "frequency": "1 * * *",
590
              "action": "create"
591
            }
592
        """
593 1
        data = get_json_or_400(request)
594 1
        schedule_id = request.path_params["schedule_id"]
595 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
596
597
        # Try to find a circuit schedule
598 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
599
600
        # Can not modify circuits deleted and archived
601 1
        if not found_schedule:
602 1
            result = f"schedule_id {schedule_id} not found"
603 1
            log.debug("update_schedule result %s %s", result, 404)
604 1
            raise HTTPException(404, detail=result)
605 1
        if evc.archived:
606 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
607 1
            log.debug("update_schedule result %s %s", result, 409)
608 1
            raise HTTPException(409, detail=result)
609
610 1
        new_schedule = CircuitSchedule.from_dict(data)
611 1
        new_schedule.id = found_schedule.id
612
        # Remove the old schedule
613 1
        evc.circuit_scheduler.remove(found_schedule)
614
        # Append the modified schedule
615 1
        evc.circuit_scheduler.append(new_schedule)
616
617
        # Cancel all schedule jobs
618 1
        self.sched.cancel_job(found_schedule.id)
619
        # Add the new circuit schedule
620 1
        self.sched.add_circuit_job(evc, new_schedule)
621
        # Save EVC to mongodb
622 1
        evc.sync()
623
624 1
        result = new_schedule.as_dict()
625 1
        status = 200
626
627 1
        log.debug("update_schedule result %s %s", result, status)
628 1
        return JSONResponse(result, status_code=status)
629
630 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
631 1
    def delete_schedule(self, request: Request) -> JSONResponse:
632
        """Remove a circuit schedule.
633
634
        Remove the Schedule from EVC.
635
        Remove the Schedule from cron job.
636
        Save the EVC to the Storehouse.
637
        """
638 1
        schedule_id = request.path_params["schedule_id"]
639 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
640 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
641
642
        # Can not modify circuits deleted and archived
643 1
        if not found_schedule:
644 1
            result = f"schedule_id {schedule_id} not found"
645 1
            log.debug("delete_schedule result %s %s", result, 404)
646 1
            raise HTTPException(404, detail=result)
647
648 1
        if evc.archived:
649 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
650 1
            log.debug("delete_schedule result %s %s", result, 409)
651 1
            raise HTTPException(409, detail=result)
652
653
        # Remove the old schedule
654 1
        evc.circuit_scheduler.remove(found_schedule)
655
656
        # Cancel all schedule jobs
657 1
        self.sched.cancel_job(found_schedule.id)
658
        # Save EVC to mongodb
659 1
        evc.sync()
660
661 1
        result = "Schedule removed"
662 1
        status = 200
663
664 1
        log.debug("delete_schedule result %s %s", result, status)
665 1
        return JSONResponse(result, status_code=status)
666
667 1
    def _is_duplicated_evc(self, evc):
668
        """Verify if the circuit given is duplicated with the stored evcs.
669
670
        Args:
671
            evc (EVC): circuit to be analysed.
672
673
        Returns:
674
            boolean: True if the circuit is duplicated, otherwise False.
675
676
        """
677 1
        for circuit in tuple(self.circuits.values()):
678 1
            if not circuit.archived and circuit.shares_uni(evc):
679 1
                return True
680 1
        return False
681
682 1
    @listen_to("kytos/topology.link_up")
683 1
    def on_link_up(self, event):
684
        """Change circuit when link is up or end_maintenance."""
685
        self.handle_link_up(event)
686
687 1
    def handle_link_up(self, event):
688
        """Change circuit when link is up or end_maintenance."""
689 1
        log.info("Event handle_link_up %s", event.content["link"])
690 1
        for evc in self.get_evcs_by_svc_level():
691 1
            if evc.is_enabled() and not evc.archived:
692 1
                with evc.lock:
693 1
                    evc.handle_link_up(event.content["link"])
694
695 1
    @listen_to("kytos/topology.link_down")
696 1
    def on_link_down(self, event):
697
        """Change circuit when link is down or under_mantenance."""
698
        self.handle_link_down(event)
699
700 1
    def handle_link_down(self, event):
701
        """Change circuit when link is down or under_mantenance."""
702 1
        link = event.content["link"]
703 1
        log.info("Event handle_link_down %s", link)
704 1
        switch_flows = {}
705 1
        evcs_with_failover = []
706 1
        evcs_normal = []
707 1
        check_failover = []
708 1
        for evc in self.get_evcs_by_svc_level():
709 1
            if evc.is_affected_by_link(link):
710
                # if there is no failover path, handles link down the
711
                # tradditional way
712 1
                if (
713
                    not getattr(evc, 'failover_path', None) or
714
                    evc.is_failover_path_affected_by_link(link)
715
                ):
716 1
                    evcs_normal.append(evc)
717 1
                    continue
718 1
                for dpid, flows in evc.get_failover_flows().items():
719 1
                    switch_flows.setdefault(dpid, [])
720 1
                    switch_flows[dpid].extend(flows)
721 1
                evcs_with_failover.append(evc)
722
            else:
723 1
                check_failover.append(evc)
724
725 1
        offset = 0
726 1
        while switch_flows:
727 1
            offset = (offset + settings.BATCH_SIZE) or None
728 1
            switches = list(switch_flows.keys())
729 1
            for dpid in switches:
730 1
                emit_event(
731
                    self.controller,
732
                    context="kytos.flow_manager",
733
                    name="flows.install",
734
                    content={
735
                        "dpid": dpid,
736
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
737
                    }
738
                )
739 1
                if offset is None or offset >= len(switch_flows[dpid]):
740 1
                    del switch_flows[dpid]
741 1
                    continue
742 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
743 1
            time.sleep(settings.BATCH_INTERVAL)
744
745 1
        for evc in evcs_with_failover:
746 1
            with evc.lock:
747 1
                old_path = evc.current_path
748 1
                evc.current_path = evc.failover_path
749 1
                evc.failover_path = old_path
750 1
                evc.sync()
751 1
            emit_event(self.controller, "redeployed_link_down",
752
                       content=map_evc_event_content(evc))
753 1
            log.info(
754
                f"{evc} redeployed with failover due to link down {link.id}"
755
            )
756
757 1
        for evc in evcs_normal:
758 1
            emit_event(
759
                self.controller,
760
                "evc_affected_by_link_down",
761
                content={"link_id": link.id} | map_evc_event_content(evc)
762
            )
763
764
        # After handling the hot path, check if new failover paths are needed.
765
        # Note that EVCs affected by link down will generate a KytosEvent for
766
        # deployed|redeployed, which will trigger the failover path setup.
767
        # Thus, we just need to further check the check_failover list
768 1
        for evc in check_failover:
769 1
            if evc.is_failover_path_affected_by_link(link):
770 1
                evc.setup_failover_path()
771
772 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
773 1
    def on_evc_affected_by_link_down(self, event):
774
        """Change circuit when link is down or under_mantenance."""
775
        self.handle_evc_affected_by_link_down(event)
776
777 1
    def handle_evc_affected_by_link_down(self, event):
778
        """Change circuit when link is down or under_mantenance."""
779 1
        evc = self.circuits.get(event.content["evc_id"])
780 1
        link_id = event.content['link_id']
781 1
        if not evc:
782 1
            return
783 1
        with evc.lock:
784 1
            result = evc.handle_link_down()
785 1
        event_name = "error_redeploy_link_down"
786 1
        if result:
787 1
            log.info(f"{evc} redeployed due to link down {link_id}")
788 1
            event_name = "redeployed_link_down"
789 1
        emit_event(self.controller, event_name,
790
                   content=map_evc_event_content(evc))
791
792 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
793 1
    def on_evc_deployed(self, event):
794
        """Handle EVC deployed|redeployed_link_down."""
795
        self.handle_evc_deployed(event)
796
797 1
    def handle_evc_deployed(self, event):
798
        """Setup failover path on evc deployed."""
799 1
        evc = self.circuits.get(event.content["evc_id"])
800 1
        if not evc:
801 1
            return
802 1
        with evc.lock:
803 1
            evc.setup_failover_path()
804
805 1
    @listen_to("kytos/topology.topology_loaded")
806 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
807
        """Load EVCs once the topology is available."""
808
        self.load_all_evcs()
809
810 1
    def load_all_evcs(self):
811
        """Try to load all EVCs on startup."""
812 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
813 1
        for circuit_id, circuit in circuits:
814 1
            if circuit_id not in self.circuits:
815 1
                self._load_evc(circuit)
816
817 1
    def _load_evc(self, circuit_dict):
818
        """Load one EVC from mongodb to memory."""
819 1
        try:
820 1
            evc = self._evc_from_dict(circuit_dict)
821 1
        except ValueError as exception:
822 1
            log.error(
823
                f"Could not load EVC: dict={circuit_dict} error={exception}"
824
            )
825 1
            return None
826
827 1
        if evc.archived:
828 1
            return None
829 1
        evc.deactivate()
830 1
        evc.sync()
831 1
        self.circuits.setdefault(evc.id, evc)
832 1
        self.sched.add(evc)
833 1
        return evc
834
835 1
    @listen_to("kytos/flow_manager.flow.error")
836 1
    def on_flow_mod_error(self, event):
837
        """Handle flow mod errors related to an EVC."""
838
        self.handle_flow_mod_error(event)
839
840 1
    def handle_flow_mod_error(self, event):
841
        """Handle flow mod errors related to an EVC."""
842 1
        flow = event.content["flow"]
843 1
        command = event.content.get("error_command")
844 1
        if command != "add":
845
            return
846 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
847 1
        if evc:
848 1
            evc.remove_current_flows()
849
850 1
    def _evc_dict_with_instances(self, evc_dict):
851
        """Convert some dict values to instance of EVC classes.
852
853
        This method will convert: [UNI, Link]
854
        """
855 1
        data = evc_dict.copy()  # Do not modify the original dict
856 1
        for attribute, value in data.items():
857
            # Get multiple attributes.
858
            # Ex: uni_a, uni_z
859 1
            if "uni" in attribute:
860 1
                try:
861 1
                    data[attribute] = self._uni_from_dict(value)
862 1
                except ValueError as exception:
863 1
                    result = "Error creating UNI: Invalid value"
864 1
                    raise ValueError(result) from exception
865
866 1
            if attribute == "circuit_scheduler":
867 1
                data[attribute] = []
868 1
                for schedule in value:
869 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
870
871
            # Get multiple attributes.
872
            # Ex: primary_links,
873
            #     backup_links,
874
            #     current_links_cache,
875
            #     primary_links_cache,
876
            #     backup_links_cache
877 1
            if "links" in attribute:
878 1
                data[attribute] = [
879
                    self._link_from_dict(link) for link in value
880
                ]
881
882
            # Ex: current_path,
883
            #     primary_path,
884
            #     backup_path
885 1
            if "path" in attribute and attribute != "dynamic_backup_path":
886 1
                data[attribute] = Path(
887
                    [self._link_from_dict(link) for link in value]
888
                )
889
890 1
        return data
891
892 1
    def _evc_from_dict(self, evc_dict):
893 1
        data = self._evc_dict_with_instances(evc_dict)
894 1
        return EVC(self.controller, **data)
895
896 1
    def _uni_from_dict(self, uni_dict):
897
        """Return a UNI object from python dict."""
898 1
        if uni_dict is None:
899 1
            return False
900
901 1
        interface_id = uni_dict.get("interface_id")
902 1
        interface = self.controller.get_interface_by_id(interface_id)
903 1
        if interface is None:
904 1
            result = (
905
                "Error creating UNI:"
906
                + f"Could not instantiate interface {interface_id}"
907
            )
908 1
            raise ValueError(result) from ValueError
909
910 1
        tag_dict = uni_dict.get("tag", None)
911 1
        if tag_dict:
912 1
            tag = TAG.from_dict(tag_dict)
913
        else:
914 1
            tag = None
915 1
        uni = UNI(interface, tag)
916
917 1
        return uni
918
919 1
    def _link_from_dict(self, link_dict):
920
        """Return a Link object from python dict."""
921 1
        id_a = link_dict.get("endpoint_a").get("id")
922 1
        id_b = link_dict.get("endpoint_b").get("id")
923
924 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
925 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
926 1
        if not endpoint_a:
927 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
928 1
            raise ValueError(error_msg)
929 1
        if not endpoint_b:
930
            error_msg = f"Could not get interface endpoint_b id {id_b}"
931
            raise ValueError(error_msg)
932
933 1
        link = Link(endpoint_a, endpoint_b)
934 1
        if "metadata" in link_dict:
935 1
            link.extend_metadata(link_dict.get("metadata"))
936
937 1
        s_vlan = link.get_metadata("s_vlan")
938 1
        if s_vlan:
939 1
            tag = TAG.from_dict(s_vlan)
940 1
            if tag is False:
941
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
942
                raise ValueError(error_msg)
943 1
            link.update_metadata("s_vlan", tag)
944 1
        return link
945
946 1
    def _find_evc_by_schedule_id(self, schedule_id):
947
        """
948
        Find an EVC and CircuitSchedule based on schedule_id.
949
950
        :param schedule_id: Schedule ID
951
        :return: EVC and Schedule
952
        """
953 1
        circuits = self._get_circuits_buffer()
954 1
        found_schedule = None
955 1
        evc = None
956
957
        # pylint: disable=unused-variable
958 1
        for c_id, circuit in circuits.items():
959 1
            for schedule in circuit.circuit_scheduler:
960 1
                if schedule.id == schedule_id:
961 1
                    found_schedule = schedule
962 1
                    evc = circuit
963 1
                    break
964 1
            if found_schedule:
965 1
                break
966 1
        return evc, found_schedule
967
968 1
    def _get_circuits_buffer(self):
969
        """
970
        Return the circuit buffer.
971
972
        If the buffer is empty, try to load data from mongodb.
973
        """
974 1
        if not self.circuits:
975
            # Load circuits from mongodb to buffer
976 1
            circuits = self.mongo_controller.get_circuits()['circuits']
977 1
            for c_id, circuit in circuits.items():
978 1
                evc = self._evc_from_dict(circuit)
979 1
                self.circuits[c_id] = evc
980
        return self.circuits
981