Test Failed
Pull Request — master (#396)
by
unknown
03:36
created

build.main.Main._uni_from_dict()   B

Complexity

Conditions 5

Size

Total Lines 29
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 20
CRAP Score 5.0187

Importance

Changes 0
Metric Value
cc 5
eloc 24
nop 2
dl 0
loc 29
ccs 20
cts 22
cp 0.9091
crap 5.0187
rs 8.8373
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.exceptions import KytosTagError
15
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
16 1
                                validate_openapi)
17 1
from kytos.core.interface import TAG, UNI, TAGRange
18 1
from kytos.core.link import Link
19
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
20 1
                                 get_json_or_400)
21 1
from kytos.core.tag_ranges import get_tag_ranges
22 1
from napps.kytos.mef_eline import controllers, settings
23
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
24 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
25 1
                                          Path)
26
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
27
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
28
                                         emit_event, get_vlan_tags_and_masks,
29
                                         map_evc_event_content)
30 1
31
32
# pylint: disable=too-many-public-methods
33
class Main(KytosNApp):
34
    """Main class of kytos/mef_eline NApp.
35
36 1
    This class is the entry point for this napp.
37
    """
38 1
39
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
40
41
    def setup(self):
        """Initialise the NApp state, replacing ``__init__`` for KytosNApp.

        The controller calls this method automatically when the application
        is loaded, so every setup routine belongs here.
        """
        # Scheduler used for the circuits' scheduled events.
        self.sched = Scheduler()

        # Controller used to save and load circuits.
        self.mongo_controller = self.get_eline_controller()
        self.mongo_controller.bootstrap_indexes()

        # Set the controller that will manage the dynamic paths.
        DynamicPathManager.set_controller(self.controller)

        # Dictionary of the EVCs created; it acts as a circuit buffer.
        # Every create/update/delete must be synced to mongodb.
        self.circuits = {}

        self.table_group = {"epl": 0, "evpl": 0}
        self._lock = Lock()
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)

        self.load_all_evcs()
        self._topology_updated_at = None
69
70
    def get_evcs_by_svc_level(self) -> list:
71
        """Get circuits sorted by desc service level and asc creation_time.
72 1
73
        In the future, as more ops are offloaded it should be get from the DB.
74
        """
75 1
        return sorted(self.circuits.values(),
76 1
                      key=lambda x: (-x.service_level, x.creation_time))
77
78
    @staticmethod
79
    def get_eline_controller():
        """Create and return a new ``ELineController``."""
        eline_controller = controllers.ELineController()
        return eline_controller
82 1
83 1
    def execute(self):
        """Run the consistency routine unless ``self._lock`` is being held.

        When the lock is already taken the call returns right away without
        doing anything.
        """
        if not self._lock.locked():
            log.debug("Starting consistency routine")
            with self._lock:
                self.execute_consistency()
            log.debug("Finished consistency routine")
91
92 1
    def should_be_checked(self, circuit):
93
        "Verify if the circuit meets the necessary conditions to be checked"
94
        # pylint: disable=too-many-boolean-expressions
95
        if (
96
                circuit.is_enabled()
97
                and not circuit.is_active()
98
                and not circuit.lock.locked()
99
                and not circuit.has_recent_removed_flow()
100
                and not circuit.is_recent_updated()
101
                and circuit.are_unis_active(self.controller.switches)
102
                # if a inter-switch EVC does not have current_path, it does not
103 1
                # make sense to run sdntrace on it
104
                and (circuit.is_intra_switch() or circuit.current_path)
105
                ):
106 1
            return True
107
        return False
108 1
109 1
    def execute_consistency(self):
        """Execute the consistency routine.

        Buffered circuits that should be checked are traced in one batch.
        A circuit whose trace check succeeds is activated and synced; the
        others have their ``execution_rounds`` incremented and are
        redeployed once it exceeds ``settings.WAIT_FOR_OLD_PATH``. Circuits
        present in mongodb but missing from the buffer are then loaded.
        """
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
        circuits_to_check = []
        for circuit in self.get_evcs_by_svc_level():
            # Whatever remains in stored_circuits after this loop is absent
            # from the in-memory buffer.
            stored_circuits.pop(circuit.id, None)
            if self.should_be_checked(circuit):
                circuits_to_check.append(circuit)

        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
        for circuit in circuits_to_check:
            if circuits_checked.get(circuit.id):
                circuit.execution_rounds = 0
                log.info(f"{circuit} enabled but inactive - activating")
                with circuit.lock:
                    circuit.activate()
                    circuit.sync()
                continue

            circuit.execution_rounds += 1
            if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
                log.info(f"{circuit} enabled but inactive - redeploy")
                with circuit.lock:
                    circuit.deploy()

        for circuit_id, stored_circuit in stored_circuits.items():
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
            self._load_evc(stored_circuit)
135
136
    def shutdown(self):
