Test Failed
Pull Request — master (#383)
by
unknown
03:32
created

build.main.Main.execute_consistency()   C

Complexity

Conditions 9

Size

Total Lines 26
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 9

Importance

Changes 0
Metric Value
cc 9
eloc 24
nop 1
dl 0
loc 26
ccs 23
cts 23
cp 1
crap 9
rs 6.6666
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.events import KytosEvent
15
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
16 1
                                validate_openapi)
17 1
from kytos.core.interface import TAG, UNI
18 1
from kytos.core.link import Link
19
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
20 1
                                 get_json_or_400)
21 1
from napps.kytos.mef_eline import controllers, settings
22 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
23
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
24 1
                                          Path)
25 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
26
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
27
                                         emit_event, map_evc_event_content)
28
29
30 1
# pylint: disable=too-many-public-methods
31
class Main(KytosNApp):
32
    """Main class of amlight/mef_eline NApp.
33
34
    This class is the entry point for this napp.
35
    """
36 1
37
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
38 1
39
    def setup(self):
        """Initialise the NApp; stands in for ``__init__`` on KytosNApp.

        The controller calls this method automatically when the
        application is loaded, so every start-up routine belongs here.
        The statements below are order-sensitive and are kept in sequence.
        """
        # Scheduler used for the circuit events.
        self.sched = Scheduler()

        # Controller used to save and load circuits; its indexes are
        # bootstrapped right away.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Tell the dynamic path manager which controller it works with.
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of the EVCs created.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self.table_group = {
            "epl": 0,
            "evpl": 0,
        }
        # Taken by execute() and by the interface/topology handlers.
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        # Timestamp of the last topology update event handled (none yet).
        self._topology_updated_at = None
67 1
68
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits ordered by priority.

        Circuits come first by descending ``service_level`` and, within the
        same level, by ascending ``creation_time``.

        In the future, as more ops are offloaded it should be get from the DB.
        """
        def priority(evc):
            # Negating the level yields a descending order on it.
            return (-evc.service_level, evc.creation_time)

        ordered = list(self.circuits.values())
        ordered.sort(key=priority)
        return ordered
75 1
76 1
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``controllers.ELineController``."""
        eline_controller = controllers.ELineController()
        return eline_controller
80 1
81
    def execute(self):
        """Run the consistency routine unless one is already in progress.

        ``setup`` registers this method through ``execute_as_loop``. When
        ``self._lock`` is already held the call returns without doing
        anything; otherwise the routine runs while holding the lock.
        """
        routine_lock = self._lock
        if routine_lock.locked():
            # The lock is currently held elsewhere; skip this round.
            return
        log.debug("Starting consistency routine")
        with routine_lock:
            self.execute_consistency()
        log.debug("Finished consistency routine")
89 1
90
    def should_be_checked(self, circuit):
        """Tell whether the circuit qualifies for the consistency check.

        A circuit qualifies when it is enabled but not active, its lock is
        free, it has no recently removed flow, it was not recently updated,
        its UNIs are active and it is either intra-switch or has a
        ``current_path``.

        Returns:
            bool: True when every condition above holds, otherwise False.
        """
        if not circuit.is_enabled() or circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow() or circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # if a inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it
        return bool(circuit.is_intra_switch() or circuit.current_path)
106 1
107
    def execute_consistency(self):
        """Execute consistency routine.

        Three passes are made:

        1. Walk the buffered circuits by priority, dropping each one from
           the set fetched from mongodb and collecting those accepted by
           ``should_be_checked``.
        2. Check the collected circuits with
           ``EVCDeploy.check_list_traces``: a circuit with a truthy result
           is activated and synced; any other has ``execution_rounds``
           incremented and is redeployed once that counter exceeds
           ``settings.WAIT_FOR_OLD_PATH``.
        3. Load every circuit still left in the mongodb set, i.e. stored
           but absent from the buffer.
        """
        candidates = []
        unloaded = self.mongo_controller.get_circuits()['circuits']
        for evc in self.get_evcs_by_svc_level():
            unloaded.pop(evc.id, None)
            if self.should_be_checked(evc):
                candidates.append(evc)

        trace_results = EVCDeploy.check_list_traces(candidates)
        for evc in candidates:
            if trace_results.get(evc.id):
                evc.execution_rounds = 0
                log.info(f"{evc} enabled but inactive - activating")
                with evc.lock:
                    evc.activate()
                    evc.sync()
                continue

            evc.execution_rounds += 1
            if evc.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{evc} enabled but inactive - redeploy")
                with evc.lock:
                    evc.deploy()

        for circuit_id, stored_evc in unloaded.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_evc)
