Test Failed
Pull Request — master (#396)
by
unknown
06:24
created

build.main.Main.on_flow_mod_error()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 2
dl 0
loc 4
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.exceptions import KytosInvalidRanges, KytosTagsAreNotAvailable
15
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
16 1
                                validate_openapi)
17 1
from kytos.core.interface import TAG, UNI, TAGRange
18 1
from kytos.core.link import Link
19
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
20 1
                                 get_json_or_400)
21 1
from kytos.core.tag_ranges import get_tag_ranges
22 1
from napps.kytos.mef_eline import controllers, settings
23
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
24 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
25 1
                                          Path)
26
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
27
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
28
                                         emit_event, get_vlan_tags_and_masks,
29
                                         map_evc_event_content)
30 1
31
32
# pylint: disable=too-many-public-methods
33
class Main(KytosNApp):
34
    """Main class of amlight/mef_eline NApp.
35
36 1
    This class is the entry point for this napp.
37
    """
38 1
39
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
40
41
    def setup(self):
42
        """Replace the '__init__' method for the KytosNApp subclass.
43
44
        The setup method is automatically called by the controller when your
45
        application is loaded.
46
47 1
        So, if you have any setup routine, insert it here.
48
        """
49
        # object used to scheduler circuit events
50 1
        self.sched = Scheduler()
51 1
52
        # object to save and load circuits
53
        self.mongo_controller = self.get_eline_controller()
54 1
        self.mongo_controller.bootstrap_indexes()
55
56
        # set the controller that will manager the dynamic paths
57
        DynamicPathManager.set_controller(self.controller)
58 1
59
        # dictionary of EVCs created. It acts as a circuit buffer.
60 1
        # Every create/update/delete must be synced to mongodb.
61 1
        self.circuits = {}
62 1
63
        self.table_group = {"epl": 0, "evpl": 0}
64 1
        self._lock = Lock()
65 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
66
67 1
        self.load_all_evcs()
68
        self._topology_updated_at = None
69
70
    def get_evcs_by_svc_level(self) -> list:
71
        """Get circuits sorted by desc service level and asc creation_time.
72 1
73
        In the future, as more ops are offloaded it should be get from the DB.
74
        """
75 1
        return sorted(self.circuits.values(),
76 1
                      key=lambda x: (-x.service_level, x.creation_time))
77
78
    @staticmethod
79
    def get_eline_controller():
80 1
        """Return the ELineController instance."""
81
        return controllers.ELineController()
82 1
83 1
    def execute(self):
84 1
        """Execute once when the napp is running."""
85 1
        if self._lock.locked():
86 1
            return
87 1
        log.debug("Starting consistency routine")
88
        with self._lock:
89 1
            self.execute_consistency()
90
        log.debug("Finished consistency routine")
91
92 1
    def should_be_checked(self, circuit):
93
        "Verify if the circuit meets the necessary conditions to be checked"
94
        # pylint: disable=too-many-boolean-expressions
95
        if (
96
                circuit.is_enabled()
97
                and not circuit.is_active()
98
                and not circuit.lock.locked()
99
                and not circuit.has_recent_removed_flow()
100
                and not circuit.is_recent_updated()
101
                and circuit.are_unis_active(self.controller.switches)
102
                # if a inter-switch EVC does not have current_path, it does not
103 1
                # make sense to run sdntrace on it
104
                and (circuit.is_intra_switch() or circuit.current_path)
105
                ):
106 1
            return True
107
        return False
108 1
109 1
    def execute_consistency(self):
110 1
        """Execute consistency routine."""
111 1
        circuits_to_check = []
112 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
113 1
        for circuit in self.get_evcs_by_svc_level():
114 1
            stored_circuits.pop(circuit.id, None)
115 1
            if self.should_be_checked(circuit):
116 1
                circuits_to_check.append(circuit)
117 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
118 1
        for circuit in circuits_to_check:
119 1
            is_checked = circuits_checked.get(circuit.id)
120 1
            if is_checked:
121 1
                circuit.execution_rounds = 0
122 1
                log.info(f"{circuit} enabled but inactive - activating")
123
                with circuit.lock:
124 1
                    circuit.activate()
125 1
                    circuit.sync()
126 1
            else:
127 1
                circuit.execution_rounds += 1
128 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
129 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
130 1
                    with circuit.lock:
131 1
                        circuit.deploy()
132
        for circuit_id in stored_circuits:
133 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
134
            self._load_evc(stored_circuits[circuit_id])
135
136
    def shutdown(self):
137
        """Execute when your napp is unloaded.
138
139 1
        If you have some cleanup procedure, insert it here.
140 1
        """
141
142
    @rest("/v2/evc/", methods=["GET"])
143
    def list_circuits(self, request: Request) -> JSONResponse:
144
        """Endpoint to return circuits stored.
145
146 1
        archive query arg if defined (not null) will be filtered
147 1
        accordingly, by default only non archived evcs will be listed
148 1
        """