137
        """Execute when your napp is unloaded.
138
139 1
        If you have some cleanup procedure, insert it here.
140 1
        """
141
142
    @rest("/v2/evc/", methods=["GET"])
143
    def list_circuits(self, request: Request) -> JSONResponse:
        """Endpoint to return the stored circuits.

        The ``archived`` query arg (lower-cased, "false" when absent) is
        forwarded as the archived filter; every other query arg is
        forwarded as a metadata filter.
        """
        log.debug("list_circuits /v2/evc")
        query = request.query_params
        archived = query.get("archived", "false").lower()
        metadata = {
            key: value for key, value in query.items() if key != "archived"
        }
        stored = self.mongo_controller.get_circuits(archived=archived,
                                                    metadata=metadata)
        return JSONResponse(stored['circuits'])
157
158
    @rest("/v2/evc/schedule", methods=["GET"])
159
    def list_schedules(self, _request: Request) -> JSONResponse:
        """Endpoint to return all schedules stored for all circuits.

        Return a JSON with the following template:
        [{"schedule_id": <schedule_id>,
         "circuit_id": <circuit_id>,
         "schedule": <schedule object>}]

        When no circuit is stored at all, an empty JSON object is returned
        instead of a list.
        """
        log.debug("list_schedules /v2/evc/schedule")
        status = 200
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
        if not circuits:
            return JSONResponse({}, status_code=status)

        result = []
        for circuit in circuits:
            schedules = circuit.get("circuit_scheduler")
            if not schedules:
                continue
            result.extend(
                {
                    "schedule_id": schedule.get("id"),
                    "circuit_id": circuit.get("id"),
                    "schedule": schedule,
                }
                for schedule in schedules
            )

        log.debug("list_schedules result %s %s", result, status)
        return JSONResponse(result, status_code=status)