133 1
134
    def shutdown(self):
        """Run when the NApp is unloaded.

        Nothing is cleaned up at the moment; add any teardown procedure
        here.
        """
139 1
140 1
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the circuits stored.

        The ``archived`` query argument (``"false"`` when absent) is
        lower-cased and forwarded to the mongo controller; every other
        query argument is forwarded as a ``metadata`` filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(archived=archived,
                                                   metadata=metadata)
        return JSONResponse(found['circuits'])
155 1
156 1
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return JSONResponse({}, status_code=200)

        status = 200
        # Circuits without a (non-empty) "circuit_scheduler" contribute
        # no entries.
        result = [
            {
                "schedule_id": schedule.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": schedule,
            }
            for circuit in stored
            for schedule in circuit.get("circuit_scheduler") or ()
        ]

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
187 1
188 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a circuit based on id.

        Raises:
            HTTPException: 404 when the mongo controller finds no circuit
                for the given ``circuit_id``.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        message = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", message, 404)
        raise HTTPException(404, detail=message)
201 1
202 1
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Steps, in order:

        1. Build the EVC from the JSON payload.
        2. Run ``check_disabled_component`` on both UNIs.
        3. Validate ``primary_path`` and ``backup_path`` when provided.
        4. Refuse an EVC that duplicates a stored, non-archived one.
        5. Check that the EVC has a primary or a dynamic path.
        6. Reserve the UNI tags.
        7. Save the EVC; the UNI tags are released again if saving fails.
        8. Buffer and schedule the EVC, deploying it right away when it has
           no ``circuit_scheduler``.
        9. Emit the "created" event and reply with the circuit id.

        Returns:
            JSONResponse: ``{"circuit_id": <id>}`` with status 201.

        Raises:
            HTTPException: 400 on an invalid payload, path or model;
                409 on a disabled component or a duplicated EVC.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except ValueError as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # Static paths, when given, are validated against both UNI switches.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        # verify duplicated evc
        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The EVC was not stored, so give back the UNI tags reserved
            # just above instead of leaving them marked as in use.
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
312 1
313
    @staticmethod
314 1
    def _use_uni_tags(evc):
315 1
        uni_a = evc.uni_a
316 1
        try:
317 1
            evc._use_uni_vlan(uni_a)
318 1
        except ValueError as err:
319
            raise err
320 1
        try:
321 1
            uni_z = evc.uni_z
322 1
            evc._use_uni_vlan(uni_z)
323
        except ValueError as err:
324
            evc.make_uni_vlan_available(uni_a)
325
            raise err