149 1
        log.debug("list_circuits /v2/evc")
150 1
        args = request.query_params
151
        archived = args.get("archived", "false").lower()
152 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
153 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
154
                                                      metadata=args)
155 1
        circuits = circuits['circuits']
156 1
        return JSONResponse(circuits)
157
158
    @rest("/v2/evc/schedule", methods=["GET"])
159
    def list_schedules(self, _request: Request) -> JSONResponse:
160
        """Endpoint to return all schedules stored for all circuits.
161
162
        Return a JSON with the following template:
163
        [{"schedule_id": <schedule_id>,
164 1
         "circuit_id": <circuit_id>,
165 1
         "schedule": <schedule object>}]
166 1
        """
167 1
        log.debug("list_schedules /v2/evc/schedule")
168 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
169 1
        if not circuits:
170
            result = {}
171 1
            status = 200
172 1
            return JSONResponse(result, status_code=status)
173 1
174 1
        result = []
175 1
        status = 200
176 1
        for circuit in circuits:
177 1
            circuit_scheduler = circuit.get("circuit_scheduler")
178
            if circuit_scheduler:
179
                for scheduler in circuit_scheduler:
180
                    value = {
181
                        "schedule_id": scheduler.get("id"),
182 1
                        "circuit_id": circuit.get("id"),
183
                        "schedule": scheduler,
184 1
                    }
185 1
                    result.append(value)
186
187 1
        log.debug("list_schedules result %s %s", result, status)
188 1
        return JSONResponse(result, status_code=status)
189
190 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
191 1
    def get_circuit(self, request: Request) -> JSONResponse:
192 1
        """Endpoint to return a circuit based on id."""
193 1
        circuit_id = request.path_params["circuit_id"]
194 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
195 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
196 1
        if not circuit:
197 1
            result = f"circuit_id {circuit_id} not found"
198 1
            log.debug("get_circuit result %s %s", result, 404)
199 1
            raise HTTPException(404, detail=result)
200
        status = 200
201 1
        log.debug("get_circuit result %s %s", circuit, status)
202 1
        return JSONResponse(circuit, status_code=status)
203 1
204
    @rest("/v2/evc/", methods=["POST"])
205
    @validate_openapi(spec)
206
    def create_circuit(self, request: Request) -> JSONResponse:
207
        """Try to create a new circuit.
208
209
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
210
        UNI_Z's requested C-VID are available from the interfaces' pools. This
211
        is checked when creating the UNI object.
212
213
        Then, E-Line NApp requests a primary and a backup path to the
214
        Pathfinder NApp using the attributes primary_links and backup_links
215
        submitted via REST
216
217
        # For each link composing paths in #3:
218
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
219
        #  - Using the S-VID obtained, generate abstract flow entries to be
220
        #    sent to FlowManager
221
222
        Push abstract flow entries to FlowManager and FlowManager pushes
223
        OpenFlow entries to datapaths
224
225
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
226
        creation
227
228 1
        Finnaly, notify user of the status of its request.
229 1
        """
230
        # Try to create the circuit object
231 1
        log.debug("create_circuit /v2/evc/")
232 1
        data = get_json_or_400(request, self.controller.loop)
233 1
234 1
        try:
235 1
            evc = self._evc_from_dict(data)
236
        except ValueError as exception:
237 1
            log.debug("create_circuit result %s %s", exception, 400)
238 1
            raise HTTPException(400, detail=str(exception)) from exception
239 1
240 1
        try:
241 1
            check_disabled_component(evc.uni_a, evc.uni_z)
242
        except DisabledSwitch as exception:
243
            log.debug("create_circuit result %s %s", exception, 409)
244
            raise HTTPException(
245
                    409,
246 1
                    detail=f"Path is not valid: {exception}"
247
                ) from exception
248
249
        if evc.primary_path:
250
            try:
251
                evc.primary_path.is_valid(
252
                    evc.uni_a.interface.switch,
253
                    evc.uni_z.interface.switch,
254
                    bool(evc.circuit_scheduler),
255
                )
256
            except InvalidPath as exception:
257
                raise HTTPException(
258 1
                    400,
259
                    detail=f"primary_path is not valid: {exception}"
260
                ) from exception
261
        if evc.backup_path:
262
            try:
263
                evc.backup_path.is_valid(
264
                    evc.uni_a.interface.switch,
265
                    evc.uni_z.interface.switch,
266
                    bool(evc.circuit_scheduler),
267
                )
268
            except InvalidPath as exception:
269
                raise HTTPException(
270
                    400,
271
                    detail=f"backup_path is not valid: {exception}"
272 1
                ) from exception
273 1
274 1
        if not evc._tag_lists_equal():
275 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
276
            raise HTTPException(400, detail=detail)
277 1
278 1
        try:
279 1
            evc._validate_has_primary_or_dynamic()
280 1
        except ValueError as exception:
281
            raise HTTPException(400, detail=str(exception)) from exception
282 1
283 1
        try:
284
            self._use_uni_tags(evc)
