Passed
Pull Request — master (#371)
by
unknown
03:29
created

build.main.Main.execute_consistency()   C

Complexity

Conditions 9

Size

Total Lines 26
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 24
CRAP Score 9

Importance

Changes 0
Metric Value
eloc 24
dl 0
loc 26
ccs 24
cts 24
cp 1
rs 6.6666
c 0
b 0
f 0
cc 9
nop 1
crap 9
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
from threading import Lock
9
10 1
from pydantic import ValidationError
11
12 1
from kytos.core import KytosNApp, log, rest
13 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
14
                                validate_openapi)
15 1
from kytos.core.interface import TAG, UNI
16 1
from kytos.core.link import Link
17 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
18
                                 get_json_or_400)
19 1
from napps.kytos.mef_eline import controllers, settings
20 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
21 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
22
                                          Path)
23 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
24 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
25
                                         emit_event, map_evc_event_content)
26
27
28
# pylint: disable=too-many-public-methods
29 1
class Main(KytosNApp):
30
    """Main class of amlight/mef_eline NApp.
31
32
    This class is the entry point for this napp.
33
    """
34
35 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
36
37 1
    def setup(self):
38
        """Replace the '__init__' method for the KytosNApp subclass.
39
40
        The setup method is automatically called by the controller when your
41
        application is loaded.
42
43
        So, if you have any setup routine, insert it here.
44
        """
45
        # object used to scheduler circuit events
46 1
        self.sched = Scheduler()
47
48
        # object to save and load circuits
49 1
        self.mongo_controller = self.get_eline_controller()
50 1
        self.mongo_controller.bootstrap_indexes()
51
52
        # set the controller that will manager the dynamic paths
53 1
        DynamicPathManager.set_controller(self.controller)
54
55
        # dictionary of EVCs created. It acts as a circuit buffer.
56
        # Every create/update/delete must be synced to mongodb.
57 1
        self.circuits = {}
58
59 1
        self.table_group = {"epl": 0, "evpl": 0}
60 1
        self._lock = Lock()
61 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
62
63 1
        self.load_all_evcs()
64
65 1
    def get_evcs_by_svc_level(self) -> list:
66
        """Get circuits sorted by desc service level and asc creation_time.
67
68
        In the future, as more ops are offloaded it should be get from the DB.
69
        """
70 1
        return sorted(self.circuits.values(),
71
                      key=lambda x: (-x.service_level, x.creation_time))
72
73 1
    @staticmethod
74 1
    def get_eline_controller():
75
        """Return the ELineController instance."""
76
        return controllers.ELineController()
77
78 1
    def execute(self):
79
        """Execute once when the napp is running."""
80 1
        if self._lock.locked():
81 1
            return
82 1
        log.debug("Starting consistency routine")
83 1
        with self._lock:
84 1
            self.execute_consistency()
85 1
        log.debug("Finished consistency routine")
86
87 1
    @staticmethod
88 1
    def should_be_checked(circuit):
89
        "Verify if the circuit meets the necessary conditions to be checked"
90
        # pylint: disable=too-many-boolean-expressions
91 1
        if (
92
                circuit.is_enabled()
93
                and not circuit.is_active()
94
                and not circuit.lock.locked()
95
                and not circuit.has_recent_removed_flow()
96
                and not circuit.is_recent_updated()
97
                # if a inter-switch EVC does not have current_path, it does not
98
                # make sense to run sdntrace on it
99
                and (circuit.is_intra_switch() or circuit.current_path)
100
                ):
101 1
            return True
102
        return False
103
104 1
    def execute_consistency(self):
105
        """Execute consistency routine."""
106 1
        circuits_to_check = []
107 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
108 1
        for circuit in self.get_evcs_by_svc_level():
109 1
            stored_circuits.pop(circuit.id, None)
110 1
            if self.should_be_checked(circuit):
111 1
                circuits_to_check.append(circuit)
112 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
113 1
        for circuit in circuits_to_check:
114 1
            is_checked = circuits_checked.get(circuit.id)
115 1
            if is_checked:
116 1
                circuit.execution_rounds = 0
117 1
                log.info(f"{circuit} enabled but inactive - activating")
118 1
                with circuit.lock:
119 1
                    circuit.activate()
120 1
                    circuit.sync()
121
            else:
122 1
                circuit.execution_rounds += 1
123 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
124 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
125 1
                    with circuit.lock:
126 1
                        circuit.deploy()
127 1
        for circuit_id in stored_circuits:
128 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
129 1
            self._load_evc(stored_circuits[circuit_id])
130
131 1
    def shutdown(self):
132
        """Execute when your napp is unloaded.
133
134
        If you have some cleanup procedure, insert it here.
135
        """
136
137 1
    @rest("/v2/evc/", methods=["GET"])