189
190 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
191 1
    def get_circuit(self, request: Request) -> JSONResponse:
        """Endpoint to return a single stored circuit looked up by its id.

        Raises:
            HTTPException: 404 when no circuit is found for the id.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("get_circuit /v2/evc/%s", circuit_id)
        circuit = self.mongo_controller.get_circuit(circuit_id)
        if circuit:
            status = 200
            log.debug("get_circuit result %s %s", circuit, status)
            return JSONResponse(circuit, status_code=status)

        result = f"circuit_id {circuit_id} not found"
        log.debug("get_circuit result %s %s", result, 404)
        raise HTTPException(404, detail=result)
203 1
204
    @rest("/v2/evc/", methods=["POST"])
205
    @validate_openapi(spec)
206
    def create_circuit(self, request: Request) -> JSONResponse:
        """Try to create a new circuit.

        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
        UNI_Z's requested C-VID are available from the interfaces' pools. This
        is checked when creating the UNI object.

        Then, E-Line NApp requests a primary and a backup path to the
        Pathfinder NApp using the attributes primary_links and backup_links
        submitted via REST.

        For each link composing the paths:
         - E-Line NApp requests a S-VID available from the link VLAN pool.
         - Using the S-VID obtained, generate abstract flow entries to be
           sent to FlowManager.

        Push abstract flow entries to FlowManager and FlowManager pushes
        OpenFlow entries to datapaths.

        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
        creation.

        Finally, notify user of the status of its request.

        Raises:
            HTTPException: 400 when the payload, a path, the tag lists or
                the tag reservation is rejected; 409 when a switch is
                disabled or the EVC already exists.
        """
        # Try to create the circuit object
        log.debug("create_circuit /v2/evc/")
        data = get_json_or_400(request, self.controller.loop)

        try:
            evc = self._evc_from_dict(data)
        except (ValueError, KytosTagError) as exception:
            log.debug("create_circuit result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            check_disabled_component(evc.uni_a, evc.uni_z)
        except DisabledSwitch as exception:
            log.debug("create_circuit result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        # Both static paths, when given, go through the same validation.
        for path_name in ("primary_path", "backup_path"):
            path = getattr(evc, path_name)
            if not path:
                continue
            try:
                path.is_valid(
                    evc.uni_a.interface.switch,
                    evc.uni_z.interface.switch,
                    bool(evc.circuit_scheduler),
                )
            except InvalidPath as exception:
                raise HTTPException(
                    400,
                    detail=f"{path_name} is not valid: {exception}"
                ) from exception

        if self._is_duplicated_evc(evc):
            result = "The EVC already exists."
            log.debug("create_circuit result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        if not evc._tag_lists_equal():
            detail = "UNI_A and UNI_Z tag lists should be the same."
            raise HTTPException(400, detail=detail)

        try:
            evc._validate_has_primary_or_dynamic()
        except ValueError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        try:
            self._use_uni_tags(evc)
        except KytosTagError as exception:
            raise HTTPException(400, detail=str(exception)) from exception

        # save circuit
        try:
            evc.sync()
        except ValidationError as exception:
            # The EVC is discarded, so hand back the UNI tags reserved just
            # above; otherwise they would stay in use by a circuit that was
            # never stored.
            evc.make_uni_vlan_available(evc.uni_a)
            evc.make_uni_vlan_available(evc.uni_z)
            raise HTTPException(400, detail=str(exception)) from exception

        # store circuit in dictionary
        self.circuits[evc.id] = evc

        # Schedule the circuit deploy
        self.sched.add(evc)

        # Circuit has no schedule, deploy now
        if not evc.circuit_scheduler:
            with evc.lock:
                evc.deploy()

        # Notify users
        result = {"circuit_id": evc.id}
        status = 201
        log.debug("create_circuit result %s %s", result, status)
        emit_event(self.controller, name="created",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
319 1
320 1
    @staticmethod
321 1
    def _use_uni_tags(evc):
322 1
        uni_a = evc.uni_a
323 1
        evc._use_uni_vlan(uni_a)
324 1
        try:
325
            uni_z = evc.uni_z
326 1
            evc._use_uni_vlan(uni_z)
327 1
        except KytosTagError as err:
328
            evc.make_uni_vlan_available(uni_a)
329
            raise err
330
331 1
    @listen_to('kytos/flow_manager.flow.removed')
332
    def on_flow_delete(self, event):
        """Listen for removed flows and hand the event to the handler.

        Keeps track of when flows got removed.
        """
        self.handle_flow_delete(event)
335 1
336 1
    def handle_flow_delete(self, event):
        """Record on the owning EVC that one of its flows was removed.

        The EVC is looked up in the circuit buffer by the id derived from
        the flow cookie; nothing happens when no EVC matches.
        """
        removed_flow = event.content["flow"]
        circuit_id = EVC.get_id_from_cookie(removed_flow.cookie)
        evc = self.circuits.get(circuit_id)
        if not evc:
            return
        log.debug("Flow removed in EVC %s", evc.id)
        evc.set_flow_removed_at()
343
344
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
345
    @validate_openapi(spec)
346
    def update(self, request: Request) -> JSONResponse:
        """Update a circuit based on payload.

        The EVC attributes (creation_time, active, current_path,
        failover_path, _id, archived) can't be updated.

        After the attributes are updated, an active EVC is removed when it
        got disabled, or removed and deployed again when a redeploy was
        requested; an inactive EVC is deployed when it got enabled.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer, 409
                when the EVC is archived or a switch is disabled, 400 when
                the payload is rejected.
        """
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = request.path_params["circuit_id"]
        log.debug("update /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("update result %s %s", result, 404)
            # Chain from the caught instance so the missing key is kept.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = "Can't update archived EVC"
            log.debug("update result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        try:
            enable, redeploy = evc.update(
                **self._evc_dict_with_instances(data)
            )
        except (KytosTagError, ValidationError) as exception:
            raise HTTPException(400, detail=str(exception)) from exception
        except ValueError as exception:
            log.debug("update result %s %s", exception, 400)
            raise HTTPException(400, detail=str(exception)) from exception
        except DisabledSwitch as exception:
            log.debug("update result %s %s", exception, 409)
            raise HTTPException(
                    409,
                    detail=f"Path is not valid: {exception}"
                ) from exception

        if evc.is_active():
            if enable is False:  # disable if active
                with evc.lock:
                    evc.remove()
            elif redeploy is not None:  # redeploy if active
                with evc.lock:
                    evc.remove()
                    evc.deploy()
        else:
            if enable is True:  # enable if inactive
                with evc.lock:
                    evc.deploy()
        result = {evc.id: evc.as_dict()}
        status = 200

        log.debug("update result %s %s", result, status)
        emit_event(self.controller, "updated",
                   content=map_evc_event_content(evc, **data))
        return JSONResponse(result, status_code=status)
404
405
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
406 1
    def delete_circuit(self, request: Request) -> JSONResponse:
        """Remove a circuit.

        First, the flows are removed from the switches, and then the EVC is
        disabled. The EVC is also taken out of the scheduler, archived, has
        its UNI tags removed and is synced, all while holding its lock.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer or is
                already archived.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            result = f"circuit_id {circuit_id} not found"
            log.debug("delete_circuit result %s %s", result, 404)
            # Chain from the caught instance so the missing key is kept.
            raise HTTPException(404, detail=result) from error

        if evc.archived:
            result = f"Circuit {circuit_id} already removed"
            log.debug("delete_circuit result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        log.info("Removing %s", evc)
        with evc.lock:
            evc.remove_current_flows()
            evc.remove_failover_flows(sync=False)
            evc.deactivate()
            evc.disable()
            self.sched.remove(evc)
            evc.archive()
            evc.remove_uni_tags()
            evc.sync()
        log.info("EVC removed. %s", evc)
        result = {"response": f"Circuit {circuit_id} removed"}
        status = 200

        log.debug("delete_circuit result %s %s", result, status)
        emit_event(self.controller, "deleted",
                   content=map_evc_event_content(evc))
        return JSONResponse(result, status_code=status)
444 1
445
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
446
    def get_metadata(self, request: Request) -> JSONResponse:
        """Return the metadata of a buffered EVC.

        Raises:
            HTTPException: 404 when a ``KeyError`` is raised while building
                the response, e.g. for an unknown circuit id.
        """
        circuit_id = request.path_params["circuit_id"]
        try:
            payload = {"metadata": self.circuits[circuit_id].metadata}
            return JSONResponse(payload)
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error
458 1
459 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
460 1
    @validate_openapi(spec)
461
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to a bulk of EVCs.

        ``circuit_ids`` is taken out of the payload and the remaining keys
        are the metadata. The update is sent to mongodb first and then
        applied to each buffered EVC.

        Raises:
            HTTPException: 404 listing the ids for which a ``KeyError``
                was raised.
        """
        metadata = get_json_or_400(request, self.controller.loop)
        circuit_ids = metadata.pop("circuit_ids")

        self.mongo_controller.update_evcs(circuit_ids, metadata, "add")

        fail_evcs = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].extend_metadata(metadata)
            except KeyError:
                fail_evcs.append(circuit_id)

        if not fail_evcs:
            return JSONResponse("Operation successful", status_code=201)
        raise HTTPException(404, detail=fail_evcs)
479 1
480 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
481
    @validate_openapi(spec)
482 1
    def add_metadata(self, request: Request) -> JSONResponse:
        """Add metadata to an EVC.

        The payload must be a JSON object; it extends the metadata of the
        buffered EVC, which is then synced.

        Raises:
            HTTPException: 400 when the payload is not a dict, 404 when the
                circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        metadata = get_json_or_400(request, self.controller.loop)
        if not isinstance(metadata, dict):
            # f-string so the offending value is actually reported.
            raise HTTPException(
                400,
                detail=f"Invalid metadata value: {metadata}"
            )
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found."
            ) from error

        evc.extend_metadata(metadata)
        evc.sync()
        return JSONResponse("Operation successful", status_code=201)
