Test Failed
Pull Request — master (#407)
by Vinicius
15:31 queued 13:18
created

build.main.Main.setup()   A

Complexity

Conditions 1

Size

Total Lines 28
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 11
nop 1
dl 0
loc 28
ccs 11
cts 11
cp 1
crap 1
rs 9.85
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.events import KytosEvent
15 1
from kytos.core.exceptions import KytosTagError
16
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
17 1
                                validate_openapi)
18 1
from kytos.core.interface import TAG, UNI, TAGRange
19 1
from kytos.core.link import Link
20
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
21 1
                                 get_json_or_400)
22 1
from kytos.core.tag_ranges import get_tag_ranges
23 1
from napps.kytos.mef_eline import controllers, settings
24
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
25 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
26 1
                                          Path)
27
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
28
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
29
                                         emit_event, get_vlan_tags_and_masks,
30
                                         map_evc_event_content)
31 1
32
33
# pylint: disable=too-many-public-methods
34
class Main(KytosNApp):
35
    """Main class of amlight/mef_eline NApp.
36
37 1
    This class is the entry point for this napp.
38
    """
39 1
40
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
41
42
    def setup(self):
43
        """Replace the '__init__' method for the KytosNApp subclass.
44
45
        The setup method is automatically called by the controller when your
46
        application is loaded.
47
48 1
        So, if you have any setup routine, insert it here.
49
        """
50
        # object used to scheduler circuit events
51 1
        self.sched = Scheduler()
52 1
53
        # object to save and load circuits
54
        self.mongo_controller = self.get_eline_controller()
55 1
        self.mongo_controller.bootstrap_indexes()
56
57
        # set the controller that will manager the dynamic paths
58
        DynamicPathManager.set_controller(self.controller)
59 1
60
        # dictionary of EVCs created. It acts as a circuit buffer.
61 1
        # Every create/update/delete must be synced to mongodb.
62 1
        self.circuits = {}
63 1
64
        self.table_group = {"epl": 0, "evpl": 0}
65 1
        self._lock = Lock()
66 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
67
68 1
        self.load_all_evcs()
69
        self._topology_updated_at = None
70
71
    def get_evcs_by_svc_level(self) -> list:
72
        """Get circuits sorted by desc service level and asc creation_time.
73 1
74
        In the future, as more ops are offloaded it should be get from the DB.
75
        """
76 1
        return sorted(self.circuits.values(),
77 1
                      key=lambda x: (-x.service_level, x.creation_time))
78
79
    @staticmethod
80
    def get_eline_controller():
81 1
        """Return the ELineController instance."""
82
        return controllers.ELineController()
83 1
84 1
    def execute(self):
85 1
        """Execute once when the napp is running."""
86 1
        if self._lock.locked():
87 1
            return
88 1
        log.debug("Starting consistency routine")
89
        with self._lock:
90 1
            self.execute_consistency()
91
        log.debug("Finished consistency routine")
92
93 1
    def should_be_checked(self, circuit):
94
        "Verify if the circuit meets the necessary conditions to be checked"
95
        # pylint: disable=too-many-boolean-expressions
96
        if (
97
                circuit.is_enabled()
98
                and not circuit.is_active()
99
                and not circuit.lock.locked()
100
                and not circuit.has_recent_removed_flow()
101
                and not circuit.is_recent_updated()
102
                and circuit.are_unis_active(self.controller.switches)
103
                # if a inter-switch EVC does not have current_path, it does not
104 1
                # make sense to run sdntrace on it
105
                and (circuit.is_intra_switch() or circuit.current_path)
106
                ):
107 1
            return True
108
        return False
109 1
110 1
    def execute_consistency(self):
111 1
        """Execute consistency routine."""
112 1
        circuits_to_check = []
113 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
114 1
        for circuit in self.get_evcs_by_svc_level():
115 1
            stored_circuits.pop(circuit.id, None)
116 1
            if self.should_be_checked(circuit):
117 1
                circuits_to_check.append(circuit)
118 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
119 1
        for circuit in circuits_to_check:
120 1
            is_checked = circuits_checked.get(circuit.id)
121 1
            if is_checked:
122 1
                circuit.execution_rounds = 0
123 1
                log.info(f"{circuit} enabled but inactive - activating")
124
                with circuit.lock:
125 1
                    circuit.activate()
126 1
                    circuit.sync()
127 1
            else:
128 1
                circuit.execution_rounds += 1
129 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
130 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
131 1
                    with circuit.lock:
132 1
                        circuit.deploy()
133
        for circuit_id in stored_circuits:
134 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
135
            self._load_evc(stored_circuits[circuit_id])
136
137
    def shutdown(self):
138
        """Execute when your napp is unloaded.
139
140 1
        If you have some cleanup procedure, insert it here.
141 1
        """
142
143
    @rest("/v2/evc/", methods=["GET"])
144
    def list_circuits(self, request: Request) -> JSONResponse:
145
        """Endpoint to return circuits stored.
146
147 1
        archive query arg if defined (not null) will be filtered
148 1
        accordingly, by default only non archived evcs will be listed
149 1
        """