138 1
    def list_circuits(self, request: Request) -> JSONResponse:
139
        """Endpoint to return circuits stored.
140
141
        archive query arg if defined (not null) will be filtered
142
        accordingly, by default only non archived evcs will be listed
143
        """
144 1
        log.debug("list_circuits /v2/evc")
145 1
        args = request.query_params
146 1
        archived = args.get("archived", "false").lower()
147 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
148 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
149
                                                      metadata=args)
150 1
        circuits = circuits['circuits']
151 1
        return JSONResponse(circuits)
152
153 1
    @rest("/v2/evc/schedule", methods=["GET"])
154 1
    def list_schedules(self, _request: Request) -> JSONResponse:
155
        """Endpoint to return all schedules stored for all circuits.
156
157
        Return a JSON with the following template:
158
        [{"schedule_id": <schedule_id>,
159
         "circuit_id": <circuit_id>,
160
         "schedule": <schedule object>}]
161
        """
162 1
        log.debug("list_schedules /v2/evc/schedule")
163 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
164 1
        if not circuits:
165 1
            result = {}
166 1
            status = 200
167 1
            return JSONResponse(result, status_code=status)
168
169 1
        result = []
170 1
        status = 200
171 1
        for circuit in circuits:
172 1
            circuit_scheduler = circuit.get("circuit_scheduler")
173 1
            if circuit_scheduler:
174 1
                for scheduler in circuit_scheduler:
175 1
                    value = {
176
                        "schedule_id": scheduler.get("id"),
177
                        "circuit_id": circuit.get("id"),
178
                        "schedule": scheduler,
179
                    }
180 1
                    result.append(value)
181
182 1
        log.debug("list_schedules result %s %s", result, status)
183 1
        return JSONResponse(result, status_code=status)
184
185 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
186 1
    def get_circuit(self, request: Request) -> JSONResponse:
187
        """Endpoint to return a circuit based on id."""
188 1
        circuit_id = request.path_params["circuit_id"]
189 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
190 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
191 1
        if not circuit:
192 1
            result = f"circuit_id {circuit_id} not found"
193 1
            log.debug("get_circuit result %s %s", result, 404)
194 1
            raise HTTPException(404, detail=result)
195 1
        status = 200
196 1
        log.debug("get_circuit result %s %s", circuit, status)
197 1
        return JSONResponse(circuit, status_code=status)
198
199 1
    @rest("/v2/evc/", methods=["POST"])
200 1
    @validate_openapi(spec)
201 1
    def create_circuit(self, request: Request) -> JSONResponse:
202
        """Try to create a new circuit.
203
204
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
205
        UNI_Z's requested C-VID are available from the interfaces' pools. This
206
        is checked when creating the UNI object.
207
208
        Then, E-Line NApp requests a primary and a backup path to the
209
        Pathfinder NApp using the attributes primary_links and backup_links
210
        submitted via REST
211
212
        # For each link composing paths in #3:
213
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
214
        #  - Using the S-VID obtained, generate abstract flow entries to be
215
        #    sent to FlowManager
216
217
        Push abstract flow entries to FlowManager and FlowManager pushes
218
        OpenFlow entries to datapaths
219
220
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
221
        creation
222
223
        Finnaly, notify user of the status of its request.
224
        """
225
        # Try to create the circuit object
226 1
        log.debug("create_circuit /v2/evc/")
227 1
        data = get_json_or_400(request, self.controller.loop)
228
229 1
        try:
230 1
            evc = self._evc_from_dict(data)
231 1
        except ValueError as exception:
232 1
            log.debug("create_circuit result %s %s", exception, 400)
233 1
            raise HTTPException(400, detail=str(exception)) from exception
234
235 1
        try:
236 1
            check_disabled_component(evc.uni_a, evc.uni_z)
237 1
        except DisabledSwitch as exception:
238 1
            log.debug("create_circuit result %s %s", exception, 409)
239 1
            raise HTTPException(
240
                    409,
241
                    detail=f"Path is not valid: {exception}"
242
                ) from exception
243
244 1
        if evc.primary_path:
245
            try:
246
                evc.primary_path.is_valid(
247
                    evc.uni_a.interface.switch,
248
                    evc.uni_z.interface.switch,
249
                    bool(evc.circuit_scheduler),
250
                )
251
            except InvalidPath as exception:
252
                raise HTTPException(
253
                    400,
254
                    detail=f"primary_path is not valid: {exception}"
255
                ) from exception
256 1
        if evc.backup_path:
257
            try:
258
                evc.backup_path.is_valid(
259
                    evc.uni_a.interface.switch,
260
                    evc.uni_z.interface.switch,
261
                    bool(evc.circuit_scheduler),
262
                )
263
            except InvalidPath as exception:
264
                raise HTTPException(
265
                    400,
266
                    detail=f"backup_path is not valid: {exception}"
267
                ) from exception
268
269
        # verify duplicated evc
270 1
        if self._is_duplicated_evc(evc):
271 1
            result = "The EVC already exists."
272 1
            log.debug("create_circuit result %s %s", result, 409)
273 1
            raise HTTPException(409, detail=result)
274
275 1
        try:
276 1
            evc._validate_has_primary_or_dynamic()
277 1
        except ValueError as exception:
278 1
            raise HTTPException(400, detail=str(exception)) from exception
279
280 1
        try:
281 1
            self._use_uni_tags(evc)
282
        except ValueError as exception:
283
            raise HTTPException(400, detail=str(exception)) from exception
284
285
        # save circuit
286 1
        try:
287 1
            evc.sync()
288
        except ValidationError as exception:
289
            raise HTTPException(400, detail=str(exception)) from exception
290
291
        # store circuit in dictionary
292 1
        self.circuits[evc.id] = evc
293
294
        # Schedule the circuit deploy
295 1
        self.sched.add(evc)
296
297
        # Circuit has no schedule, deploy now
298 1
        if not evc.circuit_scheduler:
299 1
            with evc.lock:
300 1
                evc.deploy()
301
302
        # Notify users
303 1
        result = {"circuit_id": evc.id}
304 1
        status = 201
305 1
        log.debug("create_circuit result %s %s", result, status)
306 1
        emit_event(self.controller, name="created",
307
                   content=map_evc_event_content(evc))
308 1
        return JSONResponse(result, status_code=status)
309
310 1
    @staticmethod
311 1
    def _use_uni_tags(evc):
312 1
        uni_a = evc.uni_a
313 1
        try:
314 1
            evc._use_uni_vlan(uni_a)
315
        except ValueError as err:
316
            raise err
317 1
        try:
318 1
            uni_z = evc.uni_z
319 1
            evc._use_uni_vlan(uni_z)
320 1
        except ValueError as err:
321 1
            evc.make_uni_vlan_available(uni_a)
322 1
            raise err
323
324 1
    @listen_to('kytos/flow_manager.flow.removed')
325 1
    def on_flow_delete(self, event):
326
        """Capture delete messages to keep track when flows got removed."""
327
        self.handle_flow_delete(event)
328
329 1
    def handle_flow_delete(self, event):
330
        """Keep track when the EVC got flows removed by deriving its cookie."""
331 1
        flow = event.content["flow"]
332 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
333 1
        if evc:
334 1
            log.debug("Flow removed in EVC %s", evc.id)
335 1
            evc.set_flow_removed_at()
336
337 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
338 1
    @validate_openapi(spec)
339 1
    def update(self, request: Request) -> JSONResponse:
340
        """Update a circuit based on payload.
341
342
        The EVC attributes (creation_time, active, current_path,
343
        failover_path, _id, archived) can't be updated.
344
        """
345 1
        data = get_json_or_400(request, self.controller.loop)
346 1
        circuit_id = request.path_params["circuit_id"]
347 1
        log.debug("update /v2/evc/%s", circuit_id)
348 1
        try:
349 1
            evc = self.circuits[circuit_id]
350 1
        except KeyError:
351 1
            result = f"circuit_id {circuit_id} not found"
352 1
            log.debug("update result %s %s", result, 404)
353 1
            raise HTTPException(404, detail=result) from KeyError
354
355 1
        if evc.archived:
356 1
            result = "Can't update archived EVC"
357 1
            log.debug("update result %s %s", result, 409)
358 1
            raise HTTPException(409, detail=result)
359
360 1
        try:
361 1
            enable, redeploy = evc.update(
362
                **self._evc_dict_with_instances(data)
363
            )
364 1
        except ValidationError as exception:
365
            raise HTTPException(400, detail=str(exception)) from exception
366 1
        except ValueError as exception:
367 1
            log.error(exception)
368 1
            log.debug("update result %s %s", exception, 400)
369 1
            raise HTTPException(400, detail=str(exception)) from exception
370 1
        except DisabledSwitch as exception:
371 1
            log.debug("update result %s %s", exception, 409)
372 1
            raise HTTPException(
373
                    409,
374
                    detail=f"Path is not valid: {exception}"
375
                ) from exception
376
377 1
        if evc.is_active():
378
            if enable is False:  # disable if active
379
                with evc.lock:
380
                    evc.remove()
381
            elif redeploy is not None:  # redeploy if active
382
                with evc.lock:
383
                    evc.remove()
384
                    evc.deploy()
385
        else:
386 1
            if enable is True:  # enable if inactive
387 1
                with evc.lock:
388 1
                    evc.deploy()
389 1
        result = {evc.id: evc.as_dict()}
390 1
        status = 200
391
392 1
        log.debug("update result %s %s", result, status)