285
        except KytosTagsAreNotAvailable as exception:
286
            raise HTTPException(400, detail=str(exception)) from exception
287
288 1
        # save circuit
289 1
        try:
290
            evc.sync()
291
        except ValidationError as exception:
292
            raise HTTPException(400, detail=str(exception)) from exception
293
294 1
        # store circuit in dictionary
295
        self.circuits[evc.id] = evc
296
297 1
        # Schedule the circuit deploy
298
        self.sched.add(evc)
299
300 1
        # Circuit has no schedule, deploy now
301 1
        if not evc.circuit_scheduler:
302 1
            with evc.lock:
303
                evc.deploy()
304
305 1
        # Notify users
306 1
        result = {"circuit_id": evc.id}
307 1
        status = 201
308 1
        log.debug("create_circuit result %s %s", result, status)
309
        emit_event(self.controller, name="created",
310 1
                   content=map_evc_event_content(evc))
311
        return JSONResponse(result, status_code=status)
312 1
313 1
    @staticmethod
314 1
    def _use_uni_tags(evc):
315 1
        uni_a = evc.uni_a
316 1
        try:
317 1
            evc._use_uni_vlan(uni_a)
318 1
        except KytosTagsAreNotAvailable as err:
319 1
            raise err
320 1
        try:
321 1
            uni_z = evc.uni_z
322 1
            evc._use_uni_vlan(uni_z)
323 1
        except KytosTagsAreNotAvailable as err:
324 1
            evc.make_uni_vlan_available(uni_a)
325
            raise err
326 1
327 1
    @listen_to('kytos/flow_manager.flow.removed')
328
    def on_flow_delete(self, event):
329
        """Capture delete messages to keep track when flows got removed."""
330
        self.handle_flow_delete(event)
331 1
332
    def handle_flow_delete(self, event):
333 1
        """Keep track when the EVC got flows removed by deriving its cookie."""
334 1
        flow = event.content["flow"]
335 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
336 1
        if evc:
337 1
            log.debug("Flow removed in EVC %s", evc.id)
338
            evc.set_flow_removed_at()
339 1
340 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
341 1
    @validate_openapi(spec)
342
    def update(self, request: Request) -> JSONResponse:
343
        """Update a circuit based on payload.
344
345
        The EVC attributes (creation_time, active, current_path,
346
        failover_path, _id, archived) can't be updated.
347 1
        """
348 1
        data = get_json_or_400(request, self.controller.loop)
349 1
        circuit_id = request.path_params["circuit_id"]
350 1
        log.debug("update /v2/evc/%s", circuit_id)
351 1
        try:
352 1
            evc = self.circuits[circuit_id]
353 1
        except KeyError:
354 1
            result = f"circuit_id {circuit_id} not found"
355 1
            log.debug("update result %s %s", result, 404)
356
            raise HTTPException(404, detail=result) from KeyError
357 1
358 1
        if evc.archived:
359 1
            result = "Can't update archived EVC"
360 1
            log.debug("update result %s %s", result, 409)
361
            raise HTTPException(409, detail=result)
362 1
363 1
        try:
364
            enable, redeploy = evc.update(
365
                **self._evc_dict_with_instances(data)
366 1
            )
367
        except ValidationError as exception:
368 1
            raise HTTPException(400, detail=str(exception)) from exception
369 1
        except ValueError as exception:
370 1
            log.error(exception)
371 1
            log.debug("update result %s %s", exception, 400)
372 1
            raise HTTPException(400, detail=str(exception)) from exception
373 1
        except KytosTagsAreNotAvailable as exception:
374 1
            log.error(exception)
375
            log.debug("update result %s %s", exception, 400)
376
            raise HTTPException(400, detail=str(exception)) from exception
377
        except DisabledSwitch as exception:
378
            log.debug("update result %s %s", exception, 409)
379 1
            raise HTTPException(
380
                    409,
381
                    detail=f"Path is not valid: {exception}"
382
                ) from exception
383
384
        if evc.is_active():
385
            if enable is False:  # disable if active
386
                with evc.lock:
387
                    evc.remove()
388 1
            elif redeploy is not None:  # redeploy if active
389 1
                with evc.lock:
390 1
                    evc.remove()
391 1
                    evc.deploy()
392 1
        else:
393
            if enable is True:  # enable if inactive
394 1
                with evc.lock:
395 1
                    evc.deploy()
396
        result = {evc.id: evc.as_dict()}
397 1
        status = 200
398
399 1
        log.debug("update result %s %s", result, status)
400 1
        emit_event(self.controller, "updated",
401
                   content=map_evc_event_content(evc, **data))
402
        return JSONResponse(result, status_code=status)
403
404
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
405
    def delete_circuit(self, request: Request) -> JSONResponse:
406 1
        """Remove a circuit.
407 1
408 1
        First, the flows are removed from the switches, and then the EVC is
409 1
        disabled.
410 1
        """
411 1
        circuit_id = request.path_params["circuit_id"]