326
327
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removed events and hand them to the handler.

        This keeps track of when flows got removed; the work itself is
        done by ``handle_flow_delete``.
        """
        self.handle_flow_delete(event)
331 1
332 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC id is derived from the cookie of the flow carried in
        ``event.content["flow"]``; events whose EVC is not in the buffer
        are ignored.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
339 1
340 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated, an active EVC is removed when
        the update disabled it, or removed and deployed again when a
        redeploy was requested; an inactive EVC is deployed when the
        update enabled it. An "updated" event is emitted at the end.

        Returns:
            JSONResponse: ``{<evc id>: <evc as dict>}`` with status 200.

        Raises:
            HTTPException: 404 for an unknown circuit, 409 for an archived
                EVC or a disabled component, 400 for invalid data.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught exception, not from the KeyError class.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception
        except ValueError as exception:
            log.error(exception)
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        elif enable is True:  # enable if inactive
            with evc.lock:
                evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
399 1
400
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        While holding the EVC lock: the current and failover flows are
        removed, the EVC is deactivated and disabled, it is taken out of
        the scheduler, archived, its UNI tags are removed and the result
        is synced. A "deleted" event is emitted afterwards.

        Returns:
            JSONResponse: a confirmation message with status 200.

        Raises:
            HTTPException: 404 when the circuit is unknown or has already
                been archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught exception, not from the KeyError class.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
439
440 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Get metadata from an EVC.

        Raises:
            HTTPException: 404 when a ``KeyError`` is raised while reading
                the circuit from the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            metadata = self.circuits[circuit_id].metadata
            return JSONResponse({"metadata": metadata})
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error
453
454 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        ``circuit_ids`` is popped from the payload and what remains is the
        metadata to add. The mongo controller is updated first, then each
        buffered EVC is extended.

        Raises:
            HTTPException: 404, carrying the list of ids for which a
                ``KeyError`` was raised, after all ids were processed.
        """
        metadata = get_json_or_400(request, self.controller.loop)
        circuit_ids = metadata.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, metadata, "add")

        missing_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(metadata)
            except KeyError:
                missing_ids.append(circuit_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful", status_code=201)
474 1
475 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The JSON payload must be an object; it is merged into the EVC
        metadata and the EVC is then synced.

        Returns:
            JSONResponse: "Operation successful" with status 201.

        Raises:
            HTTPException: 400 when the payload is not a dict, 404 when
                the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # f-string so the offending value is actually reported.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
494
495 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete metadata from a bulk of EVCs.

        The metadata ``key`` comes from the path and ``circuit_ids`` from
        the payload. The mongo controller is updated first, then the key
        is removed from each buffered EVC.

        Raises:
            HTTPException: 404, carrying the list of ids for which a
                ``KeyError`` was raised, after all ids were processed.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing_ids = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                missing_ids.append(circuit_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful")
515 1
516 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync the EVC.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        path_params = request.path_params
        circuit_id = path_params["circuit_id"]
        key = path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            not_found = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=not_found) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
532 1
533
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed
        again while holding its lock.

        Returns:
            JSONResponse: status 202 when the redeploy was carried out,
                409 when the EVC is disabled.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught exception, not from the KeyError class.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
556 1
557 1
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check if there are conflicts with another
        schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found, 409 when it
                is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id = payload["circuit_id"]
        schedule_data = payload["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the EVC has none yet, then add to it.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job, then save the circuit to mongodb.
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, 201)
        return JSONResponse(result, status_code=201)
618 1
619
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Change all attributes from the given schedule from a EVC circuit.
        The schedule ID is preserved as default.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when
                its circuit is archived.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find a circuit schedule
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(payload)
        new_schedule.id = old_schedule.id

        # Swap the old schedule for the modified one on the EVC.
        schedules = evc.circuit_scheduler
        schedules.remove(old_schedule)
        schedules.append(new_schedule)

        # Replace the job: cancel the old one, add the new one.
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)
        # Save EVC to mongodb
        evc.sync()

        result = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
670 1
671 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the Schedule from EVC.
        Cancel the job of the Schedule.
        Save the EVC to mongodb.

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when
                its circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not old_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Drop the schedule from the EVC and cancel its job.
        evc.circuit_scheduler.remove(old_schedule)
        self.sched.cancel_job(old_schedule.id)
        # Save EVC to mongodb
        evc.sync()

        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, 200)
        return JSONResponse(result, status_code=200)
707 1
708
    def _is_duplicated_evc(self, evc):
709 1
        """Verify if the circuit given is duplicated with the stored evcs.
710 1
711 1
        Args:
712 1
            evc (EVC): circuit to be analysed.
713 1
714
        Returns:
715 1
            boolean: True if the circuit is duplicated, otherwise False.
716 1
717
        """
718
        for circuit in tuple(self.circuits.values()):
719
            if not circuit.archived and circuit.shares_uni(evc):
720 1
                return True
721
        return False