150 1
        log.debug("list_circuits /v2/evc")
151 1
        args = request.query_params
152
        archived = args.get("archived", "false").lower()
153 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
154 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
155
                                                      metadata=args)
156 1
        circuits = circuits['circuits']
157 1
        return JSONResponse(circuits)
158
159
    @rest("/v2/evc/schedule", methods=["GET"])
160
    def list_schedules(self, _request: Request) -> JSONResponse:
161
        """Endpoint to return all schedules stored for all circuits.
162
163
        Return a JSON with the following template:
164
        [{"schedule_id": <schedule_id>,
165 1
         "circuit_id": <circuit_id>,
166 1
         "schedule": <schedule object>}]
167 1
        """
168 1
        log.debug("list_schedules /v2/evc/schedule")
169 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
170 1
        if not circuits:
171
            result = {}
172 1
            status = 200
173 1
            return JSONResponse(result, status_code=status)
174 1
175 1
        result = []
176 1
        status = 200
177 1
        for circuit in circuits:
178 1
            circuit_scheduler = circuit.get("circuit_scheduler")
179
            if circuit_scheduler:
180
                for scheduler in circuit_scheduler:
181
                    value = {
182
                        "schedule_id": scheduler.get("id"),
183 1
                        "circuit_id": circuit.get("id"),
184
                        "schedule": scheduler,
185 1
                    }
186 1
                    result.append(value)
187
188 1
        log.debug("list_schedules result %s %s", result, status)
189 1
        return JSONResponse(result, status_code=status)
190
191 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
192 1
    def get_circuit(self, request: Request) -> JSONResponse:
193 1
        """Endpoint to return a circuit based on id."""
194 1
        circuit_id = request.path_params["circuit_id"]
195 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
196 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
197 1
        if not circuit:
198 1
            result = f"circuit_id {circuit_id} not found"
199 1
            log.debug("get_circuit result %s %s", result, 404)
200 1
            raise HTTPException(404, detail=result)
201
        status = 200
202 1
        log.debug("get_circuit result %s %s", circuit, status)
203 1
        return JSONResponse(circuit, status_code=status)
204 1
205
    # pylint: disable=too-many-branches, too-many-statements
206
    @rest("/v2/evc/", methods=["POST"])
207
    @validate_openapi(spec)
208
    def create_circuit(self, request: Request) -> JSONResponse:
209
        """Try to create a new circuit.
210
211
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
212
        UNI_Z's requested C-VID are available from the interfaces' pools. This
213
        is checked when creating the UNI object.
214
215
        Then, E-Line NApp requests a primary and a backup path to the
216
        Pathfinder NApp using the attributes primary_links and backup_links
217
        submitted via REST
218
219
        # For each link composing paths in #3:
220
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
221
        #  - Using the S-VID obtained, generate abstract flow entries to be
222
        #    sent to FlowManager
223
224
        Push abstract flow entries to FlowManager and FlowManager pushes
225
        OpenFlow entries to datapaths
226
227
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
228
        creation
229 1
230 1
        Finnaly, notify user of the status of its request.
231
        """
232 1
        # Try to create the circuit object
233 1
        log.debug("create_circuit /v2/evc/")
234 1
        data = get_json_or_400(request, self.controller.loop)
235 1
236 1
        try:
237
            evc = self._evc_from_dict(data)
238 1
        except (ValueError, KytosTagError) as exception:
239 1
            log.debug("create_circuit result %s %s", exception, 400)
240 1
            raise HTTPException(400, detail=str(exception)) from exception
241 1
        try:
242 1
            check_disabled_component(evc.uni_a, evc.uni_z)
243
        except DisabledSwitch as exception:
244
            log.debug("create_circuit result %s %s", exception, 409)
245
            raise HTTPException(
246
                    409,
247 1
                    detail=f"Path is not valid: {exception}"
248
                ) from exception
249
250
        if evc.primary_path:
251
            try:
252
                evc.primary_path.is_valid(
253
                    evc.uni_a.interface.switch,
254
                    evc.uni_z.interface.switch,
255
                    bool(evc.circuit_scheduler),
256
                )
257
            except InvalidPath as exception:
258
                raise HTTPException(
259 1
                    400,
260
                    detail=f"primary_path is not valid: {exception}"
261
                ) from exception
262
        if evc.backup_path:
263
            try:
264
                evc.backup_path.is_valid(
265
                    evc.uni_a.interface.switch,
266
                    evc.uni_z.interface.switch,
267
                    bool(evc.circuit_scheduler),
268
                )
269
            except InvalidPath as exception:
270
                raise HTTPException(
271
                    400,
272
                    detail=f"backup_path is not valid: {exception}"
273 1
                ) from exception
274 1
275 1
        if self._is_duplicated_evc(evc):
276 1
            result = "The EVC already exists."
277
            log.debug("create_circuit result %s %s", result, 409)
278 1
            raise HTTPException(409, detail=result)
279 1
280 1
        if not evc._tag_lists_equal():
281 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
282
            raise HTTPException(400, detail=detail)
283 1
284 1
        try:
285
            evc._validate_has_primary_or_dynamic()