412 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
413 1
        try:
414
            evc = self.circuits[circuit_id]
415 1
        except KeyError:
416 1
            result = f"circuit_id {circuit_id} not found"
417 1
            log.debug("delete_circuit result %s %s", result, 404)
418 1
            raise HTTPException(404, detail=result) from KeyError
419
420 1
        if evc.archived:
421 1
            result = f"Circuit {circuit_id} already removed"
422 1
            log.debug("delete_circuit result %s %s", result, 404)
423 1
            raise HTTPException(404, detail=result)
424 1
425 1
        log.info("Removing %s", evc)
426 1
        with evc.lock:
427 1
            evc.remove_current_flows()
428 1
            evc.remove_failover_flows(sync=False)
429 1
            evc.deactivate()
430 1
            evc.disable()
431 1
            self.sched.remove(evc)
432 1
            evc.archive()
433
            evc.remove_uni_tags()
434 1
            evc.sync()
435 1
        log.info("EVC removed. %s", evc)
436
        result = {"response": f"Circuit {circuit_id} removed"}
437 1
        status = 200
438
439 1
        log.debug("delete_circuit result %s %s", result, status)
440 1
        emit_event(self.controller, "deleted",
441
                   content=map_evc_event_content(evc))
442 1
        return JSONResponse(result, status_code=status)
443 1
444 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
445
    def get_metadata(self, request: Request) -> JSONResponse:
446
        """Get metadata from an EVC."""
447
        circuit_id = request.path_params["circuit_id"]
448
        try:
449
            return (
450
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
451
            )
452
        except KeyError as error:
453 1
            raise HTTPException(
454 1
                404,
455 1
                detail=f"circuit_id {circuit_id} not found."
456
            ) from error
457 1
458 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
459
    @validate_openapi(spec)
460 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
461
        """Add metadata to a bulk of EVCs."""
462 1
        data = get_json_or_400(request, self.controller.loop)
463 1
        circuit_ids = data.pop("circuit_ids")
464 1
465 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
466 1
467 1
        fail_evcs = []
468 1
        for _id in circuit_ids:
469
            try:
470 1
                evc = self.circuits[_id]
471 1
                evc.extend_metadata(data)
472 1
            except KeyError:
473
                fail_evcs.append(_id)
474 1
475 1
        if fail_evcs:
476 1
            raise HTTPException(404, detail=fail_evcs)
477
        return JSONResponse("Operation successful", status_code=201)
478 1
479 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
480 1
    @validate_openapi(spec)
481
    def add_metadata(self, request: Request) -> JSONResponse:
482 1
        """Add metadata to an EVC."""
483 1
        circuit_id = request.path_params["circuit_id"]
484 1
        metadata = get_json_or_400(request, self.controller.loop)
485 1
        if not isinstance(metadata, dict):
486
            raise HTTPException(400, "Invalid metadata value: {metadata}")
487
        try:
488
            evc = self.circuits[circuit_id]
489
        except KeyError as error:
490 1
            raise HTTPException(
491 1
                404,
492 1
                detail=f"circuit_id {circuit_id} not found."
493
            ) from error
494 1
495 1
        evc.extend_metadata(metadata)
496 1
        evc.sync()
497
        return JSONResponse("Operation successful", status_code=201)
498 1
499 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
500 1
    @validate_openapi(spec)
501 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
502
        """Delete metada from a bulk of EVCs"""
503 1
        data = get_json_or_400(request, self.controller.loop)
504 1
        key = request.path_params["key"]
505 1
        circuit_ids = data.pop("circuit_ids")
506 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
507 1
508
        fail_evcs = []
509
        for _id in circuit_ids:
510
            try:
511 1
                evc = self.circuits[_id]
512
                evc.remove_metadata(key)
513 1
            except KeyError:
514
                fail_evcs.append(_id)
515 1
516 1
        if fail_evcs:
517
            raise HTTPException(404, detail=fail_evcs)
518 1
        return JSONResponse("Operation successful")
519 1
520 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
521 1
    def delete_metadata(self, request: Request) -> JSONResponse:
522 1
        """Delete metadata from an EVC."""
523 1
        circuit_id = request.path_params["circuit_id"]
524
        key = request.path_params["key"]
525
        try:
526
            evc = self.circuits[circuit_id]
527
        except KeyError as error:
528 1
            raise HTTPException(
529 1
                404,
530 1
                detail=f"circuit_id {circuit_id} not found."
531
            ) from error
532 1
533 1
        evc.remove_metadata(key)
534
        evc.sync()
535 1
        return JSONResponse("Operation successful")
536 1
537 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
538 1
    def redeploy(self, request: Request) -> JSONResponse:
539 1
        """Endpoint to force the redeployment of an EVC."""
540 1
        circuit_id = request.path_params["circuit_id"]
541
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
542
        try:
543
            evc = self.circuits[circuit_id]
544 1
        except KeyError:
545 1
            raise HTTPException(
546 1
                404,
547 1
                detail=f"circuit_id {circuit_id} not found"
548 1
            ) from KeyError