499 1
500 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
501 1
    @validate_openapi(spec)
502
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
        """Delete a metadata key from a bulk of EVCs.

        The removal is sent to mongodb first and then applied to each
        buffered EVC listed in ``circuit_ids``.

        Raises:
            HTTPException: 404 listing the ids for which a ``KeyError``
                was raised.
        """
        payload = get_json_or_400(request, self.controller.loop)
        key = request.path_params["key"]
        circuit_ids = payload.pop("circuit_ids")
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")

        fail_evcs = []
        for circuit_id in circuit_ids:
            try:
                self.circuits[circuit_id].remove_metadata(key)
            except KeyError:
                fail_evcs.append(circuit_id)

        if not fail_evcs:
            return JSONResponse("Operation successful")
        raise HTTPException(404, detail=fail_evcs)
520 1
521 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
522 1
    def delete_metadata(self, request: Request) -> JSONResponse:
        """Delete one metadata key from an EVC and sync the EVC.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        path_params = request.path_params
        circuit_id = path_params["circuit_id"]
        key = path_params["key"]
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            detail = f"circuit_id {circuit_id} not found."
            raise HTTPException(404, detail=detail) from error

        evc.remove_metadata(key)
        evc.sync()
        return JSONResponse("Operation successful")
537 1
538 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
539 1
    def redeploy(self, request: Request) -> JSONResponse:
        """Endpoint to force the redeployment of an EVC.

        An enabled EVC has its current flows removed and is deployed again
        while holding its lock (202). A disabled EVC is left untouched and
        409 is returned.

        Raises:
            HTTPException: 404 when the circuit is not in the buffer.
        """
        circuit_id = request.path_params["circuit_id"]
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
        try:
            evc = self.circuits[circuit_id]
        except KeyError as error:
            # Chain from the caught instance so the missing key is kept.
            raise HTTPException(
                404,
                detail=f"circuit_id {circuit_id} not found"
            ) from error
        if evc.is_enabled():
            with evc.lock:
                evc.remove_current_flows()
                evc.deploy()
            result = {"response": f"Circuit {circuit_id} redeploy received."}
            status = 202
        else:
            result = {"response": f"Circuit {circuit_id} is disabled."}
            status = 409

        return JSONResponse(result, status_code=status)
561
562
    @rest("/v2/evc/schedule/", methods=["POST"])
563
    @validate_openapi(spec)
564
    def create_schedule(self, request: Request) -> JSONResponse:
        """
        Create a new schedule for a given circuit.

        This service does not check for conflicts with another schedule.
        Payload example:
            {
              "circuit_id":"aa:bb:cc",
              "schedule": {
                "date": "2019-08-07T14:52:10.967Z",
                "interval": "string",
                "frequency": "1 * * * *",
                "action": "create"
              }
            }

        Raises:
            HTTPException: 404 when the circuit is not found, 409 when it
                is archived.
        """
        log.debug("create_schedule /v2/evc/schedule/")
        data = get_json_or_400(request, self.controller.loop)
        circuit_id = data["circuit_id"]
        schedule_data = data["schedule"]

        # Look the EVC up in the circuits buffer
        evc = self._get_circuits_buffer().get(circuit_id)
        if not evc:
            result = f"circuit_id {circuit_id} not found"
            log.debug("create_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
            log.debug("create_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(schedule_data)

        # Start the schedule list when the EVC has none, then add to it
        if not evc.circuit_scheduler:
            evc.circuit_scheduler = []
        evc.circuit_scheduler.append(new_schedule)

        # Register the job and save the circuit to mongodb
        self.sched.add_circuit_job(evc, new_schedule)
        evc.sync()

        status = 201
        result = new_schedule.as_dict()
        log.debug("create_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
623
624
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
625
    @validate_openapi(spec)
626
    def update_schedule(self, request: Request) -> JSONResponse:
        """Update a schedule.

        Replace all attributes of the given schedule of an EVC.
        The schedule ID is preserved.
        Payload example:
            {
              "date": "2019-08-07T14:52:10.967Z",
              "interval": "string",
              "frequency": "1 * * * *",
              "action": "create"
            }

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when its
                circuit is archived.
        """
        data = get_json_or_400(request, self.controller.loop)
        schedule_id = request.path_params["schedule_id"]
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)

        # Try to find the schedule and the circuit that owns it
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("update_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("update_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        new_schedule = CircuitSchedule.from_dict(data)
        new_schedule.id = found_schedule.id

        # Swap the old schedule for the modified one
        schedules = evc.circuit_scheduler
        schedules.remove(found_schedule)
        schedules.append(new_schedule)

        # Replace the scheduled job as well
        self.sched.cancel_job(found_schedule.id)
        self.sched.add_circuit_job(evc, new_schedule)

        # Save EVC to mongodb
        evc.sync()

        status = 200
        result = new_schedule.as_dict()
        log.debug("update_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
675
676
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
677
    def delete_schedule(self, request: Request) -> JSONResponse:
        """Remove a circuit schedule.

        Remove the schedule from the EVC.
        Cancel the schedule job.
        Save the EVC to mongodb.

        Raises:
            HTTPException: 404 when the schedule is not found, 409 when its
                circuit is archived.
        """
        schedule_id = request.path_params["schedule_id"]
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)

        if not found_schedule:
            result = f"schedule_id {schedule_id} not found"
            log.debug("delete_schedule result %s %s", result, 404)
            raise HTTPException(404, detail=result)

        # Can not modify circuits deleted and archived
        if evc.archived:
            result = f"Circuit {evc.id} is archived. Update is forbidden."
            log.debug("delete_schedule result %s %s", result, 409)
            raise HTTPException(409, detail=result)

        # Drop the schedule, cancel its job, then save the EVC to mongodb
        evc.circuit_scheduler.remove(found_schedule)
        self.sched.cancel_job(found_schedule.id)
        evc.sync()

        status = 200
        result = "Schedule removed"
        log.debug("delete_schedule result %s %s", result, status)
        return JSONResponse(result, status_code=status)
712
713
    def _is_duplicated_evc(self, evc):
714
        """Verify if the circuit given is duplicated with the stored evcs.