393 1
        emit_event(self.controller, "updated",
394
                   content=map_evc_event_content(evc, **data))
395 1
        return JSONResponse(result, status_code=status)
396
397 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
398 1
    def delete_circuit(self, request: Request) -> JSONResponse:
399
        """Remove a circuit.
400
401
        First, the flows are removed from the switches, and then the EVC is
402
        disabled.
403
        """
404 1
        circuit_id = request.path_params["circuit_id"]
405 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
406 1
        try:
407 1
            evc = self.circuits[circuit_id]
408 1
        except KeyError:
409 1
            result = f"circuit_id {circuit_id} not found"
410 1
            log.debug("delete_circuit result %s %s", result, 404)
411 1
            raise HTTPException(404, detail=result) from KeyError
412
413 1
        if evc.archived:
414 1
            result = f"Circuit {circuit_id} already removed"
415 1
            log.debug("delete_circuit result %s %s", result, 404)
416 1
            raise HTTPException(404, detail=result)
417
418 1
        log.info("Removing %s", evc)
419 1
        with evc.lock:
420 1
            evc.remove_current_flows()
421 1
            evc.remove_failover_flows(sync=False)
422 1
            evc.deactivate()
423 1
            evc.disable()
424 1
            self.sched.remove(evc)
425 1
            evc.archive()
426 1
            evc.remove_uni_tags()
427 1
            evc.sync()
428 1
        log.info("EVC removed. %s", evc)
429 1
        result = {"response": f"Circuit {circuit_id} removed"}
430 1
        status = 200
431
432 1
        log.debug("delete_circuit result %s %s", result, status)
433 1
        emit_event(self.controller, "deleted",
434
                   content=map_evc_event_content(evc))
435 1
        return JSONResponse(result, status_code=status)
436
437 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["GET"])
438 1
    def get_metadata(self, request: Request) -> JSONResponse:
439
        """Get metadata from an EVC."""
440 1
        circuit_id = request.path_params["circuit_id"]
441 1
        try:
442 1
            return (
443
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
444
            )
445
        except KeyError as error:
446
            raise HTTPException(
447
                404,
448
                detail=f"circuit_id {circuit_id} not found."
449
            ) from error