549 1
        if evc.is_enabled():
550
            with evc.lock:
551 1
                evc.remove_current_flows()
552 1
                evc.deploy()
553
            result = {"response": f"Circuit {circuit_id} redeploy received."}
554 1
            status = 202
555
        else:
556 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
557 1
            status = 409
558 1
559
        return JSONResponse(result, status_code=status)
560
561
    @rest("/v2/evc/schedule/", methods=["POST"])
562
    @validate_openapi(spec)
563
    def create_schedule(self, request: Request) -> JSONResponse:
564
        """
565
        Create a new schedule for a given circuit.
566
567
        This service do no check if there are conflicts with another schedule.
568
        Payload example:
569
            {
570
              "circuit_id":"aa:bb:cc",
571
              "schedule": {
572
                "date": "2019-08-07T14:52:10.967Z",
573
                "interval": "string",
574 1
                "frequency": "1 * * * *",
575 1
                "action": "create"
576 1
              }
577 1
            }
578
        """
579
        log.debug("create_schedule /v2/evc/schedule/")
580 1
        data = get_json_or_400(request, self.controller.loop)
581
        circuit_id = data["circuit_id"]
582
        schedule_data = data["schedule"]
583 1
584
        # Get EVC from circuits buffer
585
        circuits = self._get_circuits_buffer()
586 1
587 1
        # get the circuit
588 1
        evc = circuits.get(circuit_id)
589 1
590
        # get the circuit
591 1
        if not evc:
592 1
            result = f"circuit_id {circuit_id} not found"
593 1
            log.debug("create_schedule result %s %s", result, 404)
594 1
            raise HTTPException(404, detail=result)
595
        # Can not modify circuits deleted and archived
596
        if evc.archived:
597 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
598
            log.debug("create_schedule result %s %s", result, 409)
599
            raise HTTPException(409, detail=result)
600 1
601 1
        # new schedule from dict
602
        new_schedule = CircuitSchedule.from_dict(schedule_data)
603
604 1
        # If there is no schedule, create the list
605
        if not evc.circuit_scheduler:
606
            evc.circuit_scheduler = []
607 1
608
        # Add the new schedule
609
        evc.circuit_scheduler.append(new_schedule)
610 1
611
        # Add schedule job
612 1
        self.sched.add_circuit_job(evc, new_schedule)
613 1
614
        # save circuit to mongodb
615 1
        evc.sync()
616 1
617
        result = new_schedule.as_dict()
618 1
        status = 201
619 1
620 1
        log.debug("create_schedule result %s %s", result, status)
621
        return JSONResponse(result, status_code=status)
622
623
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
624
    @validate_openapi(spec)
625
    def update_schedule(self, request: Request) -> JSONResponse:
626
        """Update a schedule.
627
628
        Change all attributes from the given schedule from a EVC circuit.
629
        The schedule ID is preserved as default.
630
        Payload example:
631
            {
632
              "date": "2019-08-07T14:52:10.967Z",
633 1
              "interval": "string",
634 1
              "frequency": "1 * * *",
635 1
              "action": "create"
636
            }
637
        """
638 1
        data = get_json_or_400(request, self.controller.loop)
639
        schedule_id = request.path_params["schedule_id"]
640
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
641 1
642 1
        # Try to find a circuit schedule
643 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
644 1
645 1
        # Can not modify circuits deleted and archived
646 1
        if not found_schedule:
647 1
            result = f"schedule_id {schedule_id} not found"
648 1
            log.debug("update_schedule result %s %s", result, 404)
649
            raise HTTPException(404, detail=result)
650 1
        if evc.archived:
651 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
652
            log.debug("update_schedule result %s %s", result, 409)
653 1
            raise HTTPException(409, detail=result)
654
655 1
        new_schedule = CircuitSchedule.from_dict(data)
656
        new_schedule.id = found_schedule.id
657
        # Remove the old schedule
658 1
        evc.circuit_scheduler.remove(found_schedule)
659
        # Append the modified schedule
660 1
        evc.circuit_scheduler.append(new_schedule)
661
662 1
        # Cancel all schedule jobs
663
        self.sched.cancel_job(found_schedule.id)
664 1
        # Add the new circuit schedule
665 1
        self.sched.add_circuit_job(evc, new_schedule)
666
        # Save EVC to mongodb
667 1
        evc.sync()
668 1
669
        result = new_schedule.as_dict()
670 1
        status = 200
671 1
672
        log.debug("update_schedule result %s %s", result, status)
673
        return JSONResponse(result, status_code=status)
674
675
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
676
    def delete_schedule(self, request: Request) -> JSONResponse:
677
        """Remove a circuit schedule.
678 1
679 1
        Remove the Schedule from EVC.
680 1
        Remove the Schedule from cron job.
681
        Save the EVC to the Storehouse.
682
        """
683 1
        schedule_id = request.path_params["schedule_id"]