286
        except ValueError as exception:
287
            raise HTTPException(400, detail=str(exception)) from exception
288
289 1
        try:
290 1
            self._use_uni_tags(evc)
291
        except KytosTagError as exception:
292
            raise HTTPException(400, detail=str(exception)) from exception
293
294
        # save circuit
295 1
        try:
296
            evc.sync()
297
        except ValidationError as exception:
298 1
            raise HTTPException(400, detail=str(exception)) from exception
299
300
        # store circuit in dictionary
301 1
        self.circuits[evc.id] = evc
302 1
303 1
        # Schedule the circuit deploy
304
        self.sched.add(evc)
305
306 1
        # Circuit has no schedule, deploy now
307 1
        if not evc.circuit_scheduler:
308 1
            with evc.lock:
309 1
                evc.deploy()
310
311 1
        # Notify users
312
        result = {"circuit_id": evc.id}
313 1
        status = 201
314 1
        log.debug("create_circuit result %s %s", result, status)
315 1
        emit_event(self.controller, name="created",
316 1
                   content=map_evc_event_content(evc))
317 1
        return JSONResponse(result, status_code=status)
318 1
319 1
    @staticmethod
320 1
    def _use_uni_tags(evc):
321 1
        uni_a = evc.uni_a
322 1
        evc._use_uni_vlan(uni_a)
323 1
        try:
324 1
            uni_z = evc.uni_z
325 1
            evc._use_uni_vlan(uni_z)
326
        except KytosTagError as err:
327 1
            evc.make_uni_vlan_available(uni_a)
328 1
            raise err
329
330
    @listen_to('kytos/flow_manager.flow.removed')
331
    def on_flow_delete(self, event):
332 1
        """Capture delete messages to keep track when flows got removed."""
333
        self.handle_flow_delete(event)
334 1
335 1
    def handle_flow_delete(self, event):
336 1
        """Keep track when the EVC got flows removed by deriving its cookie."""
337 1
        flow = event.content["flow"]
338 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
339
        if evc:
340 1
            log.debug("Flow removed in EVC %s", evc.id)
341 1
            evc.set_flow_removed_at()
342 1
343
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
344
    @validate_openapi(spec)
345
    def update(self, request: Request) -> JSONResponse:
346
        """Update a circuit based on payload.
347
348 1
        The EVC attributes (creation_time, active, current_path,
349 1
        failover_path, _id, archived) can't be updated.
350 1
        """
351 1
        data = get_json_or_400(request, self.controller.loop)
352 1
        circuit_id = request.path_params["circuit_id"]
353 1
        log.debug("update /v2/evc/%s", circuit_id)
354 1
        try:
355 1
            evc = self.circuits[circuit_id]
356 1
        except KeyError:
357
            result = f"circuit_id {circuit_id} not found"
358 1
            log.debug("update result %s %s", result, 404)
359 1
            raise HTTPException(404, detail=result) from KeyError
360 1
361 1
        if evc.archived:
362
            result = "Can't update archived EVC"
363 1
            log.debug("update result %s %s", result, 409)
364 1
            raise HTTPException(409, detail=result)
365
366
        try:
367 1
            enable, redeploy = evc.update(
368
                **self._evc_dict_with_instances(data)
369 1
            )
370 1
        except (ValueError, KytosTagError, ValidationError) as exception:
371 1
            log.debug("update result %s %s", exception, 400)
372 1
            raise HTTPException(400, detail=str(exception)) from exception
373 1
        except DisabledSwitch as exception:
374 1
            log.debug("update result %s %s", exception, 409)
375 1
            raise HTTPException(
376
                    409,
377
                    detail=f"Path is not valid: {exception}"
378
                ) from exception
379
380 1
        if self._is_duplicated_evc(evc):
381
            result = "The EVC already exists."
382
            log.debug("create_circuit result %s %s", result, 409)
383
            raise HTTPException(409, detail=result)
384
385
        if evc.is_active():
386
            if enable is False:  # disable if active
387
                with evc.lock:
388
                    evc.remove()
389 1
            elif redeploy is not None:  # redeploy if active
390 1
                with evc.lock:
391 1
                    evc.remove()
392 1
                    evc.deploy()
393 1
        else:
394
            if enable is True:  # enable if inactive
395 1
                with evc.lock:
396 1
                    evc.deploy()
397
        result = {evc.id: evc.as_dict()}
398 1
        status = 200
399
400 1
        log.debug("update result %s %s", result, status)
401 1
        emit_event(self.controller, "updated",
402
                   content=map_evc_event_content(evc, **data))
403
        return JSONResponse(result, status_code=status)
404
405
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
406
    def delete_circuit(self, request: Request) -> JSONResponse:
407 1
        """Remove a circuit.
408 1
409 1
        First, the flows are removed from the switches, and then the EVC is
410 1
        disabled.
411 1
        """
412 1
        circuit_id = request.path_params["circuit_id"]
413 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
414 1
        try:
415
            evc = self.circuits[circuit_id]
416 1
        except KeyError:
417 1
            result = f"circuit_id {circuit_id} not found"