450
451 1 View Code Duplication
    @rest("v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
452 1
    @validate_openapi(spec)
453 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
454
        """Add metadata to a bulk of EVCs."""
455 1
        data = get_json_or_400(request, self.controller.loop)
456 1
        circuit_ids = data.pop("circuit_ids")
457
458 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
459
460 1
        fail_evcs = []
461 1
        for _id in circuit_ids:
462 1
            try:
463 1
                evc = self.circuits[_id]
464 1
                evc.extend_metadata(data)
465 1
            except KeyError:
466 1
                fail_evcs.append(_id)
467
468 1
        if fail_evcs:
469 1
            raise HTTPException(404, detail=fail_evcs)
470 1
        return JSONResponse("Operation successful", status_code=201)
471
472 1
    @rest("v2/evc/{circuit_id}/metadata", methods=["POST"])
473 1
    @validate_openapi(spec)
474 1
    def add_metadata(self, request: Request) -> JSONResponse:
475
        """Add metadata to an EVC."""
476 1
        circuit_id = request.path_params["circuit_id"]
477 1
        metadata = get_json_or_400(request, self.controller.loop)
478 1
        if not isinstance(metadata, dict):
479
            raise HTTPException(400, "Invalid metadata value: {metadata}")
480 1
        try:
481 1
            evc = self.circuits[circuit_id]
482 1
        except KeyError as error:
483 1
            raise HTTPException(
484
                404,
485
                detail=f"circuit_id {circuit_id} not found."
486
            ) from error
487
488 1
        evc.extend_metadata(metadata)
489 1
        evc.sync()
490 1
        return JSONResponse("Operation successful", status_code=201)
491
492 1 View Code Duplication
    @rest("v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
493 1
    @validate_openapi(spec)
494 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
495
        """Delete metada from a bulk of EVCs"""
496 1
        data = get_json_or_400(request, self.controller.loop)
497 1
        key = request.path_params["key"]
498 1
        circuit_ids = data.pop("circuit_ids")
499 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
500
501 1
        fail_evcs = []
502 1
        for _id in circuit_ids:
503 1
            try:
504 1
                evc = self.circuits[_id]
505 1
                evc.remove_metadata(key)
506
            except KeyError:
507
                fail_evcs.append(_id)
508
509 1
        if fail_evcs:
510
            raise HTTPException(404, detail=fail_evcs)
511 1
        return JSONResponse("Operation successful")
512
513 1
    @rest("v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
514 1
    def delete_metadata(self, request: Request) -> JSONResponse:
515
        """Delete metadata from an EVC."""
516 1
        circuit_id = request.path_params["circuit_id"]
517 1
        key = request.path_params["key"]
518 1
        try:
519 1
            evc = self.circuits[circuit_id]
520 1
        except KeyError as error:
521 1
            raise HTTPException(
522
                404,
523
                detail=f"circuit_id {circuit_id} not found."
524
            ) from error
525
526 1
        evc.remove_metadata(key)
527 1
        evc.sync()
528 1
        return JSONResponse("Operation successful")
529
530 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
531 1
    def redeploy(self, request: Request) -> JSONResponse:
532
        """Endpoint to force the redeployment of an EVC."""
533 1
        circuit_id = request.path_params["circuit_id"]
534 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
535 1
        try:
536 1
            evc = self.circuits[circuit_id]
537 1
        except KeyError:
538 1
            raise HTTPException(
539
                404,
540
                detail=f"circuit_id {circuit_id} not found"
541
            ) from KeyError
542 1
        if evc.is_enabled():
543 1
            with evc.lock:
544 1
                evc.remove_current_flows()
545 1
                evc.deploy()
546 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
547 1
            status = 202
548
        else:
549 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
550 1
            status = 409
551
552 1
        return JSONResponse(result, status_code=status)
553
554 1
    @rest("/v2/evc/schedule/", methods=["POST"])
555 1
    @validate_openapi(spec)
556 1
    def create_schedule(self, request: Request) -> JSONResponse:
557
        """
558
        Create a new schedule for a given circuit.
559
560
        This service do no check if there are conflicts with another schedule.
561
        Payload example:
562
            {
563
              "circuit_id":"aa:bb:cc",
564
              "schedule": {
565
                "date": "2019-08-07T14:52:10.967Z",
566
                "interval": "string",
567
                "frequency": "1 * * * *",
568
                "action": "create"
569
              }
570
            }
571
        """
572 1
        log.debug("create_schedule /v2/evc/schedule/")
573 1
        data = get_json_or_400(request, self.controller.loop)
574 1
        circuit_id = data["circuit_id"]
575 1
        schedule_data = data["schedule"]
576
577
        # Get EVC from circuits buffer
578 1
        circuits = self._get_circuits_buffer()
579
580
        # get the circuit
581 1
        evc = circuits.get(circuit_id)
582
583
        # get the circuit
584 1
        if not evc:
585 1
            result = f"circuit_id {circuit_id} not found"
586 1
            log.debug("create_schedule result %s %s", result, 404)
587 1
            raise HTTPException(404, detail=result)
588
        # Can not modify circuits deleted and archived
589 1
        if evc.archived:
590 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
591 1
            log.debug("create_schedule result %s %s", result, 409)
592 1
            raise HTTPException(409, detail=result)
593
594
        # new schedule from dict
595 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
596
597
        # If there is no schedule, create the list
598 1
        if not evc.circuit_scheduler:
599 1
            evc.circuit_scheduler = []
600
601
        # Add the new schedule
602 1
        evc.circuit_scheduler.append(new_schedule)
603
604
        # Add schedule job
605 1
        self.sched.add_circuit_job(evc, new_schedule)
606
607
        # save circuit to mongodb
608 1
        evc.sync()
609
610 1
        result = new_schedule.as_dict()
611 1
        status = 201
612
613 1
        log.debug("create_schedule result %s %s", result, status)
614 1
        return JSONResponse(result, status_code=status)
615
616 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
617 1
    @validate_openapi(spec)
618 1
    def update_schedule(self, request: Request) -> JSONResponse:
619
        """Update a schedule.
620
621
        Change all attributes from the given schedule from a EVC circuit.
622
        The schedule ID is preserved as default.
623
        Payload example:
624
            {
625
              "date": "2019-08-07T14:52:10.967Z",
626
              "interval": "string",
627
              "frequency": "1 * * *",
628
              "action": "create"
629
            }
630
        """
631 1
        data = get_json_or_400(request, self.controller.loop)
632 1
        schedule_id = request.path_params["schedule_id"]
633 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
634
635
        # Try to find a circuit schedule
636 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
637
638
        # Can not modify circuits deleted and archived
639 1
        if not found_schedule:
640 1
            result = f"schedule_id {schedule_id} not found"
641 1
            log.debug("update_schedule result %s %s", result, 404)
642 1
            raise HTTPException(404, detail=result)
643 1
        if evc.archived:
644 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
645 1
            log.debug("update_schedule result %s %s", result, 409)
646 1
            raise HTTPException(409, detail=result)
647
648 1
        new_schedule = CircuitSchedule.from_dict(data)
649 1
        new_schedule.id = found_schedule.id
650
        # Remove the old schedule
651 1
        evc.circuit_scheduler.remove(found_schedule)
652
        # Append the modified schedule
653 1
        evc.circuit_scheduler.append(new_schedule)
654
655
        # Cancel all schedule jobs
656 1
        self.sched.cancel_job(found_schedule.id)
657
        # Add the new circuit schedule
658 1
        self.sched.add_circuit_job(evc, new_schedule)
659
        # Save EVC to mongodb
660 1
        evc.sync()
661
662 1
        result = new_schedule.as_dict()
663 1
        status = 200
664
665 1
        log.debug("update_schedule result %s %s", result, status)
666 1
        return JSONResponse(result, status_code=status)
667
668 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
669 1
    def delete_schedule(self, request: Request) -> JSONResponse:
670
        """Remove a circuit schedule.
671
672
        Remove the Schedule from EVC.
673
        Remove the Schedule from cron job.
674
        Save the EVC to the Storehouse.
675
        """
676 1
        schedule_id = request.path_params["schedule_id"]
677 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
678 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
679
680
        # Can not modify circuits deleted and archived
681 1
        if not found_schedule:
682 1
            result = f"schedule_id {schedule_id} not found"
683 1
            log.debug("delete_schedule result %s %s", result, 404)
684 1
            raise HTTPException(404, detail=result)
685
686 1
        if evc.archived:
687 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
688 1
            log.debug("delete_schedule result %s %s", result, 409)
689 1
            raise HTTPException(409, detail=result)
690
691
        # Remove the old schedule
692 1
        evc.circuit_scheduler.remove(found_schedule)
693
694
        # Cancel all schedule jobs
695 1
        self.sched.cancel_job(found_schedule.id)
696
        # Save EVC to mongodb
697 1
        evc.sync()
698
699 1
        result = "Schedule removed"
700 1
        status = 200
701
702 1
        log.debug("delete_schedule result %s %s", result, status)
703 1
        return JSONResponse(result, status_code=status)
704
705 1
    def _is_duplicated_evc(self, evc):
706
        """Verify if the circuit given is duplicated with the stored evcs.
707
708
        Args:
709
            evc (EVC): circuit to be analysed.
710
711
        Returns:
712
            boolean: True if the circuit is duplicated, otherwise False.
713
714
        """
715 1
        for circuit in tuple(self.circuits.values()):
716 1
            if not circuit.archived and circuit.shares_uni(evc):
717 1
                return True
718 1
        return False
719
720 1
    @listen_to("kytos/topology.link_up")
721 1
    def on_link_up(self, event):
722
        """Change circuit when link is up or end_maintenance."""
723
        self.handle_link_up(event)
724
725 1
    def handle_link_up(self, event):
726
        """Change circuit when link is up or end_maintenance."""
727 1
        log.info("Event handle_link_up %s", event.content["link"])
728 1
        for evc in self.get_evcs_by_svc_level():
729 1
            if evc.is_enabled() and not evc.archived:
730 1
                with evc.lock:
731 1
                    evc.handle_link_up(event.content["link"])
732
733 1
    @listen_to("kytos/topology.link_down")
734 1
    def on_link_down(self, event):
735
        """Change circuit when link is down or under_mantenance."""
736
        self.handle_link_down(event)
737
738 1
    def handle_link_down(self, event):
739
        """Change circuit when link is down or under_mantenance."""
740 1
        link = event.content["link"]
741 1
        log.info("Event handle_link_down %s", link)
742 1
        switch_flows = {}
743 1
        evcs_with_failover = []
744 1
        evcs_normal = []
745 1
        check_failover = []
746 1
        for evc in self.get_evcs_by_svc_level():
747 1
            if evc.is_affected_by_link(link):
748
                # if there is no failover path, handles link down the
749
                # tradditional way
750 1
                if (
751
                    not getattr(evc, 'failover_path', None) or
752
                    evc.is_failover_path_affected_by_link(link)
753
                ):
754 1
                    evcs_normal.append(evc)
755 1
                    continue
756 1
                for dpid, flows in evc.get_failover_flows().items():
757 1
                    switch_flows.setdefault(dpid, [])
758 1
                    switch_flows[dpid].extend(flows)
759 1
                evcs_with_failover.append(evc)
760
            else:
761 1
                check_failover.append(evc)
762
763 1
        while switch_flows:
764 1
            offset = settings.BATCH_SIZE or None
765 1
            switches = list(switch_flows.keys())
766 1
            for dpid in switches:
767 1
                emit_event(
768
                    self.controller,
769
                    context="kytos.flow_manager",
770
                    name="flows.install",
771
                    content={
772
                        "dpid": dpid,
773
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
774
                    }
775
                )
776 1
                if offset is None or offset >= len(switch_flows[dpid]):
777 1
                    del switch_flows[dpid]
778 1
                    continue
779 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
780 1
            time.sleep(settings.BATCH_INTERVAL)
781
782 1
        for evc in evcs_with_failover:
783 1
            with evc.lock:
784 1
                old_path = evc.current_path
785 1
                evc.current_path = evc.failover_path
786 1
                evc.failover_path = old_path
787 1
                evc.sync()
788 1
            emit_event(self.controller, "redeployed_link_down",
789
                       content=map_evc_event_content(evc))
790 1
            log.info(
791
                f"{evc} redeployed with failover due to link down {link.id}"
792
            )
793
794 1
        for evc in evcs_normal:
795 1
            emit_event(
796
                self.controller,
797
                "evc_affected_by_link_down",
798
                content={"link_id": link.id} | map_evc_event_content(evc)
799
            )
800
801
        # After handling the hot path, check if new failover paths are needed.
802
        # Note that EVCs affected by link down will generate a KytosEvent for
803
        # deployed|redeployed, which will trigger the failover path setup.
804
        # Thus, we just need to further check the check_failover list
805 1
        for evc in check_failover:
806 1
            if evc.is_failover_path_affected_by_link(link):
807 1
                evc.setup_failover_path()
808
809 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
810 1
    def on_evc_affected_by_link_down(self, event):
811
        """Change circuit when link is down or under_mantenance."""
812
        self.handle_evc_affected_by_link_down(event)
813
814 1
    def handle_evc_affected_by_link_down(self, event):
815
        """Change circuit when link is down or under_mantenance."""
816 1
        evc = self.circuits.get(event.content["evc_id"])
817 1
        link_id = event.content['link_id']
818 1
        if not evc:
819 1
            return
820 1
        with evc.lock:
821 1
            result = evc.handle_link_down()
822 1
        event_name = "error_redeploy_link_down"
823 1
        if result:
824 1
            log.info(f"{evc} redeployed due to link down {link_id}")
825 1
            event_name = "redeployed_link_down"
826 1
        emit_event(self.controller, event_name,
827
                   content=map_evc_event_content(evc))
828
829 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
830 1
    def on_evc_deployed(self, event):
831
        """Handle EVC deployed|redeployed_link_down."""
832
        self.handle_evc_deployed(event)
833
834 1
    def handle_evc_deployed(self, event):
835
        """Setup failover path on evc deployed."""
836 1
        evc = self.circuits.get(event.content["evc_id"])
837 1
        if not evc:
838 1
            return
839 1
        with evc.lock:
840 1
            evc.setup_failover_path()
841
842 1
    @listen_to("kytos/topology.topology_loaded")
843 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
844
        """Load EVCs once the topology is available."""
845
        self.load_all_evcs()
846
847 1
    def load_all_evcs(self):
848
        """Try to load all EVCs on startup."""
849 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
850 1
        for circuit_id, circuit in circuits:
851 1
            if circuit_id not in self.circuits:
852 1
                self._load_evc(circuit)
853
854 1
    def _load_evc(self, circuit_dict):
855
        """Load one EVC from mongodb to memory."""
856 1
        try:
857 1
            evc = self._evc_from_dict(circuit_dict)
858 1
        except ValueError as exception:
859 1
            log.error(
860
                f"Could not load EVC: dict={circuit_dict} error={exception}"
861
            )
862 1
            return None
863
864 1
        if evc.archived:
865 1
            return None
866 1
        evc.deactivate()
867 1
        evc.sync()
868 1
        self.circuits.setdefault(evc.id, evc)
869 1
        self.sched.add(evc)
870 1
        return evc
871
872 1
    @listen_to("kytos/flow_manager.flow.error")
873 1
    def on_flow_mod_error(self, event):
874
        """Handle flow mod errors related to an EVC."""
875
        self.handle_flow_mod_error(event)
876
877 1
    def handle_flow_mod_error(self, event):
878
        """Handle flow mod errors related to an EVC."""
879 1
        flow = event.content["flow"]
880 1
        command = event.content.get("error_command")
881 1
        if command != "add":
882
            return
883 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
884 1
        if evc:
885 1
            evc.remove_current_flows()
886
887 1
    def _evc_dict_with_instances(self, evc_dict):
888
        """Convert some dict values to instance of EVC classes.
889
890
        This method will convert: [UNI, Link]
891
        """
892 1
        data = evc_dict.copy()  # Do not modify the original dict
893 1
        for attribute, value in data.items():
894
            # Get multiple attributes.
895
            # Ex: uni_a, uni_z
896 1
            if "uni" in attribute:
897 1
                try:
898 1
                    data[attribute] = self._uni_from_dict(value)
899 1
                except ValueError as exception:
900 1
                    result = "Error creating UNI: Invalid value"
901 1
                    raise ValueError(result) from exception
902
903 1
            if attribute == "circuit_scheduler":
904 1
                data[attribute] = []
905 1
                for schedule in value:
906 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
907
908
            # Get multiple attributes.
909
            # Ex: primary_links,
910
            #     backup_links,
911
            #     current_links_cache,
912
            #     primary_links_cache,
913
            #     backup_links_cache
914 1
            if "links" in attribute:
915 1
                data[attribute] = [
916
                    self._link_from_dict(link) for link in value
917
                ]
918
919
            # Ex: current_path,
920
            #     primary_path,
921
            #     backup_path
922 1
            if "path" in attribute and attribute != "dynamic_backup_path":
923 1
                data[attribute] = Path(
924
                    [self._link_from_dict(link) for link in value]
925
                )
926
927 1
        return data
928
929 1
    def _evc_from_dict(self, evc_dict):
930 1
        data = self._evc_dict_with_instances(evc_dict)
931 1
        data["table_group"] = self.table_group
932 1
        return EVC(self.controller, **data)
933
934 1
    def _uni_from_dict(self, uni_dict):
935
        """Return a UNI object from python dict."""
936 1
        if uni_dict is None:
937 1
            return False
938
939 1
        interface_id = uni_dict.get("interface_id")
940 1
        interface = self.controller.get_interface_by_id(interface_id)
941 1
        if interface is None:
942 1
            result = (
943
                "Error creating UNI:"
944
                + f"Could not instantiate interface {interface_id}"
945
            )
946 1
            raise ValueError(result) from ValueError
947
948 1
        tag_dict = uni_dict.get("tag", None)
949 1
        if tag_dict:
950 1
            tag = TAG.from_dict(tag_dict)
951
        else:
952 1
            tag = None
953 1
        uni = UNI(interface, tag)
954
955 1
        return uni
956
957 1
    def _link_from_dict(self, link_dict):
958
        """Return a Link object from python dict."""
959 1
        id_a = link_dict.get("endpoint_a").get("id")
960 1
        id_b = link_dict.get("endpoint_b").get("id")
961
962 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
963 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
964 1
        if not endpoint_a:
965 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
966 1
            raise ValueError(error_msg)
967 1
        if not endpoint_b:
968
            error_msg = f"Could not get interface endpoint_b id {id_b}"
969
            raise ValueError(error_msg)
970
971 1
        link = Link(endpoint_a, endpoint_b)
972 1
        if "metadata" in link_dict:
973 1
            link.extend_metadata(link_dict.get("metadata"))
974
975 1
        s_vlan = link.get_metadata("s_vlan")
976 1
        if s_vlan:
977 1
            tag = TAG.from_dict(s_vlan)
978 1
            if tag is False:
979
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
980
                raise ValueError(error_msg)
981 1
            link.update_metadata("s_vlan", tag)
982 1
        return link
983
984 1
    def _find_evc_by_schedule_id(self, schedule_id):
985
        """
986
        Find an EVC and CircuitSchedule based on schedule_id.
987
988
        :param schedule_id: Schedule ID
989
        :return: EVC and Schedule
990
        """
991 1
        circuits = self._get_circuits_buffer()
992 1
        found_schedule = None
993 1
        evc = None
994
995
        # pylint: disable=unused-variable
996 1
        for c_id, circuit in circuits.items():
997 1
            for schedule in circuit.circuit_scheduler:
998 1
                if schedule.id == schedule_id:
999 1
                    found_schedule = schedule
1000 1
                    evc = circuit
1001 1
                    break
1002 1
            if found_schedule:
1003 1
                break
1004 1
        return evc, found_schedule
1005
1006 1
    def _get_circuits_buffer(self):
1007
        """
1008
        Return the circuit buffer.
1009
1010
        If the buffer is empty, try to load data from mongodb.
1011
        """
1012 1
        if not self.circuits:
1013
            # Load circuits from mongodb to buffer
1014 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1015 1
            for c_id, circuit in circuits.items():
1016 1
                evc = self._evc_from_dict(circuit)
1017 1
                self.circuits[c_id] = evc
1018 1
        return self.circuits
1019
1020
    # pylint: disable=attribute-defined-outside-init
1021 1
    @alisten_to("kytos/of_multi_table.enable_table")
1022 1
    async def on_table_enabled(self, event):
1023
        """Handle a recently table enabled."""
1024 1
        table_group = event.content.get("mef_eline", None)
1025 1
        if not table_group:
1026
            return
1027 1
        for group in table_group:
1028 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1029 1
                log.error(f'The table group "{group}" is not allowed for '
1030
                          f'mef_eline. Allowed table groups are '
1031
                          f'{settings.TABLE_GROUP_ALLOWED}')
1032 1
                return
1033 1
        self.table_group.update(table_group)
1034 1
        content = {"group_table": self.table_group}
1035 1
        name = "kytos/mef_eline.enable_table"
1036
        await aemit_event(self.controller, name, content)
1037