715
716
        Args:
717 1
            evc (EVC): circuit to be analysed.
718 1
719 1
        Returns:
720 1
            boolean: True if the circuit is duplicated, otherwise False.
721
722 1
        """
723 1
        for circuit in tuple(self.circuits.values()):
724
            if not circuit.archived and circuit.shares_uni(evc):
725
                return True
726
        return False
727 1
728
    @listen_to("kytos/topology.link_up")
729 1
    def on_link_up(self, event):
        """Listen for link up events and hand them to the handler.

        Changes circuits when a link is up or at end_maintenance.
        """
        self.handle_link_up(event)
732 1
733 1
    def handle_link_up(self, event):
        """Let every enabled, non-archived EVC react to a link going up.

        EVCs are visited in service level order and each one handles the
        link while holding its own lock.
        """
        log.info("Event handle_link_up %s", event.content["link"])
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_enabled() or evc.archived:
                continue
            with evc.lock:
                evc.handle_link_up(event.content["link"])
740 1
741
    @listen_to("kytos/topology.updated")
    def on_topology_update(self, event):
        """Receive ``kytos/topology.updated`` and delegate it.

        Args:
            event: event delivered by the listener; it is passed unchanged
                to ``handle_topology_update``.
        """
        self.handle_topology_update(event)
745
746
    def handle_topology_update(self, event):
        """Forward a topology update to every enabled, non-archived EVC.

        Runs under ``self._lock``. An event whose timestamp is older than the
        last one recorded in ``self._topology_updated_at`` is ignored;
        otherwise the timestamp is recorded and each eligible EVC receives
        the switches of ``event.content["topology"]`` while holding its lock.
        """
        with self._lock:
            last_seen = self._topology_updated_at
            if last_seen and last_seen > event.timestamp:
                # Stale event: a newer topology was already processed.
                return
            self._topology_updated_at = event.timestamp
            for circuit in self.get_evcs_by_svc_level():
                if not circuit.is_enabled() or circuit.archived:
                    continue
                with circuit.lock:
                    circuit.handle_topology_update(
                        event.content["topology"].switches
                    )
761 1
762
    @listen_to("kytos/topology.link_down")
    def on_link_down(self, event):
        """Receive ``kytos/topology.link_down`` and delegate it.

        Args:
            event: event delivered by the listener; it is passed unchanged
                to ``handle_link_down``.
        """
        self.handle_link_down(event)
766 1
767 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
        """Switch affected EVCs to failover or request their redeploy.

        EVCs affected by the link in ``event.content["link"]`` are split in
        two groups. Those with a failover path not affected by the same link
        get their failover flows sent to ``kytos.flow_manager`` in batches,
        then have current and failover paths swapped. The others get one
        ``evc_affected_by_link_down`` event each. EVCs not affected by the
        link only set up a failover path again when their failover path is
        affected by it.
        """
        link = event.content["link"]
        log.info("Event handle_link_down %s", link)
        flows_by_dpid = {}
        failover_evcs = []
        regular_evcs = []
        unaffected_evcs = []
        for evc in self.get_evcs_by_svc_level():
            if not evc.is_affected_by_link(link):
                unaffected_evcs.append(evc)
                continue
            # Without a usable failover path, the link down is handled the
            # traditional way.
            if (
                not getattr(evc, 'failover_path', None) or
                evc.is_failover_path_affected_by_link(link)
            ):
                regular_evcs.append(evc)
                continue
            try:
                failover_flows = evc.get_failover_flows()
            # pylint: disable=broad-except
            except Exception:
                err = traceback.format_exc().replace("\n", ", ")
                log.error(
                    f"Ignore Failover path for {evc} due to error: {err}"
                )
                regular_evcs.append(evc)
                continue
            for dpid, flows in failover_flows.items():
                flows_by_dpid.setdefault(dpid, []).extend(flows)
            failover_evcs.append(evc)

        # Send the collected flows per switch, one batch per switch per
        # round, sleeping between rounds.
        while flows_by_dpid:
            batch_size = settings.BATCH_SIZE or None
            for dpid in list(flows_by_dpid):
                queued = flows_by_dpid[dpid]
                emit_event(
                    self.controller,
                    context="kytos.flow_manager",
                    name="flows.install",
                    content={
                        "dpid": dpid,
                        "flow_dict": {"flows": queued[:batch_size]},
                    }
                )
                if batch_size is not None and batch_size < len(queued):
                    # Keep what did not fit in this batch for the next round.
                    flows_by_dpid[dpid] = queued[batch_size:]
                else:
                    del flows_by_dpid[dpid]
            time.sleep(settings.BATCH_INTERVAL)

        for evc in failover_evcs:
            with evc.lock:
                # The failover path becomes the current one and vice versa.
                evc.current_path, evc.failover_path = (
                    evc.failover_path, evc.current_path
                )
                evc.sync()
            emit_event(self.controller, "redeployed_link_down",
                       content=map_evc_event_content(evc))
            log.info(
                f"{evc} redeployed with failover due to link down {link.id}"
            )

        for evc in regular_evcs:
            emit_event(
                self.controller,
                "evc_affected_by_link_down",
                content={"link_id": link.id} | map_evc_event_content(evc)
            )

        # After handling the hot path, check if new failover paths are needed.
        # Note that EVCs affected by link down will generate a KytosEvent for
        # deployed|redeployed, which will trigger the failover path setup.
        # Thus, only the EVCs not affected by the link need a further check.
        for evc in unaffected_evcs:
            if not evc.is_failover_path_affected_by_link(link):
                continue
            with evc.lock:
                evc.setup_failover_path()
848 1
849
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
    def on_evc_affected_by_link_down(self, event):
        """Receive ``kytos/mef_eline.evc_affected_by_link_down`` and delegate.

        Args:
            event: event delivered by the listener; it is passed unchanged
                to ``handle_evc_affected_by_link_down``.
        """
        self.handle_evc_affected_by_link_down(event)
853 1
854 1
    def handle_evc_affected_by_link_down(self, event):
        """Run the link-down handling of one EVC and publish the outcome.

        The EVC is looked up by ``event.content["evc_id"]``; nothing happens
        when it is not stored. Otherwise ``evc.handle_link_down()`` runs
        under the EVC lock, and either ``redeployed_link_down`` or
        ``error_redeploy_link_down`` is emitted depending on its result.
        """
        content = event.content
        evc = self.circuits.get(content["evc_id"])
        link_id = content['link_id']
        if not evc:
            return
        with evc.lock:
            redeployed = evc.handle_link_down()
        if redeployed:
            log.info(f"{evc} redeployed due to link down {link_id}")
            event_name = "redeployed_link_down"
        else:
            event_name = "error_redeploy_link_down"
        emit_event(self.controller, event_name,
                   content=map_evc_event_content(evc))
868 1
869
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
    def on_evc_deployed(self, event):
        """Receive EVC deployed, redeployed_link_up or redeployed_link_down.

        Args:
            event: event delivered by the listener; it is passed unchanged
                to ``handle_evc_deployed``.
        """
        self.handle_evc_deployed(event)
873 1
874 1
    def handle_evc_deployed(self, event):
        """Set up the failover path of the EVC named in the event.

        The EVC is looked up by ``event.content["evc_id"]``; when it is
        stored, ``setup_failover_path`` runs while holding the EVC lock.
        """
        evc = self.circuits.get(event.content["evc_id"])
        if evc:
            with evc.lock:
                evc.setup_failover_path()
881 1
882
    @listen_to("kytos/topology.topology_loaded")
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
        """Load EVCs once ``kytos/topology.topology_loaded`` is received.

        Args:
            event: event delivered by the listener; it is not used.
        """
        self.load_all_evcs()
886 1
887
    def load_all_evcs(self):
        """Load every circuit from mongodb that is not yet in memory.

        Circuits whose id is already a key of ``self.circuits`` are skipped;
        the others are handed to ``_load_evc``.
        """
        stored = self.mongo_controller.get_circuits()['circuits']
        for circuit_id, circuit in stored.items():
            if circuit_id in self.circuits:
                continue
            self._load_evc(circuit)
893 1
894
    def _load_evc(self, circuit_dict):
895
        """Load one EVC from mongodb to memory."""
896 1
        try:
897
            evc = self._evc_from_dict(circuit_dict)
898 1
        except ValueError as exception:
899 1
            log.error(
900 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
901 1
            )
902 1
            return None
903 1
        except KytosTagError as exception:
904 1
            log.error(
905
                f"Could not load EVC: dict={circuit_dict} error={exception}"
906 1
            )
907 1
908
        if evc.archived:
909
            return None
910
        evc.deactivate()
911 1
        evc.sync()
912
        self.circuits.setdefault(evc.id, evc)
913 1
        self.sched.add(evc)
914 1
        return evc
915 1
916
    @listen_to("kytos/flow_manager.flow.error")
    def on_flow_mod_error(self, event):
        """Receive ``kytos/flow_manager.flow.error`` and delegate it.

        Args:
            event: event delivered by the listener; it is passed unchanged
                to ``handle_flow_mod_error``.
        """
        self.handle_flow_mod_error(event)
920 1
921
    def handle_flow_mod_error(self, event):
        """Remove the current flows of the EVC owning a failed flow add.

        Only events whose ``error_command`` is ``"add"`` are considered. The
        EVC is found through the id derived from the flow cookie, and its
        current flows are removed while holding the EVC lock.
        """
        content = event.content
        flow = content["flow"]
        if content.get("error_command") != "add":
            return
        evc_id = EVC.get_id_from_cookie(flow.cookie)
        evc = self.circuits.get(evc_id)
        if not evc:
            return
        with evc.lock:
            evc.remove_current_flows()
931 1
932 1
    def _evc_dict_with_instances(self, evc_dict):
933 1
        """Convert some dict values to instance of EVC classes.