684 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
685 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
686 1
687
        # Can not modify circuits deleted and archived
688 1
        if not found_schedule:
689 1
            result = f"schedule_id {schedule_id} not found"
690 1
            log.debug("delete_schedule result %s %s", result, 404)
691 1
            raise HTTPException(404, detail=result)
692
693
        if evc.archived:
694 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
695
            log.debug("delete_schedule result %s %s", result, 409)
696
            raise HTTPException(409, detail=result)
697 1
698
        # Remove the old schedule
699 1
        evc.circuit_scheduler.remove(found_schedule)
700
701 1
        # Cancel all schedule jobs
702 1
        self.sched.cancel_job(found_schedule.id)
703
        # Save EVC to mongodb
704 1
        evc.sync()
705 1
706
        result = "Schedule removed"
707 1
        status = 200
708
709
        log.debug("delete_schedule result %s %s", result, status)
710
        return JSONResponse(result, status_code=status)
711
712
    @listen_to("kytos/topology.link_up")
713
    def on_link_up(self, event):
714
        """Change circuit when link is up or end_maintenance."""
715
        self.handle_link_up(event)
716
717 1
    def handle_link_up(self, event):
718 1
        """Change circuit when link is up or end_maintenance."""
719 1
        log.info("Event handle_link_up %s", event.content["link"])
720 1
        for evc in self.get_evcs_by_svc_level():
721
            if evc.is_enabled() and not evc.archived:
722 1
                with evc.lock:
723 1
                    evc.handle_link_up(event.content["link"])
724
725
    @listen_to("kytos/topology.updated")
726
    def on_topology_update(self, event):
727 1
        """Capture topology update event"""
728
        self.handle_topology_update(event)
729 1
730 1
    def handle_topology_update(self, event):
731 1
        """Handle topology update"""
732 1
        with self._lock:
733 1
            if (
734
                self._topology_updated_at
735 1
                and self._topology_updated_at > event.timestamp
736 1
            ):
737
                return
738
            self._topology_updated_at = event.timestamp
739
            for evc in self.get_evcs_by_svc_level():
740 1
                if evc.is_enabled() and not evc.archived:
741
                    with evc.lock:
742 1
                        evc.handle_topology_update(
743 1
                            event.content["topology"].switches
744
                        )
745
746
    @listen_to("kytos/topology.link_down")
747
    def on_link_down(self, event):
748 1
        """Change circuit when link is down or under_mantenance."""
749 1
        self.handle_link_down(event)
750 1
751 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
752 1
        """Change circuit when link is down or under_mantenance."""
753
        link = event.content["link"]
754
        log.info("Event handle_link_down %s", link)
755
        switch_flows = {}
756 1
        evcs_with_failover = []
757 1
        evcs_normal = []
758
        check_failover = []
759
        for evc in self.get_evcs_by_svc_level():
760
            if evc.is_affected_by_link(link):
761 1
                # if there is no failover path, handles link down the
762
                # tradditional way
763 1
                if (
764 1
                    not getattr(evc, 'failover_path', None) or
765 1
                    evc.is_failover_path_affected_by_link(link)
766 1
                ):
767 1
                    evcs_normal.append(evc)
768 1
                    continue
769 1
                try:
770 1
                    dpid_flows = evc.get_failover_flows()
771
                # pylint: disable=broad-except
772
                except Exception:
773 1
                    err = traceback.format_exc().replace("\n", ", ")
774
                    log.error(
775
                        f"Ignore Failover path for {evc} due to error: {err}"
776
                    )
777 1
                    evcs_normal.append(evc)
778 1
                    continue
779 1
                for dpid, flows in dpid_flows.items():
780 1
                    switch_flows.setdefault(dpid, [])
781
                    switch_flows[dpid].extend(flows)
782 1
                evcs_with_failover.append(evc)
783 1
            else:
784 1
                check_failover.append(evc)
785
786
        while switch_flows:
787 1
            offset = settings.BATCH_SIZE or None
788 1
            switches = list(switch_flows.keys())
789 1
            for dpid in switches:
790 1
                emit_event(
791 1
                    self.controller,
792 1
                    context="kytos.flow_manager",
793
                    name="flows.install",
794 1
                    content={
795
                        "dpid": dpid,
796 1
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
797 1
                    }
798 1
                )
799 1
                if offset is None or offset >= len(switch_flows[dpid]):
800 1
                    del switch_flows[dpid]
801
                    continue
802
                switch_flows[dpid] = switch_flows[dpid][offset:]
803
            time.sleep(settings.BATCH_INTERVAL)
804
805
        for evc in evcs_with_failover:
806
            with evc.lock:
807
                old_path = evc.current_path
808
                evc.current_path = evc.failover_path
809 1
                evc.failover_path = old_path
810 1
                evc.sync()
811 1
            emit_event(self.controller, "redeployed_link_down",
812 1
                       content=map_evc_event_content(evc))
813 1
            log.info(
814
                f"{evc} redeployed with failover due to link down {link.id}"
815 1
            )