418 1
            log.debug("delete_circuit result %s %s", result, 404)
419 1
            raise HTTPException(404, detail=result) from KeyError
420
421 1
        if evc.archived:
422 1
            result = f"Circuit {circuit_id} already removed"
423 1
            log.debug("delete_circuit result %s %s", result, 404)
424 1
            raise HTTPException(404, detail=result)
425 1
426 1
        log.info("Removing %s", evc)
427 1
        with evc.lock:
428 1
            evc.remove_current_flows()
429 1
            evc.remove_failover_flows(sync=False)
430 1
            evc.deactivate()
431 1
            evc.disable()
432 1
            self.sched.remove(evc)
433 1
            evc.archive()
434
            evc.remove_uni_tags()
435 1
            evc.sync()
436 1
        log.info("EVC removed. %s", evc)
437
        result = {"response": f"Circuit {circuit_id} removed"}
438 1
        status = 200
439
440 1
        log.debug("delete_circuit result %s %s", result, status)
441 1
        emit_event(self.controller, "deleted",
442
                   content=map_evc_event_content(evc))
443 1
        return JSONResponse(result, status_code=status)
444 1
445 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
446
    def get_metadata(self, request: Request) -> JSONResponse:
447
        """Get metadata from an EVC."""
448
        circuit_id = request.path_params["circuit_id"]
449
        try:
450
            return (
451
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
452
            )
453
        except KeyError as error:
454 1
            raise HTTPException(
455 1
                404,
456 1
                detail=f"circuit_id {circuit_id} not found."
457
            ) from error
458 1
459 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
460
    @validate_openapi(spec)
461 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
462
        """Add metadata to a bulk of EVCs."""
463 1
        data = get_json_or_400(request, self.controller.loop)
464 1
        circuit_ids = data.pop("circuit_ids")
465 1
466 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
467 1
468 1
        fail_evcs = []
469 1
        for _id in circuit_ids:
470
            try:
471 1
                evc = self.circuits[_id]
472 1
                evc.extend_metadata(data)
473 1
            except KeyError:
474
                fail_evcs.append(_id)
475 1
476 1
        if fail_evcs:
477 1
            raise HTTPException(404, detail=fail_evcs)
478
        return JSONResponse("Operation successful", status_code=201)
479 1
480 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
481 1
    @validate_openapi(spec)
482
    def add_metadata(self, request: Request) -> JSONResponse:
483 1
        """Add metadata to an EVC."""
484 1
        circuit_id = request.path_params["circuit_id"]
485 1
        metadata = get_json_or_400(request, self.controller.loop)
486 1
        if not isinstance(metadata, dict):
487
            raise HTTPException(400, "Invalid metadata value: {metadata}")
488
        try:
489
            evc = self.circuits[circuit_id]
490
        except KeyError as error:
491 1
            raise HTTPException(
492 1
                404,
493 1
                detail=f"circuit_id {circuit_id} not found."
494
            ) from error
495 1
496 1
        evc.extend_metadata(metadata)
497 1
        evc.sync()
498
        return JSONResponse("Operation successful", status_code=201)
499 1
500 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
501 1
    @validate_openapi(spec)
502 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
503
        """Delete metada from a bulk of EVCs"""
504 1
        data = get_json_or_400(request, self.controller.loop)
505 1
        key = request.path_params["key"]
506 1
        circuit_ids = data.pop("circuit_ids")
507 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
508 1
509
        fail_evcs = []
510
        for _id in circuit_ids:
511
            try:
512 1
                evc = self.circuits[_id]
513
                evc.remove_metadata(key)
514 1
            except KeyError:
515
                fail_evcs.append(_id)
516 1
517 1
        if fail_evcs:
518
            raise HTTPException(404, detail=fail_evcs)
519 1
        return JSONResponse("Operation successful")
520 1
521 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
522 1
    def delete_metadata(self, request: Request) -> JSONResponse:
523 1
        """Delete metadata from an EVC."""
524 1
        circuit_id = request.path_params["circuit_id"]
525
        key = request.path_params["key"]
526
        try:
527
            evc = self.circuits[circuit_id]
528
        except KeyError as error:
529 1
            raise HTTPException(
530 1
                404,
531 1
                detail=f"circuit_id {circuit_id} not found."
532
            ) from error
533 1
534 1
        evc.remove_metadata(key)
535
        evc.sync()
536 1
        return JSONResponse("Operation successful")
537 1
538 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
539 1
    def redeploy(self, request: Request) -> JSONResponse:
540 1
        """Endpoint to force the redeployment of an EVC."""
541 1
        circuit_id = request.path_params["circuit_id"]
542
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
543
        try:
544
            evc = self.circuits[circuit_id]
545 1
        except KeyError:
546 1
            raise HTTPException(
547 1
                404,
548 1
                detail=f"circuit_id {circuit_id} not found"
549 1
            ) from KeyError
550 1
        if evc.is_enabled():
551
            with evc.lock:
552 1
                evc.remove_current_flows()
553 1
                evc.deploy()
554
            result = {"response": f"Circuit {circuit_id} redeploy received."}
555 1
            status = 202
556
        else:
557 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
558 1
            status = 409
559 1
560
        return JSONResponse(result, status_code=status)
561
562
    @rest("/v2/evc/schedule/", methods=["POST"])
563
    @validate_openapi(spec)
564
    def create_schedule(self, request: Request) -> JSONResponse:
565
        """
566
        Create a new schedule for a given circuit.
567
568
        This service do no check if there are conflicts with another schedule.
569
        Payload example:
570
            {
571
              "circuit_id":"aa:bb:cc",
572
              "schedule": {
573
                "date": "2019-08-07T14:52:10.967Z",
574
                "interval": "string",
575 1
                "frequency": "1 * * * *",
576 1
                "action": "create"
577 1
              }
578 1
            }
579
        """
580
        log.debug("create_schedule /v2/evc/schedule/")
581 1
        data = get_json_or_400(request, self.controller.loop)
582
        circuit_id = data["circuit_id"]
583
        schedule_data = data["schedule"]
584 1
585
        # Get EVC from circuits buffer
586
        circuits = self._get_circuits_buffer()
587 1
588 1
        # get the circuit
589 1
        evc = circuits.get(circuit_id)
590 1
591
        # get the circuit
592 1
        if not evc:
593 1
            result = f"circuit_id {circuit_id} not found"
594 1
            log.debug("create_schedule result %s %s", result, 404)
595 1
            raise HTTPException(404, detail=result)
596
        # Can not modify circuits deleted and archived
597
        if evc.archived:
598 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
599
            log.debug("create_schedule result %s %s", result, 409)
600
            raise HTTPException(409, detail=result)
601 1
602 1
        # new schedule from dict
603
        new_schedule = CircuitSchedule.from_dict(schedule_data)
604
605 1
        # If there is no schedule, create the list
606
        if not evc.circuit_scheduler:
607
            evc.circuit_scheduler = []
608 1
609
        # Add the new schedule
610
        evc.circuit_scheduler.append(new_schedule)
611 1
612
        # Add schedule job
613 1
        self.sched.add_circuit_job(evc, new_schedule)
614 1
615
        # save circuit to mongodb
616 1
        evc.sync()
617 1
618
        result = new_schedule.as_dict()
619 1
        status = 201
620 1
621 1
        log.debug("create_schedule result %s %s", result, status)
622
        return JSONResponse(result, status_code=status)
623
624
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
625
    @validate_openapi(spec)
626
    def update_schedule(self, request: Request) -> JSONResponse:
627
        """Update a schedule.
628
629
        Change all attributes from the given schedule from a EVC circuit.
630
        The schedule ID is preserved as default.
631
        Payload example:
632
            {
633
              "date": "2019-08-07T14:52:10.967Z",
634 1
              "interval": "string",
635 1
              "frequency": "1 * * *",
636 1
              "action": "create"
637
            }
638
        """
639 1
        data = get_json_or_400(request, self.controller.loop)
640
        schedule_id = request.path_params["schedule_id"]
641
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
642 1
643 1
        # Try to find a circuit schedule
644 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
645 1
646 1
        # Can not modify circuits deleted and archived
647 1
        if not found_schedule:
648 1
            result = f"schedule_id {schedule_id} not found"
649 1
            log.debug("update_schedule result %s %s", result, 404)
650
            raise HTTPException(404, detail=result)
651 1
        if evc.archived:
652 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
653
            log.debug("update_schedule result %s %s", result, 409)
654 1
            raise HTTPException(409, detail=result)
655
656 1
        new_schedule = CircuitSchedule.from_dict(data)
657
        new_schedule.id = found_schedule.id
658
        # Remove the old schedule
659 1
        evc.circuit_scheduler.remove(found_schedule)
660
        # Append the modified schedule
661 1
        evc.circuit_scheduler.append(new_schedule)
662
663 1
        # Cancel all schedule jobs
664
        self.sched.cancel_job(found_schedule.id)
665 1
        # Add the new circuit schedule
666 1
        self.sched.add_circuit_job(evc, new_schedule)
667
        # Save EVC to mongodb
668 1
        evc.sync()
669 1
670
        result = new_schedule.as_dict()
671 1
        status = 200
672 1
673
        log.debug("update_schedule result %s %s", result, status)
674
        return JSONResponse(result, status_code=status)
675
676
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
677
    def delete_schedule(self, request: Request) -> JSONResponse:
678
        """Remove a circuit schedule.
679 1
680 1
        Remove the Schedule from EVC.
681 1
        Remove the Schedule from cron job.
682
        Save the EVC to the Storehouse.
683
        """
684 1
        schedule_id = request.path_params["schedule_id"]
685 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
686 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
687 1
688
        # Can not modify circuits deleted and archived
689 1
        if not found_schedule:
690 1
            result = f"schedule_id {schedule_id} not found"
691 1
            log.debug("delete_schedule result %s %s", result, 404)
692 1
            raise HTTPException(404, detail=result)
693
694
        if evc.archived:
695 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
696
            log.debug("delete_schedule result %s %s", result, 409)
697
            raise HTTPException(409, detail=result)
698 1
699
        # Remove the old schedule
