Test Failed
Pull Request — master (#396)
by
unknown
03:43
created

build.main.Main.on_flow_delete()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1.037

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 4
rs 10
c 0
b 0
f 0
ccs 2
cts 3
cp 0.6667
cc 1
nop 2
crap 1.037
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.exceptions import KytosInvalidTagRanges, KytosTagsAreNotAvailable, KytosTagError
15
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
16 1
                                validate_openapi)
17 1
from kytos.core.interface import TAG, UNI, TAGRange
18 1
from kytos.core.link import Link
19
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
20 1
                                 get_json_or_400)
21 1
from kytos.core.tag_ranges import get_tag_ranges
22 1
from napps.kytos.mef_eline import controllers, settings
23
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
24 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
25 1
                                          Path)
26
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
27
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
28
                                         emit_event, get_vlan_tags_and_masks,
29
                                         map_evc_event_content)
30 1
31
32
# pylint: disable=too-many-public-methods
33
class Main(KytosNApp):
34
    """Main class of amlight/mef_eline NApp.
35
36 1
    This class is the entry point for this napp.
37
    """
38 1
39
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
40
41
    def setup(self):
        """Initialise the NApp state; stands in for ``__init__``.

        The controller calls this hook automatically once the application
        has been loaded, so every setup routine lives here.
        """
        # Scheduler responsible for the circuit events.
        self.sched = Scheduler()

        # Persistence layer used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Hand the controller over to the dynamic path manager.
        DynamicPathManager.set_controller(self.controller)

        # In-memory buffer of the created EVCs, keyed by circuit id.
        # Every create/update/delete must also be synced to mongodb.
        self.circuits = {}

        self.table_group = dict(epl=0, evpl=0)
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        # Timestamp of the newest topology update handled so far.
        self._topology_updated_at = None
69
70
    def get_evcs_by_svc_level(self) -> list:
        """Return the buffered circuits in processing order.

        Circuits with a higher ``service_level`` come first; ties are broken
        by ascending ``creation_time``.

        In the future, as more ops are offloaded, this should come from the
        DB.
        """
        def _priority(evc):
            return (-evc.service_level, evc.creation_time)

        ordered = list(self.circuits.values())
        ordered.sort(key=_priority)
        return ordered
77
78
    @staticmethod
    def get_eline_controller():
        """Build and return a new ``ELineController`` instance."""
        eline_controller = controllers.ELineController()
        return eline_controller
82 1
83 1
    def execute(self):
        """Run one round of the consistency routine.

        The round is skipped when ``self._lock`` is already held (the same
        lock is taken by ``handle_topology_update``), so rounds never queue
        up behind work that is still in progress.
        """
        # A non-blocking acquire makes "check and take" a single atomic
        # step. Testing ``locked()`` and then entering ``with self._lock``
        # could block here if another thread took the lock in between.
        if not self._lock.acquire(blocking=False):
            return
        try:
            log.debug("Starting consistency routine")
            self.execute_consistency()
        finally:
            self._lock.release()
        log.debug("Finished consistency routine")
91
92 1
    def should_be_checked(self, circuit):
        """Tell whether ``circuit`` qualifies for the consistency check.

        A circuit qualifies when it is enabled but not active, its lock is
        free, it had no recent flow removal nor recent update, its UNIs are
        active and it is either intra-switch or has a current path.
        """
        if not circuit.is_enabled() or circuit.is_active():
            return False
        if circuit.lock.locked():
            return False
        if circuit.has_recent_removed_flow() or circuit.is_recent_updated():
            return False
        if not circuit.are_unis_active(self.controller.switches):
            return False
        # If an inter-switch EVC does not have current_path, it does not
        # make sense to run sdntrace on it.
        return bool(circuit.is_intra_switch() or circuit.current_path)