816 1
817 1
        for evc in evcs_normal:
818 1
            emit_event(
819 1
                self.controller,
820 1
                "evc_affected_by_link_down",
821 1
                content={"link_id": link.id} | map_evc_event_content(evc)
822
            )
823 1
824
        # After handling the hot path, check if new failover paths are needed.
825
        # Note that EVCs affected by link down will generate a KytosEvent for
826
        # deployed|redeployed, which will trigger the failover path setup.
827 1
        # Thus, we just need to further check the check_failover list
828 1
        for evc in check_failover:
829
            if evc.is_failover_path_affected_by_link(link):
830
                with evc.lock:
831
                    evc.setup_failover_path()
832
833
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
834
    def on_evc_affected_by_link_down(self, event):
835
        """Change circuit when link is down or under_mantenance."""
836
        self.handle_evc_affected_by_link_down(event)
837
838 1
    def handle_evc_affected_by_link_down(self, event):
839 1
        """Change circuit when link is down or under_mantenance."""
840 1
        evc = self.circuits.get(event.content["evc_id"])
841 1
        link_id = event.content['link_id']
842
        if not evc:
843 1
            return
844 1
        with evc.lock:
845
            result = evc.handle_link_down()
846
        event_name = "error_redeploy_link_down"
847
        if result:
848 1
            log.info(f"{evc} redeployed due to link down {link_id}")
849
            event_name = "redeployed_link_down"
850 1
        emit_event(self.controller, event_name,
851 1
                   content=map_evc_event_content(evc))
852 1
853 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
854 1
    def on_evc_deployed(self, event):
855 1
        """Handle EVC deployed|redeployed_link_down."""
856 1
        self.handle_evc_deployed(event)
857 1
858 1
    def handle_evc_deployed(self, event):
859 1
        """Setup failover path on evc deployed."""
860 1
        evc = self.circuits.get(event.content["evc_id"])
861
        if not evc:
862
            return
863 1
        with evc.lock:
864 1
            evc.setup_failover_path()
865
866
    @listen_to("kytos/topology.topology_loaded")
867
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
868 1
        """Load EVCs once the topology is available."""
869
        self.load_all_evcs()
870 1
871 1
    def load_all_evcs(self):
872 1
        """Try to load all EVCs on startup."""
873 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
874 1
        for circuit_id, circuit in circuits:
875
            if circuit_id not in self.circuits:
876 1
                self._load_evc(circuit)
877 1
878
    def _load_evc(self, circuit_dict):
879
        """Load one EVC from mongodb to memory."""
880
        try:
881 1
            evc = self._evc_from_dict(circuit_dict)
882
        except ValueError as exception:
883 1
            log.error(
884 1
                f"Could not load EVC: dict={circuit_dict} error={exception}"
885 1
            )
886 1
            return None
887
888 1
        if evc.archived:
889
            return None
890 1
        evc.deactivate()
891 1
        evc.sync()
892 1
        self.circuits.setdefault(evc.id, evc)
893 1
        self.sched.add(evc)
894
        return evc
895
896 1
    @listen_to("kytos/flow_manager.flow.error")
897
    def on_flow_mod_error(self, event):
898 1
        """Handle flow mod errors related to an EVC."""
899 1
        self.handle_flow_mod_error(event)
900 1
901 1
    def handle_flow_mod_error(self, event):
902 1
        """Handle flow mod errors related to an EVC."""
903 1
        flow = event.content["flow"]
904 1
        command = event.content.get("error_command")
905
        if command != "add":
906 1
            return
907 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
908
        if evc:
909
            with evc.lock:
910
                evc.remove_current_flows()
911 1
912
    def _evc_dict_with_instances(self, evc_dict):
913 1
        """Convert some dict values to instance of EVC classes.
914 1
915 1
        This method will convert: [UNI, Link]
916
        """
917 1
        data = evc_dict.copy()  # Do not modify the original dict
918 1
        for attribute, value in data.items():
919 1
            # Get multiple attributes.
920 1
            # Ex: uni_a, uni_z
921
            if "uni" in attribute:
922 1
                try:
923
                    data[attribute] = self._uni_from_dict(value)
924
                except ValueError as exception:
925
                    result = "Error creating UNI: Invalid value"
926
                    raise ValueError(result) from exception
927 1
928 1
            if attribute == "circuit_scheduler":
929
                data[attribute] = []
930
                for schedule in value:
931 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
932 1
933 1
            # Get multiple attributes.
934 1
            # Ex: primary_links,
935 1
            #     backup_links,
936 1
            #     current_links_cache,
937
            #     primary_links_cache,
938 1
            #     backup_links_cache
939 1
            if "links" in attribute:
940 1
                data[attribute] = [
941 1
                    self._link_from_dict(link) for link in value
942
                ]
943
944
            # Ex: current_path,
945
            #     primary_path,
946
            #     backup_path
947
            if "path" in attribute and attribute != "dynamic_backup_path":