934 1
935 1
        This method will convert: [UNI, Link]
936 1
        """
937
        data = evc_dict.copy()  # Do not modify the original dict
938 1
        for attribute, value in data.items():
939 1
            # Get multiple attributes.
940 1
            # Ex: uni_a, uni_z
941 1
            if "uni" in attribute:
942
                try:
943
                    data[attribute] = self._uni_from_dict(value)
944
                except ValueError as exception:
945
                    result = "Error creating UNI: Invalid value"
946
                    raise ValueError(result) from exception
947
948
            if attribute == "circuit_scheduler":
949 1
                data[attribute] = []
950 1
                for schedule in value:
951
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
952
953
            # Get multiple attributes.
954
            # Ex: primary_links,
955
            #     backup_links,
956
            #     current_links_cache,
957 1
            #     primary_links_cache,
958 1
            #     backup_links_cache
959
            if "links" in attribute:
960
                data[attribute] = [
961
                    self._link_from_dict(link) for link in value
962 1
                ]
963
964 1
            # Ex: current_path,
965 1
            #     primary_path,
966 1
            #     backup_path
967 1
            if "path" in attribute and attribute != "dynamic_backup_path":
968
                data[attribute] = Path(
969 1
                    [self._link_from_dict(link) for link in value]
970
                )
971 1
972 1
        return data
973
974 1
    def _evc_from_dict(self, evc_dict):
        """Build an EVC from its dict representation.

        The dict values are first converted by ``_evc_dict_with_instances``
        and ``table_group`` is set from ``self.table_group``.
        """
        evc_kwargs = self._evc_dict_with_instances(evc_dict)
        evc_kwargs["table_group"] = self.table_group
        return EVC(self.controller, **evc_kwargs)
978
979
    def _uni_from_dict(self, uni_dict):
980
        """Return a UNI object from python dict."""
981 1
        if uni_dict is None:
982 1
            return False
983 1
984 1
        interface_id = uni_dict.get("interface_id")
985 1
        interface = self.controller.get_interface_by_id(interface_id)
986 1
        if interface is None:
987 1
            result = (
988 1
                "Error creating UNI:"
989
                + f"Could not instantiate interface {interface_id}"
990 1
            )
991 1
            raise ValueError(result) from ValueError
992
        tag_convert = {1: "vlan"}
993 1
        tag_dict = uni_dict.get("tag", None)
994
        if tag_dict:
995 1
            tag_type = tag_dict.get("tag_type")
996
            tag_type = tag_convert.get(tag_type, tag_type)
997 1
            tag_value = tag_dict.get("value")
998 1
            if isinstance(tag_value, list):
999
                tag_value = get_tag_ranges(tag_value)
1000 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1001 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1002 1
            else:
1003 1
                tag = TAG(tag_type, tag_value)
1004 1
        else:
1005 1
            tag = None
1006
        uni = UNI(interface, tag)
1007
        return uni
1008
1009 1
    def _link_from_dict(self, link_dict):
1010 1
        """Return a Link object from python dict."""
1011 1
        id_a = link_dict.get("endpoint_a").get("id")
1012
        id_b = link_dict.get("endpoint_b").get("id")
1013 1
1014 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1015 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1016 1
        if not endpoint_a:
1017
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1018
            raise ValueError(error_msg)
1019 1
        if not endpoint_b:
1020 1
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1021
            raise ValueError(error_msg)
1022 1
1023
        link = Link(endpoint_a, endpoint_b)
1024
        if "metadata" in link_dict:
1025
            link.extend_metadata(link_dict.get("metadata"))
1026
1027
        s_vlan = link.get_metadata("s_vlan")
1028
        if s_vlan:
1029 1
            tag = TAG.from_dict(s_vlan)
1030 1
            if tag is False:
1031 1
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1032
                raise ValueError(error_msg)
1033
            link.update_metadata("s_vlan", tag)
1034 1
        return link
1035 1
1036 1
    def _find_evc_by_schedule_id(self, schedule_id):
1037 1
        """