108 1
109 1
    def execute_consistency(self):
        """Execute the consistency routine.

        Enabled-but-inactive circuits that pass ``should_be_checked`` are
        traced: a successful trace activates the circuit, while a circuit
        that keeps failing for more than ``settings.WAIT_FOR_OLD_PATH``
        rounds is redeployed. Circuits present in mongodb but missing from
        the in-memory buffer are loaded at the end.
        """
        # Circuits known to mongodb; whatever is left after the loop below
        # is not loaded in memory.
        unloaded = self.mongo_controller.get_circuits()['circuits']
        candidates = []
        for evc in self.get_evcs_by_svc_level():
            unloaded.pop(evc.id, None)
            if self.should_be_checked(evc):
                candidates.append(evc)

        trace_results = EVCDeploy.check_list_traces(candidates)
        for evc in candidates:
            if trace_results.get(evc.id):
                evc.execution_rounds = 0
                log.info(f"{evc} enabled but inactive - activating")
                with evc.lock:
                    evc.activate()
                    evc.sync()
                continue

            evc.execution_rounds += 1
            if evc.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{evc} enabled but inactive - redeploy")
                with evc.lock:
                    evc.deploy()

        for circuit_id, stored_evc in unloaded.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_evc)
135
136
    def shutdown(self):
        """Hook executed when the NApp is unloaded.

        Any cleanup procedure belongs here; none is needed at the moment.
        """
141
142
    @rest("/v2/evc/", methods=["GET"])
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to list the stored circuits.

        The ``archived`` query arg (default ``"false"``) is forwarded to the
        DB query; every other query arg is forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        found = self.mongo_controller.get_circuits(archived=archived,
                                                   metadata=metadata)
        return JSONResponse(found['circuits'])
157
158
    @rest("/v2/evc/schedule", methods=["GET"])
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to list every schedule of every stored circuit.

        The response body follows this template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead.
        """
        log.debug("list_schedules /v2/evc/schedule")
        stored = self.mongo_controller.get_circuits()['circuits'].values()
        if not stored:
            return JSONResponse({}, status_code=200)

        status = 200
        result = [
            {
                "schedule_id": schedule.get("id"),
                "circuit_id": circuit.get("id"),
                "schedule": schedule,
            }
            for circuit in stored
            for schedule in circuit.get("circuit_scheduler") or ()
        ]

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
189
190 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuit with the given id.

        Raises:
            HTTPException: 404 when the circuit is not found in the DB.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            log.debug("get_circuit result %s %s", circuit, 200)
            return JSONResponse(circuit, status_code=200)

        detail = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", detail, 404)
        raise HTTPException(404, detail=detail)
203 1
204
    @rest("/v2/evc/", methods=["POST"])
    @validate_openapi(spec)
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit from the request payload.

        Steps, in order:

        1. Build the EVC object from the payload.
        2. Reject it if a UNI component is disabled, if ``primary_path`` or
           ``backup_path`` is invalid, if the UNI tag lists differ, or if it
           has neither a primary nor a dynamic path.
        3. Reserve the UNI tags and persist the EVC.
        4. Store the EVC in the circuits buffer, register it in the
           scheduler and, when it has no schedule, deploy it right away.
        5. Emit the "created" event and answer 201 with the circuit id.

        Raises:
            HTTPException: 400 for invalid payload/paths/tags or a
                validation error while saving; 409 for a disabled
                component.
        """
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        # Try to create the circuit object
        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                409,
                detail=f"Path is not valid: {exception}"
            ) from exception

        # Both static paths go through the very same validation.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagsAreNotAvailable as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The EVC was not persisted: give back the UNI tags reserved
            # just above, otherwise they would stay allocated to a circuit
            # that was never created.
            evc.remove_uni_tags()
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
314 1
315 1
    @staticmethod
316 1
    def _use_uni_tags(evc):
317 1
        uni_a = evc.uni_a
318 1
        evc._use_uni_vlan(uni_a)
319 1
        try:
320 1
            uni_z = evc.uni_z
321 1
            evc._use_uni_vlan(uni_z)
322 1
        except KytosTagsAreNotAvailable as err:
323 1
            evc.make_uni_vlan_available(uni_a)
324 1
            raise err
325
326 1
    @listen_to('kytos/flow_manager.flow.removed')
    def on_flow_delete(self, event):
        """Listen to flow removal events and delegate to the handler.

        Keeps track of when the flows of an EVC got removed.
        """
        self.handle_flow_delete(event)
330
331 1
    def handle_flow_delete(self, event):
        """Record the flow removal on the EVC that owns the removed flow.

        The EVC id is derived from the flow cookie; flows whose cookie does
        not map to a buffered EVC are ignored.
        """
        removed_flow = event.content["flow"]
        evc_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
338
339 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated, the EVC is removed, redeployed or
        deployed according to the ``enable``/``redeploy`` values returned by
        ``evc.update`` and to whether the EVC is currently active. An
        "updated" event is emitted at the end.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer, 409
                when it is archived or a component is disabled, 400 when
                the payload is rejected.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance (not the bare ``KeyError``
            # class) so the missing key is kept in ``__cause__``.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception
        except ValidationError as exception:
            raise HTTPException(400, detail=str(exception)) from exception
        except ValueError as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except KytosTagsAreNotAvailable as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
402
403
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        Under the EVC lock: the current and failover flows are removed, the
        EVC is deactivated and disabled, its scheduled jobs are removed, it
        is archived, its UNI tags are released and the result is synced. A
        "deleted" event is emitted afterwards.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer or is
                already archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance (not the bare ``KeyError``
            # class) so the missing key is kept in ``__cause__``.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
442 1
443 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of a buffered EVC.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            metadata = self.circuits[circuit_id].metadata
            return JSONResponse({"metadata": metadata})
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error
456
457 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
    @validate_openapi(spec)
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        The payload carries ``circuit_ids`` plus the metadata entries. The
        DB is updated first, then every buffered EVC is extended.

        Raises:
            HTTPException: 404 listing the ids missing from the buffer.
        """
        payload = get_json_or_400(request, self.controller.loop)
        circuit_ids = payload.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, payload, "add")

        missing_ids = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].extend_metadata(payload)
            except KeyError:
                missing_ids.append(evc_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful", status_code=201)
477
478 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
    @validate_openapi(spec)
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The request body must be a JSON object; its entries extend the
        metadata of the EVC, which is then synced.

        Raises:
            HTTPException: 400 when the body is not a JSON object, 404 when
                the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # f-string so the offending value is actually interpolated in
            # the error detail.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
497
498 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
    @validate_openapi(spec)
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete a metadata key from a bulk of EVCs.

        The payload carries ``circuit_ids``. The DB is updated first, then
        the key is removed from every buffered EVC.

        Raises:
            HTTPException: 404 listing the ids missing from the buffer.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        missing_ids = []
        for evc_id in circuit_ids:
            try:
                self.circuits[evc_id].remove_metadata(key)
            except KeyError:
                missing_ids.append(evc_id)

        if missing_ids:
            raise HTTPException(404, detail=missing_ids)
        return JSONResponse("Operation successful")
518 1
519 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync the EVC.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        path_params = request.path_params
        circuit_id = path_params["circuit_id"]
        key = path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
535 1
536 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        Answers 202 after the current flows were removed and a new deploy
        was triggered, or 409 when the EVC is disabled.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance (not the bare ``KeyError``
            # class) so the missing key is kept in ``__cause__``.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
559
560
    @rest("/v2/evc/schedule/", methods=["POST"])
    @validate_openapi(spec)
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with other schedules.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found, 409 when it is
                archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        payload = get_json_or_400(request, self.controller.loop)
        circuit_id = payload["circuit_id"]
        schedule_data = payload["schedule"]

        # Look the EVC up in the circuits buffer.
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            detail = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", detail, 404)
            raise HTTPException(404, detail=detail)

        # Deleted/archived circuits can not be modified.
        if evc.archived:
            detail = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", detail, 409)
            raise HTTPException(409, detail=detail)

        schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the EVC has none yet.
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(schedule)

        # Register the job, then save the circuit to mongodb.
        self.sched.add_circuit_job(evc, schedule)
        evc.sync()

        result, status = schedule.as_dict(), 201
        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
621
622
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
    @validate_openapi(spec)
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace all attributes of the given schedule of an EVC; the
        schedule id is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when its
                circuit is archived.
        """
        payload = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Find the schedule and the circuit that owns it.
        evc, old_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not old_schedule:
            detail = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", detail, 404)
            raise HTTPException(404, detail=detail)

        # Deleted/archived circuits can not be modified.
        if evc.archived:
            detail = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", detail, 409)
            raise HTTPException(409, detail=detail)

        replacement = CircuitSchedule.from_dict(payload)
        replacement.id = old_schedule.id

        # Swap the old schedule for the modified one.
        evc.circuit_scheduler.remove(old_schedule)
        evc.circuit_scheduler.append(replacement)

        # Swap the jobs as well, then save the EVC to mongodb.
        self.sched.cancel_job(old_schedule.id)
        self.sched.add_circuit_job(evc, replacement)
        evc.sync()

        result, status = replacement.as_dict(), 200
        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
673
674
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the schedule from the EVC.
        Cancel the scheduled job.
        Save the EVC.

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when its
                circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, schedule = self._find_evc_by_schedule_id(schedule_id)

        if not schedule:
            detail = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", detail, 404)
            raise HTTPException(404, detail=detail)

        # Deleted/archived circuits can not be modified.
        if evc.archived:
            detail = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", detail, 409)
            raise HTTPException(409, detail=detail)

        # Drop the schedule, cancel its job and save the EVC to mongodb.
        evc.circuit_scheduler.remove(schedule)
        self.sched.cancel_job(schedule.id)
        evc.sync()

        result, status = "Schedule removed", 200
        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
710
711
    @listen_to("kytos/topology.link_up")
    def on_link_up(self, event):
        """Listen to link up events and delegate to ``handle_link_up``."""
        self.handle_link_up(event)
715
716
    def handle_link_up(self, event):
        """Let every enabled, non-archived EVC react to a link going up.

        EVCs are visited in service-level order and each one is handled
        while holding its own lock.
        """
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(event.content["link"])
723 1
724
    @listen_to("kytos/topology.updated")
    def on_topology_update(self, event):
        """Listen to topology updates and delegate to the handler."""
        self.handle_topology_update(event)
728
729 1
    def handle_topology_update(self, event):
        """Forward a topology update to every enabled, non-archived EVC.

        Runs under ``self._lock``. An event whose ``timestamp`` is older
        than the last one processed is ignored; otherwise the timestamp is
        recorded and each eligible EVC receives the switches of
        ``event.content["topology"]`` while its own lock is held.
        """
        with self._lock:
            last_update = self._topology_updated_at
            if last_update and last_update > event.timestamp:
                # Stale event: a newer topology was already processed.
                return
            self._topology_updated_at = event.timestamp
            for circuit in self.get_evcs_by_svc_level():
                if not circuit.is_enabled() or circuit.archived:
                    continue
                with circuit.lock:
                    switches = event.content["topology"].switches
                    circuit.handle_topology_update(switches)
744
745
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Receive ``kytos/topology.link_down`` and delegate the event.

        Processing happens in ``handle_link_down``.
        """
        self.handle_link_down(event)
749 1
750 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Change circuits when a link is down or under maintenance.

        The EVCs returned by ``get_evcs_by_svc_level`` are split in three
        groups according to ``event.content["link"]``:

        * affected EVCs that have a failover path which is not itself
          affected by the link and whose failover flows could be computed:
          their flows are sent to flow_manager in batches, then the current
          and failover paths are swapped, the EVC is synced and a
          ``redeployed_link_down`` event is emitted;
        * affected EVCs without a usable failover path: an
          ``evc_affected_by_link_down`` event is emitted for each one;
        * EVCs not affected by the link: if their failover path is
          affected, a new failover path is set up.

        ``settings.BATCH_SIZE`` limits how many flows per switch go in each
        ``flows.install`` event; a zero, ``None`` or negative value means
        all flows of a switch are sent at once.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        switch_flows = {}
        evcs_with_failover = []
        evcs_normal = []
        check_failover = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                check_failover.append(evc)
                continue
            # If there is no usable failover path, handle link down the
            # traditional way.
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                evcs_normal.append(evc)
                continue
            try:
                dpid_flows = evc.get_failover_flows()
            # pylint: disable=broad-except
            except Exception:
                # Best effort: fall back to the traditional handling.
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                evcs_normal.append(evc)
                continue
            for dpid, flows in dpid_flows.items():
                switch_flows.setdefault(dpid, []).extend(flows)
            evcs_with_failover.append(evc)

        # A negative size used as a slice bound would never consume the
        # pending flows (endless loop), so any non-positive size is treated
        # as "no batching".
        batch_size = settings.BATCH_SIZE
        offset = batch_size if batch_size and batch_size > 0 else None
        while switch_flows:
            # Iterate over a snapshot: entries are deleted inside the loop.
            for dpid in list(switch_flows):
                pending = switch_flows[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": pending[:offset]},
                    }
                )
                if offset is None or offset >= len(pending):
                    del switch_flows[dpid]
                    continue
                switch_flows[dpid] = pending[offset:]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in evcs_with_failover:
            with evc.lock:
                # Swap current and failover paths, then persist the EVC.
                old_path = evc.current_path
                evc.current_path = evc.failover_path
                evc.failover_path = old_path
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in evcs_normal:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are
        # needed. Note that EVCs affected by link down will generate a
        # KytosEvent for deployed|redeployed, which will trigger the
        # failover path setup. Thus, we just need to further check the
        # check_failover list.
        for evc in check_failover:
            if evc.is_failover_path_affected_by_link(link):
                with evc.lock:
                    evc.setup_failover_path()
831
832
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Receive ``evc_affected_by_link_down`` and delegate the event.

        Processing happens in ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
836
837
    def handle_evc_affected_by_link_down(self, event):
        """Redeploy one EVC hit by a link down and report the outcome.

        The EVC is looked up by ``event.content["evc_id"]``; unknown ids
        are ignored. ``handle_link_down`` is called on the EVC under its
        lock and, depending on its result, either ``redeployed_link_down``
        or ``error_redeploy_link_down`` is emitted.
        """
        content = event.content
        circuit = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not circuit:
            return
        with circuit.lock:
            redeployed = circuit.handle_link_down()
        if redeployed:
            log.info(f"{circuit} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(
            self.controller,
            event_name,
            content=map_evc_event_content(circuit),
        )
851 1
852 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Receive EVC deployed/redeployed events and delegate them.

        The listener pattern matches ``deployed``, ``redeployed_link_up``
        and ``redeployed_link_down``; processing happens in
        ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
856 1
857 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        The EVC is looked up by ``event.content["evc_id"]``; nothing is
        done when it is not known. The setup runs under the EVC's lock.
        """
        circuit = self.circuits.get(event.content["evc_id"])
        if circuit:
            with circuit.lock:
                circuit.setup_failover_path()
864 1
865
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Receive ``kytos/topology.topology_loaded`` and load the EVCs.

        The event itself is not inspected; it only triggers
        ``load_all_evcs``.
        """
        self.load_all_evcs()
869
870 1
    def load_all_evcs(self):
        """Load every stored circuit that is not in memory yet.

        Circuits are read from ``self.mongo_controller``; ids already
        present in ``self.circuits`` are skipped, the others are passed to
        ``_load_evc``.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit_dict in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit_dict)
876 1
877 1
    def _load_evc(self, circuit_dict):
        """Load one EVC from mongodb to memory.

        :param circuit_dict: dictionary representation of the circuit.
        :return: the loaded EVC, or ``None`` when the EVC could not be
            built from the dictionary or when it is archived.

        A successfully built, non-archived EVC is deactivated, synced,
        stored in ``self.circuits`` (unless its id is already there) and
        added to the scheduler.
        """
        try:
            evc = self._evc_from_dict(circuit_dict)
        except (ValueError, KytosTagError) as exception:
            # Both failures mean no EVC instance exists, so stop here
            # instead of falling through with ``evc`` unbound.
            log.error(
                f"Could not load EVC: dict={circuit_dict} error={exception}"
            )
            return None

        if evc.archived:
            return None
        evc.deactivate()
        evc.sync()
        self.circuits.setdefault(evc.id, evc)
        self.sched.add(evc)
        return evc
898 1
899 1
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Receive ``kytos/flow_manager.flow.error`` and delegate it.

        Processing happens in ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
903 1
904 1
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC whose flow add failed.

        Only errors whose ``error_command`` is ``"add"`` are handled. The
        EVC is resolved from the cookie of ``event.content["flow"]``;
        when it is known, its current flows are removed under its lock.
        """
        flow = event.content["flow"]
        if event.content.get("error_command") != "add":
            return
        circuit = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
        if not circuit:
            return
        with circuit.lock:
            circuit.remove_current_flows()
914 1
915 1
    def _evc_dict_with_instances(self, evc_dict):
        """Return a copy of ``evc_dict`` with values turned into objects.

        Conversions, selected by attribute name:

        * names containing ``uni`` (e.g. uni_a, uni_z) -> ``_uni_from_dict``
        * ``circuit_scheduler`` -> list of ``CircuitSchedule``
        * names containing ``links`` (e.g. primary_links, backup_links,
          current_links_cache) -> list built with ``_link_from_dict``
        * names containing ``path`` except ``dynamic_backup_path``
          (e.g. current_path, primary_path, backup_path) -> ``Path``

        The input dictionary itself is not modified.

        :raises ValueError: when a UNI can not be created.
        """
        converted = evc_dict.copy()
        # The checks are deliberately independent (no ``elif``) and each
        # one reads the original raw value.
        for name, raw in evc_dict.items():
            if "uni" in name:
                try:
                    converted[name] = self._uni_from_dict(raw)
                except ValueError as exception:
                    result = "Error creating UNI: Invalid value"
                    raise ValueError(result) from exception

            if name == "circuit_scheduler":
                converted[name] = [
                    CircuitSchedule.from_dict(schedule) for schedule in raw
                ]

            if "links" in name:
                converted[name] = [
                    self._link_from_dict(link) for link in raw
                ]

            if "path" in name and name != "dynamic_backup_path":
                converted[name] = Path(
                    [self._link_from_dict(link) for link in raw]
                )

        return converted
956
957 1
    def _evc_from_dict(self, evc_dict):
        """Build an EVC from its dictionary representation.

        Values are first converted by ``_evc_dict_with_instances``; the
        NApp's ``table_group`` is then added to the keyword arguments.
        """
        evc_kwargs = {
            **self._evc_dict_with_instances(evc_dict),
            "table_group": self.table_group,
        }
        return EVC(self.controller, **evc_kwargs)
961
962 1
    def _uni_from_dict(self, uni_dict):
        """Return a UNI object built from a python dict.

        :param uni_dict: dict with ``interface_id`` and an optional
            ``tag`` dict (``tag_type`` and ``value``).
        :return: the UNI, or ``False`` when ``uni_dict`` is ``None``.
        :raises ValueError: when the interface can not be found.

        A list ``value`` yields a ``TAGRange``; any other value yields a
        ``TAG``. Without a tag the UNI is created with ``None`` as tag.
        """
        if uni_dict is None:
            return False

        interface_id = uni_dict.get("interface_id")
        interface = self.controller.get_interface_by_id(interface_id)
        if interface is None:
            result = (
                "Error creating UNI:"
                + f"Could not instantiate interface {interface_id}"
            )
            raise ValueError(result) from ValueError

        tag_dict = uni_dict.get("tag", None)
        if not tag_dict:
            return UNI(interface, None)

        # The numeric tag type 1 is mapped to its name.
        tag_convert = {1: "vlan"}
        raw_type = tag_dict.get("tag_type")
        tag_type = tag_convert.get(raw_type, raw_type)
        tag_value = tag_dict.get("value")
        if not isinstance(tag_value, list):
            return UNI(interface, TAG(tag_type, tag_value))

        tag_value = get_tag_ranges(tag_value)
        mask_list = get_vlan_tags_and_masks(tag_value)
        return UNI(interface, TAGRange(tag_type, tag_value, mask_list))
992
993 1
    def _link_from_dict(self, link_dict):
        """Return a Link object built from a python dict.

        :param link_dict: dict with ``endpoint_a`` and ``endpoint_b``
            (each holding an ``id``) and optional ``metadata``.
        :raises ValueError: when an endpoint interface can not be found or
            when the ``s_vlan`` metadata can not be turned into a tag.

        An ``s_vlan`` metadata entry, when present, is replaced by the
        ``TAG`` built from it.
        """
        id_a = link_dict.get("endpoint_a").get("id")
        id_b = link_dict.get("endpoint_b").get("id")

        endpoint_a = self.controller.get_interface_by_id(id_a)
        endpoint_b = self.controller.get_interface_by_id(id_b)
        endpoints = (
            ("endpoint_a", id_a, endpoint_a),
            ("endpoint_b", id_b, endpoint_b),
        )
        for label, interface_id, interface in endpoints:
            if not interface:
                raise ValueError(
                    f"Could not get interface {label} id {interface_id}"
                )

        link = Link(endpoint_a, endpoint_b)
        if "metadata" in link_dict:
            link.extend_metadata(link_dict.get("metadata"))

        s_vlan = link.get_metadata("s_vlan")
        if not s_vlan:
            return link

        tag = TAG.from_dict(s_vlan)
        if tag is False:
            raise ValueError(f"Could not instantiate tag from dict {s_vlan}")
        link.update_metadata("s_vlan", tag)
        return link
1019 1
1020 1
    def _find_evc_by_schedule_id(self, schedule_id):
        """
        Look up the circuit schedule with the given id.

        The circuits come from ``_get_circuits_buffer``; the first
        schedule whose ``id`` equals ``schedule_id`` wins.

        :param schedule_id: Schedule ID
        :return: tuple ``(EVC, CircuitSchedule)``, or ``(None, None)``
            when no schedule matches
        """
        for circuit in self._get_circuits_buffer().values():
            for schedule in circuit.circuit_scheduler:
                if schedule.id == schedule_id:
                    return circuit, schedule
        return None, None
1041 1
1042 1
    def _get_circuits_buffer(self):
        """
        Return the in-memory circuit buffer (``self.circuits``).

        When the buffer is empty, it is first filled with the circuits
        stored in mongodb, each one built with ``_evc_from_dict``.
        """
        if self.circuits:
            return self.circuits

        # Empty buffer: load circuits from mongodb.
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit_dict in stored.items():
            self.circuits[circuit_id] = self._evc_from_dict(circuit_dict)
        return self.circuits
1055 1
1056 1
    # pylint: disable=attribute-defined-outside-init
1057
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Apply the table groups announced for mef_eline.

        The groups are read from ``event.content["mef_eline"]``. If any
        group is not in ``settings.TABLE_GROUP_ALLOWED`` an error is
        logged and nothing is changed. Otherwise ``self.table_group`` is
        updated and ``kytos/mef_eline.enable_table`` is emitted with it.
        """
        requested = event.content.get("mef_eline", None)
        if not requested:
            return
        for group in requested:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return
        self.table_group.update(requested)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1073