948
                data[attribute] = Path(
949 1
                    [self._link_from_dict(link) for link in value]
950 1
                )
951
952
        return data
953
954
    def _evc_from_dict(self, evc_dict):
955
        data = self._evc_dict_with_instances(evc_dict)
956
        data["table_group"] = self.table_group
957 1
        return EVC(self.controller, **data)
958 1
959
    def _uni_from_dict(self, uni_dict):
960
        """Return a UNI object from python dict."""
961
        if uni_dict is None:
962 1
            return False
963
964 1
        interface_id = uni_dict.get("interface_id")
965 1
        interface = self.controller.get_interface_by_id(interface_id)
966 1
        if interface is None:
967 1
            result = (
968
                "Error creating UNI:"
969 1
                + f"Could not instantiate interface {interface_id}"
970
            )
971 1
            raise ValueError(result) from ValueError
972 1
        tag_convert = {1: "vlan"}
973
        tag_dict = uni_dict.get("tag", None)
974 1
        if tag_dict:
975 1
            tag_type = tag_dict.get("tag_type")
976 1
            tag_type = tag_convert.get(tag_type, tag_type)
977 1
            tag_value = tag_dict.get("value")
978
            if isinstance(tag_value, list):
979
                try:
980
                    tag_value = get_tag_ranges(tag_value)
981 1
                except KytosInvalidRanges as err:
982 1
                    raise err
983 1
                mask_list = get_vlan_tags_and_masks(tag_value)
984 1
                tag = TAGRange(tag_type, tag_value, mask_list)
985 1
            else:
986 1
                tag = TAG(tag_type, tag_value)
987 1
        else:
988 1
            tag = None
989
        uni = UNI(interface, tag)
990 1
991 1
        return uni
992
993 1
    def _link_from_dict(self, link_dict):
994
        """Return a Link object from python dict."""
995 1
        id_a = link_dict.get("endpoint_a").get("id")
996
        id_b = link_dict.get("endpoint_b").get("id")
997 1
998 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
999
        endpoint_b = self.controller.get_interface_by_id(id_b)
1000 1
        if not endpoint_a:
1001 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1002 1
            raise ValueError(error_msg)
1003 1
        if not endpoint_b:
1004 1
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1005 1
            raise ValueError(error_msg)
1006
1007
        link = Link(endpoint_a, endpoint_b)
1008
        if "metadata" in link_dict:
1009 1
            link.extend_metadata(link_dict.get("metadata"))
1010 1
1011 1
        s_vlan = link.get_metadata("s_vlan")
1012
        if s_vlan:
1013 1
            tag = TAG.from_dict(s_vlan)
1014 1
            if tag is False:
1015 1
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1016 1
                raise ValueError(error_msg)
1017
            link.update_metadata("s_vlan", tag)
1018
        return link
1019 1
1020 1
    def _find_evc_by_schedule_id(self, schedule_id):
1021
        """
1022 1
        Find an EVC and CircuitSchedule based on schedule_id.
1023
1024
        :param schedule_id: Schedule ID
1025
        :return: EVC and Schedule
1026
        """
1027
        circuits = self._get_circuits_buffer()
1028
        found_schedule = None
1029 1
        evc = None
1030 1
1031 1
        # pylint: disable=unused-variable
1032
        for c_id, circuit in circuits.items():
1033
            for schedule in circuit.circuit_scheduler:
1034 1
                if schedule.id == schedule_id:
1035 1
                    found_schedule = schedule
1036 1
                    evc = circuit
1037 1
                    break
1038 1
            if found_schedule:
1039 1
                break
1040 1
        return evc, found_schedule
1041 1
1042 1
    def _get_circuits_buffer(self):
1043
        """
1044 1
        Return the circuit buffer.
1045
1046
        If the buffer is empty, try to load data from mongodb.
1047
        """
1048
        if not self.circuits:
1049
            # Load circuits from mongodb to buffer
1050 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1051
            for c_id, circuit in circuits.items():
1052 1
                evc = self._evc_from_dict(circuit)
1053 1
                self.circuits[c_id] = evc
1054 1
        return self.circuits
1055 1
1056 1
    # pylint: disable=attribute-defined-outside-init
1057
    @alisten_to("kytos/of_multi_table.enable_table")
1058
    async def on_table_enabled(self, event):
1059 1
        """Handle a recently table enabled."""
1060 1
        table_group = event.content.get("mef_eline", None)
1061
        if not table_group:
1062 1
            return
1063 1
        for group in table_group:
1064
            if group not in settings.TABLE_GROUP_ALLOWED:
1065 1
                log.error(f'The table group "{group}" is not allowed for '
1066 1
                          f'mef_eline. Allowed table groups are '
1067 1
                          f'{settings.TABLE_GROUP_ALLOWED}')
1068
                return
1069
        self.table_group.update(table_group)
1070 1
        content = {"group_table": self.table_group}
1071 1
        name = "kytos/mef_eline.enable_table"
1072
        await aemit_event(self.controller, name, content)
1073