700 1
        evc.circuit_scheduler.remove(found_schedule)
701
702 1
        # Cancel all schedule jobs
703 1
        self.sched.cancel_job(found_schedule.id)
704
        # Save EVC to mongodb
705 1
        evc.sync()
706 1
707
        result = "Schedule removed"
708 1
        status = 200
709
710
        log.debug("delete_schedule result %s %s", result, status)
711
        return JSONResponse(result, status_code=status)
712
713
    def _is_duplicated_evc(self, evc):
714
        """Verify if the circuit given is duplicated with the stored evcs.
715
716
        Args:
717
            evc (EVC): circuit to be analysed.
718 1
719 1
        Returns:
720 1
            boolean: True if the circuit is duplicated, otherwise False.
721 1
722
        """
723 1
        for circuit in tuple(self.circuits.values()):
724 1
            if (not circuit.archived and circuit._id != evc._id
725
                    and circuit.shares_uni(evc)):
726
                return True
727
        return False
728 1
729
    @listen_to("kytos/topology.link_up")
730 1
    def on_link_up(self, event):
731 1
        """Change circuit when link is up or end_maintenance."""
732 1
        self.handle_link_up(event)
733 1
734 1
    def handle_link_up(self, event):
735
        """Change circuit when link is up or end_maintenance."""
736
        log.info("Event handle_link_up %s", event.content["link"])
737 1
        for evc in self.get_evcs_by_svc_level():
738
            if evc.is_enabled() and not evc.archived:
739
                with evc.lock:
740
                    evc.handle_link_up(event.content["link"])
741 1
742
    # Possibly replace this with interruptions?
743
    @listen_to(
744
        '.*.switch.interface.(link_up|link_down|created|deleted)',
745
        pool='dynamic_single'
746
    )
747
    def on_interface_link_change(self, event: KytosEvent):
748
        """
749
        Handler for interface link_up and link_down events
750
        """
751
        with self._lock:
752
            _, _, event_type = event.name.rpartition('.')
753 1
            iface = event.content.get("interface")
754
            if event_type in ('link_up', 'created'):
755
                self.handle_interface_link_up(iface)
756
            elif event_type in ('link_down', 'deleted'):
757
                self.handle_interface_link_down(iface)
758
759
    def handle_interface_link_up(self, interface):
760
        """
761
        Handler for interface link_up events
762
        """
763 1
        for evc in self.get_evcs_by_svc_level():
764
            log.info("Event handle_interface_link_up %s", interface)
765
            evc.handle_interface_link_up(
766
                interface
767
            )
768
769
    def handle_interface_link_down(self, interface):
770
        """
771
        Handler for interface link_down events
772
        """
773 1
        for evc in self.get_evcs_by_svc_level():
774 1
            log.info("Event handle_interface_link_down %s", interface)
775
            evc.handle_interface_link_down(
776
                interface
777
            )
778 1
779
    @listen_to("kytos/topology.link_down")
780 1
    def on_link_down(self, event):
781 1
        """Change circuit when link is down or under_mantenance."""
782 1
        self.handle_link_down(event)
783 1
784 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
785 1
        """Change circuit when link is down or under_mantenance."""
786 1
        link = event.content["link"]
787 1
        log.info("Event handle_link_down %s", link)
788
        switch_flows = {}
789
        evcs_with_failover = []
790 1
        evcs_normal = []
791
        check_failover = []
792
        for evc in self.get_evcs_by_svc_level():
793
            if evc.is_affected_by_link(link):
794 1
                # if there is no failover path, handles link down the
795 1
                # tradditional way
796 1
                if (
797 1
                    not getattr(evc, 'failover_path', None) or
798
                    evc.is_failover_path_affected_by_link(link)
799 1
                ):
800 1
                    evcs_normal.append(evc)
801 1
                    continue
802
                try:
803
                    dpid_flows = evc.get_failover_flows()
804 1
                # pylint: disable=broad-except
805 1
                except Exception:
806 1
                    err = traceback.format_exc().replace("\n", ", ")
807 1
                    log.error(
808 1
                        f"Ignore Failover path for {evc} due to error: {err}"
809 1
                    )
810
                    evcs_normal.append(evc)
811 1
                    continue
812
                for dpid, flows in dpid_flows.items():
813 1
                    switch_flows.setdefault(dpid, [])
814 1
                    switch_flows[dpid].extend(flows)
815 1
                evcs_with_failover.append(evc)
816 1
            else:
817 1
                check_failover.append(evc)
818
819
        while switch_flows:
820
            offset = settings.BATCH_SIZE or None
821
            switches = list(switch_flows.keys())
822
            for dpid in switches:
823
                emit_event(
824
                    self.controller,
825
                    context="kytos.flow_manager",
826 1
                    name="flows.install",
827 1
                    content={
828 1
                        "dpid": dpid,
829 1
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
830 1
                    }
831
                )
832 1
                if offset is None or offset >= len(switch_flows[dpid]):
833 1
                    del switch_flows[dpid]
834 1
                    continue
835 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
836 1
            time.sleep(settings.BATCH_INTERVAL)