1038 1
        Find an EVC and CircuitSchedule based on schedule_id.
1039 1
1040 1
        :param schedule_id: Schedule ID
1041 1
        :return: EVC and Schedule
1042 1
        """
1043
        circuits = self._get_circuits_buffer()
1044 1
        found_schedule = None
1045
        evc = None
1046
1047
        # pylint: disable=unused-variable
1048
        for c_id, circuit in circuits.items():
1049
            for schedule in circuit.circuit_scheduler:
1050 1
                if schedule.id == schedule_id:
1051
                    found_schedule = schedule
1052 1
                    evc = circuit
1053 1
                    break
1054 1
            if found_schedule:
1055 1
                break
1056 1
        return evc, found_schedule
1057
1058
    def _get_circuits_buffer(self):
1059 1
        """
1060 1
        Return the circuit buffer.
1061
1062 1
        If the buffer is empty, try to load data from mongodb.
1063 1
        """
1064
        if not self.circuits:
1065 1
            # Load circuits from mongodb to buffer
1066 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1067 1
            for c_id, circuit in circuits.items():
1068
                evc = self._evc_from_dict(circuit)
1069
                self.circuits[c_id] = evc
1070 1
        return self.circuits
1071 1
1072 1
    # pylint: disable=attribute-defined-outside-init
1073 1
    @alisten_to("kytos/of_multi_table.enable_table")
    async def on_table_enabled(self, event):
        """Handle a recently enabled table.

        Reads the ``mef_eline`` entry of the event content. When every
        group in it is allowed by ``settings.TABLE_GROUP_ALLOWED``,
        ``self.table_group`` is updated and
        ``kytos/mef_eline.enable_table`` is emitted; otherwise the first
        disallowed group is logged and nothing is changed.
        """
        table_group = event.content.get("mef_eline", None)
        if not table_group:
            return
        for group in table_group:
            if group in settings.TABLE_GROUP_ALLOWED:
                continue
            log.error(f'The table group "{group}" is not allowed for '
                      f'mef_eline. Allowed table groups are '
                      f'{settings.TABLE_GROUP_ALLOWED}')
            return
        self.table_group.update(table_group)
        await aemit_event(
            self.controller,
            "kytos/mef_eline.enable_table",
            {"group_table": self.table_group},
        )
1089