722 1
723 1
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to link up events and hand them to ``handle_link_up``."""
        self.handle_link_up(event)
727
728 1
    def handle_link_up(self, event):
        """Forward a link up to every enabled, non-archived circuit.

        Circuits are visited in priority order and each one handles the
        link while its own lock is held.
        """
        link = event.content["link"]
        log.info("Event handle_link_up %s", link)
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(link)
735
736 1
    # Possibly replace this with interruptions?
737 1
    @listen_to(
        '.*.switch.interface.(link_up|link_down)',
        pool='dynamic_single'
    )
    def on_interface_link_change(self, event: KytosEvent):
        """
        Handler for interface link_up and link_down events

        The event type is the last dot-separated part of the event name;
        the matching interface handler runs while ``self._lock`` is held.
        """
        with self._lock:
            event_type = event.name.rpartition('.')[2]
            iface = event.content.get("interface")
            if event_type == 'link_up':
                self.handle_interface_link_up(iface)
            elif event_type in ('link_down', 'deleted'):
                self.handle_interface_link_down(iface)
752
753 1
    def handle_interface_link_up(self, interface):
        """Forward an interface link_up to every EVC.

        Args:
            interface: the interface taken from the event content.
        """
        # Log once per event, not once per EVC, matching the other
        # link handlers in this class.
        log.info("Event handle_interface_link_up %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_up(interface)
762 1
763 1
    def handle_interface_link_down(self, interface):
        """Forward an interface link_down to every EVC.

        Args:
            interface: the interface taken from the event content.
        """
        # Log once per event, not once per EVC, matching the other
        # link handlers in this class.
        log.info("Event handle_interface_link_down %s", interface)
        for evc in self.get_evcs_by_svc_level():
            evc.handle_interface_link_down(interface)
772 1
773
    @listen_to("kytos/topology.updated")
    def on_topology_update(self, event):
        """Listen for ``kytos/topology.updated`` and delegate the event.

        The actual processing is done by ``handle_topology_update``.
        """
        self.handle_topology_update(event)
777 1
778 1
    def handle_topology_update(self, event):
        """Propagate a topology update to enabled, non-archived EVCs.

        Events older than the last processed one (compared through
        ``event.timestamp``) are ignored. The whole update runs while
        holding ``self._lock``.
        """
        with self._lock:
            last_seen = self._topology_updated_at
            if last_seen and last_seen > event.timestamp:
                # Stale event: a newer topology was already processed.
                return
            self._topology_updated_at = event.timestamp
            for evc in self.get_evcs_by_svc_level():
                if not evc.is_enabled() or evc.archived:
                    continue
                with evc.lock:
                    evc.handle_topology_update(
                        event.content["topology"].switches
                    )
793 1
794
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Listen for ``kytos/topology.link_down`` and delegate the event.

        The actual processing is done by ``handle_link_down``.
        """
        self.handle_link_down(event)