837 1
838 1
        for evc in evcs_with_failover:
839
            with evc.lock:
840 1
                old_path = evc.current_path
841
                evc.current_path = evc.failover_path
842
                evc.failover_path = old_path
843
                evc.sync()
844 1
            emit_event(self.controller, "redeployed_link_down",
845 1
                       content=map_evc_event_content(evc))
846
            log.info(
847
                f"{evc} redeployed with failover due to link down {link.id}"
848
            )
849
850
        for evc in evcs_normal:
851
            emit_event(
852
                self.controller,
853
                "evc_affected_by_link_down",
854
                content={"link_id": link.id} | map_evc_event_content(evc)
855 1
            )
856 1
857 1
        # After handling the hot path, check if new failover paths are needed.
858 1
        # Note that EVCs affected by link down will generate a KytosEvent for
859
        # deployed|redeployed, which will trigger the failover path setup.
860 1
        # Thus, we just need to further check the check_failover list
861 1
        for evc in check_failover:
862
            if evc.is_failover_path_affected_by_link(link):
863
                with evc.lock:
864
                    evc.setup_failover_path()
865 1
866
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
867 1
    def on_evc_affected_by_link_down(self, event):
868 1
        """Change circuit when link is down or under_mantenance."""
869 1
        self.handle_evc_affected_by_link_down(event)
870 1
871 1
    def handle_evc_affected_by_link_down(self, event):
872 1
        """Change circuit when link is down or under_mantenance."""
873 1
        evc = self.circuits.get(event.content["evc_id"])
874 1
        link_id = event.content['link_id']
875 1
        if not evc:
876 1
            return
877 1
        with evc.lock:
878
            result = evc.handle_link_down()
879
        event_name = "error_redeploy_link_down"
880 1
        if result:
881 1
            log.info(f"{evc} redeployed due to link down {link_id}")
882
            event_name = "redeployed_link_down"
883
        emit_event(self.controller, event_name,
884
                   content=map_evc_event_content(evc))
885 1
886
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
887 1
    def on_evc_deployed(self, event):
888 1
        """Handle EVC deployed|redeployed_link_down."""
889 1
        self.handle_evc_deployed(event)
890 1
891 1
    def handle_evc_deployed(self, event):
892
        """Setup failover path on evc deployed."""
893 1
        evc = self.circuits.get(event.content["evc_id"])
894 1
        if not evc:
895
            return
896
        with evc.lock:
897
            evc.setup_failover_path()
898 1
899
    @listen_to("kytos/topology.topology_loaded")
900 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
901 1
        """Load EVCs once the topology is available."""
902 1
        self.load_all_evcs()
903 1
904
    def load_all_evcs(self):
905 1
        """Try to load all EVCs on startup."""
906
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
907 1
        for circuit_id, circuit in circuits:
908 1
            if circuit_id not in self.circuits:
909 1
                self._load_evc(circuit)
910 1
911
    def _load_evc(self, circuit_dict):
912
        """Load one EVC from mongodb to memory."""
913 1
        try:
914
            evc = self._evc_from_dict(circuit_dict)
915 1
        except (ValueError, KytosTagError) as exception:
916 1
            log.error(
917
                f"Could not load EVC: dict={circuit_dict} error={exception}"
918 1
            )
919 1
            return None
920 1
        if evc.archived:
921
            return None
922 1
923 1
        self.circuits.setdefault(evc.id, evc)
924
        self.sched.add(evc)
925
        return evc
926
927 1
    @listen_to("kytos/flow_manager.flow.error")
928
    def on_flow_mod_error(self, event):
929 1
        """Handle flow mod errors related to an EVC."""
930 1
        self.handle_flow_mod_error(event)
931 1
932
    def handle_flow_mod_error(self, event):
933 1
        """Handle flow mod errors related to an EVC."""
934 1
        flow = event.content["flow"]
935 1
        command = event.content.get("error_command")
936 1
        if command != "add":
937
            return
938 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
939
        if evc:
940
            with evc.lock:
941
                evc.remove_current_flows()
942
943 1
    def _evc_dict_with_instances(self, evc_dict):
944 1
        """Convert some dict values to instance of EVC classes.
945
946
        This method will convert: [UNI, Link]
947 1
        """
948 1
        data = evc_dict.copy()  # Do not modify the original dict
949 1
        for attribute, value in data.items():
950 1
            # Get multiple attributes.
951 1
            # Ex: uni_a, uni_z
952 1
            if "uni" in attribute:
953
                try:
954 1
                    data[attribute] = self._uni_from_dict(value)
955 1
                except ValueError as exception:
956 1
                    result = "Error creating UNI: Invalid value"
957 1
                    raise ValueError(result) from exception
958
959
            if attribute == "circuit_scheduler":
960
                data[attribute] = []
961
                for schedule in value:
962
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
963
964
            # Get multiple attributes.
965 1
            # Ex: primary_links,
966 1
            #     backup_links,
967
            #     current_links_cache,
968
            #     primary_links_cache,
969
            #     backup_links_cache
970
            if "links" in attribute:
971
                data[attribute] = [
972
                    self._link_from_dict(link) for link in value
973 1
                ]
974 1
975
            # Ex: current_path,
976
            #     primary_path,
977
            #     backup_path
978 1
            if "path" in attribute and attribute != "dynamic_backup_path":
979
                data[attribute] = Path(
980 1
                    [self._link_from_dict(link) for link in value]
981 1
                )
982 1
983 1
        return data
984
985 1
    def _evc_from_dict(self, evc_dict):
986
        data = self._evc_dict_with_instances(evc_dict)
987 1
        data["table_group"] = self.table_group
988 1
        return EVC(self.controller, **data)
989
990 1
    def _uni_from_dict(self, uni_dict):
991 1
        """Return a UNI object from python dict."""
992 1
        if uni_dict is None:
993 1
            return False
994
995
        interface_id = uni_dict.get("interface_id")
996
        interface = self.controller.get_interface_by_id(interface_id)
997 1
        if interface is None:
998 1
            result = (
999 1
                "Error creating UNI:"
1000 1
                + f"Could not instantiate interface {interface_id}"
1001 1
            )
1002 1
            raise ValueError(result) from ValueError
1003 1
        tag_convert = {1: "vlan"}
1004 1
        tag_dict = uni_dict.get("tag", None)
1005
        if tag_dict:
1006 1
            tag_type = tag_dict.get("tag_type")
1007 1
            tag_type = tag_convert.get(tag_type, tag_type)
1008
            tag_value = tag_dict.get("value")
1009 1
            if isinstance(tag_value, list):
1010
                tag_value = get_tag_ranges(tag_value)
1011 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1012
                tag = TAGRange(tag_type, tag_value, mask_list)
1013 1
            else:
1014 1
                tag = TAG(tag_type, tag_value)
1015
        else:
1016 1
            tag = None
1017 1
        uni = UNI(interface, tag)
1018 1
        return uni
1019 1
1020 1
    def _link_from_dict(self, link_dict):
1021 1
        """Return a Link object from python dict."""
1022
        id_a = link_dict.get("endpoint_a").get("id")
1023
        id_b = link_dict.get("endpoint_b").get("id")
1024
1025 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1026 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1027 1
        if not endpoint_a:
1028
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1029 1
            raise ValueError(error_msg)
1030 1
        if not endpoint_b:
1031 1
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1032 1
            raise ValueError(error_msg)
1033
1034
        link = Link(endpoint_a, endpoint_b)
1035 1
        if "metadata" in link_dict:
1036 1
            link.extend_metadata(link_dict.get("metadata"))
1037
1038 1
        s_vlan = link.get_metadata("s_vlan")
1039
        if s_vlan:
1040
            tag = TAG.from_dict(s_vlan)
1041
            if tag is False:
1042
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1043
                raise ValueError(error_msg)
1044
            link.update_metadata("s_vlan", tag)
1045 1
        return link
1046 1
1047 1
    def _find_evc_by_schedule_id(self, schedule_id):
1048
        """
1049
        Find an EVC and CircuitSchedule based on schedule_id.
1050 1
1051 1
        :param schedule_id: Schedule ID
1052 1
        :return: EVC and Schedule
1053 1
        """
1054 1
        circuits = self._get_circuits_buffer()
1055 1
        found_schedule = None
1056 1
        evc = None
1057 1
1058 1
        # pylint: disable=unused-variable
1059
        for c_id, circuit in circuits.items():
1060 1
            for schedule in circuit.circuit_scheduler:
1061
                if schedule.id == schedule_id:
1062
                    found_schedule = schedule
1063
                    evc = circuit
1064
                    break
1065
            if found_schedule:
1066 1
                break
1067
        return evc, found_schedule
1068 1
1069 1
    def _get_circuits_buffer(self):
1070 1
        """
1071 1
        Return the circuit buffer.
1072 1
1073
        If the buffer is empty, try to load data from mongodb.
1074
        """
1075 1
        if not self.circuits:
1076 1
            # Load circuits from mongodb to buffer
1077
            circuits = self.mongo_controller.get_circuits()['circuits']
1078 1
            for c_id, circuit in circuits.items():
1079 1
                evc = self._evc_from_dict(circuit)
1080
                self.circuits[c_id] = evc
1081 1
        return self.circuits
1082 1
1083 1
    # pylint: disable=attribute-defined-outside-init
1084
    @alisten_to("kytos/of_multi_table.enable_table")
1085
    async def on_table_enabled(self, event):
1086 1
        """Handle a recently table enabled."""
1087 1
        table_group = event.content.get("mef_eline", None)
1088 1
        if not table_group:
1089 1
            return
1090 1
        for group in table_group:
1091
            if group not in settings.TABLE_GROUP_ALLOWED:
1092
                log.error(f'The table group "{group}" is not allowed for '
1093
                          f'mef_eline. Allowed table groups are '
1094
                          f'{settings.TABLE_GROUP_ALLOWED}')
1095
                return
1096
        self.table_group.update(table_group)
1097
        content = {"group_table": self.table_group}
1098
        name = "kytos/mef_eline.enable_table"
1099
        await aemit_event(self.controller, name, content)
1100