798 1
799 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuits when a link is down or under maintenance.

        EVCs affected by the link are split in two groups: those with a
        failover path not hit by the same link (their failover flows are
        sent to flow_manager in batches and the paths are swapped) and the
        remaining ones, for which an ``evc_affected_by_link_down`` event is
        emitted. EVCs not affected by the link only get a new failover path
        when their current failover path uses the link.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                check_failover.append(evc)
                continue
            # Without a failover path untouched by this link, handle the
            # link down the traditional way.
            usable_failover = (
                getattr(evc, 'failover_path', None)
                and not evc.is_failover_path_affected_by_link(link)
            )
            if not usable_failover:
                evcs_normal.append(evc)
                continue
            try:
                dpid_flows = evc.get_failover_flows()
            # pylint: disable=broad-except
            except Exception:
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                evcs_normal.append(evc)
                continue
            for dpid, flows in dpid_flows.items():
                switch_flows.setdefault(dpid, []).extend(flows)
            evcs_with_failover.append(evc)

        # Send the failover flows in rounds: on each round every switch
        # gets at most one batch, then wait before the next round.
        while switch_flows:
            offset = settings.BATCH_SIZE or None
            for dpid in list(switch_flows.keys()):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:offset]},
                    }
                )
                if offset is None or offset >= len(pending):
                    del switch_flows[dpid]
                else:
                    switch_flows[dpid] = pending[offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                # Swap current and failover paths, then persist the EVC.
                previous_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = previous_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, we just need to further check the check_failover list
        for evc in check_failover:
            if not evc.is_failover_path_affected_by_link(link):
                continue
            with evc.lock:
                evc.setup_failover_path()
880 1
881 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Listen for ``evc_affected_by_link_down`` and delegate the event.

        The actual processing is done by
        ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
885
886 1
    def handle_evc_affected_by_link_down(self, event):
        """Run the link-down handling of one EVC and emit the outcome.

        The EVC is looked up by ``event.content["evc_id"]``; unknown EVCs
        are ignored. ``redeployed_link_down`` is emitted when the EVC's
        ``handle_link_down`` returns a truthy value, otherwise
        ``error_redeploy_link_down`` is emitted.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return
        with evc.lock:
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
900 1
901
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Handle EVC deployed|redeployed_link_up|redeployed_link_down.

        The actual processing is done by ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
905
906
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        The EVC is looked up by ``event.content["evc_id"]``; nothing is
        done when it is not found in ``self.circuits``.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
913 1
914 1
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once the topology is available.

        The event itself is not inspected; it only triggers
        ``load_all_evcs``.
        """
        self.load_all_evcs()
918 1
919 1
    def load_all_evcs(self):
        """Load from mongodb every stored EVC that is not yet in memory."""
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                # Already loaded, keep the in-memory instance.
                continue
            self._load_evc(circuit)
925
926
    def _load_evc(self, circuit_dict):
927
        """Load one EVC from mongodb to memory."""
928
        try:
929 1
            evc = self._evc_from_dict(circuit_dict)
930 1
        except ValueError as exception:
931
            log.error(
932
                f"Could not load EVC: dict={circuit_dict} error={exception}"
933
            )
934
            return None
935
936
        if evc.archived:
937 1
            return None
938 1
939
        self.circuits.setdefault(evc.id, evc)
940
        self.sched.add(evc)
941
        return evc
942 1
943
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Listen for ``kytos/flow_manager.flow.error`` and delegate it.

        The actual processing is done by ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
947 1
948
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC owning a failed flow add.

        Only errors whose ``error_command`` is ``"add"`` are handled. The
        EVC is found through the id derived from the flow cookie; unknown
        EVCs are ignored.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
958
959
    def _evc_dict_with_instances(self, evc_dict):
960
        """Convert some dict values to instance of EVC classes.
961 1
962
        This method will convert: [UNI, Link]
963 1
        """
964 1
        data = evc_dict.copy()  # Do not modify the original dict
965 1
        for attribute, value in data.items():
966
            # Get multiple attributes.
967 1
            # Ex: uni_a, uni_z
968 1
            if "uni" in attribute:
969
                try:
970 1
                    data[attribute] = self._uni_from_dict(value)
971
                except ValueError as exception:
972 1
                    result = "Error creating UNI: Invalid value"
973
                    raise ValueError(result) from exception
974 1
975 1
            if attribute == "circuit_scheduler":
976
                data[attribute] = []
977 1
                for schedule in value:
978 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
979 1
980 1
            # Get multiple attributes.
981 1
            # Ex: primary_links,
982 1
            #     backup_links,
983
            #     current_links_cache,
984
            #     primary_links_cache,
985
            #     backup_links_cache
986 1
            if "links" in attribute:
987 1
                data[attribute] = [
988 1
                    self._link_from_dict(link) for link in value
989
                ]
990 1
991 1
            # Ex: current_path,
992 1
            #     primary_path,
993 1
            #     backup_path
994
            if "path" in attribute and attribute != "dynamic_backup_path":
995
                data[attribute] = Path(
996 1
                    [self._link_from_dict(link) for link in value]
997 1
                )
998
999 1
        return data
1000
1001
    def _evc_from_dict(self, evc_dict):
        """Build an EVC instance from its dict representation.

        The dict values are first converted by ``_evc_dict_with_instances``
        and ``self.table_group`` is set under the ``table_group`` key before
        the result is passed to ``EVC`` as keyword arguments.
        """
        data = self._evc_dict_with_instances(evc_dict)
        data["table_group"] = self.table_group
        return EVC(self.controller, **data)
1005
1006 1
    def _uni_from_dict(self, uni_dict):
1007 1
        """Return a UNI object from python dict."""
1008 1
        if uni_dict is None:
1009
            return False
1010
1011 1
        interface_id = uni_dict.get("interface_id")
1012 1
        interface = self.controller.get_interface_by_id(interface_id)
1013 1
        if interface is None:
1014 1
            result = (
1015 1
                "Error creating UNI:"
1016 1
                + f"Could not instantiate interface {interface_id}"
1017 1
            )
1018 1
            raise ValueError(result) from ValueError
1019 1
        tag_convert = {1: "vlan"}
1020
        tag_dict = uni_dict.get("tag", None)
1021 1
        if tag_dict:
1022
            tag_type = tag_dict.get("tag_type")
1023
            tag_type = tag_convert.get(tag_type, tag_type)
1024
            tag_value = tag_dict.get("value")
1025
            tag = TAG(tag_type, tag_value)
1026
        else:
1027 1
            tag = None
1028
        uni = UNI(interface, tag)
1029 1
1030 1
        return uni
1031 1
1032 1
    def _link_from_dict(self, link_dict):
1033 1
        """Return a Link object from python dict."""
1034
        id_a = link_dict.get("endpoint_a").get("id")
1035
        id_b = link_dict.get("endpoint_b").get("id")
1036 1
1037 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1038
        endpoint_b = self.controller.get_interface_by_id(id_b)
1039 1
        if not endpoint_a:
1040 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1041
            raise ValueError(error_msg)
1042 1
        if not endpoint_b:
1043 1
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1044 1
            raise ValueError(error_msg)
1045
1046
        link = Link(endpoint_a, endpoint_b)
1047 1
        if "metadata" in link_dict:
1048 1
            link.extend_metadata(link_dict.get("metadata"))
1049 1
1050 1
        s_vlan = link.get_metadata("s_vlan")
1051 1
        if s_vlan:
1052
            tag = TAG.from_dict(s_vlan)
1053
            if tag is False:
1054
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1055
                raise ValueError(error_msg)
1056
            link.update_metadata("s_vlan", tag)
1057
        return link
1058
1059
    def _find_evc_by_schedule_id(self, schedule_id):
1060
        """
1061
        Find an EVC and CircuitSchedule based on schedule_id.
1062
1063
        :param schedule_id: Schedule ID
1064
        :return: EVC and Schedule
1065
        """
1066
        circuits = self._get_circuits_buffer()
1067
        found_schedule = None
1068
        evc = None
1069
1070
        # pylint: disable=unused-variable
1071
        for c_id, circuit in circuits.items():
1072
            for schedule in circuit.circuit_scheduler:
1073
                if schedule.id == schedule_id:
1074
                    found_schedule = schedule
1075
                    evc = circuit
1076
                    break
1077
            if found_schedule:
1078
                break
1079
        return evc, found_schedule
1080
1081
    def _get_circuits_buffer(self):
1082
        """
1083
        Return the circuit buffer.
1084
1085
        If the buffer is empty, try to load data from mongodb.
1086
        """
1087
        if not self.circuits:
1088
            # Load circuits from mongodb to buffer
1089
            circuits = self.mongo_controller.get_circuits()['circuits']
1090
            for c_id, circuit in circuits.items():
1091
                evc = self._evc_from_dict(circuit)
1092
                self.circuits[c_id] = evc
1093
        return self.circuits
1094
1095
    # pylint: disable=attribute-defined-outside-init
1096
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply the table groups sent for mef_eline and announce them.

        The groups come from ``event.content["mef_eline"]``. If any group
        is not in ``settings.TABLE_GROUP_ALLOWED`` an error is logged and
        nothing is changed. Otherwise ``self.table_group`` is updated and
        ``kytos/mef_eline.enable_table`` is emitted.
        """
        requested = event.content.get("mef_eline", None)
        if not requested:
            return
        for group in requested:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return
        self.table_group.update(requested)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1112