Passed
Pull Request — master (#407)
by Vinicius
09:10 queued 05:00
created

build.main.Main.add_metadata()   A

Complexity

Conditions 3

Size

Total Lines 19
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 3.0032

Importance

Changes 0
Metric Value
cc 3
eloc 17
nop 2
dl 0
loc 19
ccs 13
cts 14
cp 0.9286
crap 3.0032
rs 9.55
c 0
b 0
f 0
1
# pylint: disable=protected-access, too-many-lines
2 1
"""Main module of kytos/mef_eline Kytos Network Application.
3
4
NApp to provision circuits from user request.
5
"""
6 1
import pathlib
7 1
import time
8 1
import traceback
9 1
from threading import Lock
10
11 1
from pydantic import ValidationError
12
13 1
from kytos.core import KytosNApp, log, rest
14 1
from kytos.core.events import KytosEvent
15 1
from kytos.core.exceptions import KytosTagError
16 1
from kytos.core.helpers import (alisten_to, listen_to, load_spec,
17
                                validate_openapi)
18 1
from kytos.core.interface import TAG, UNI, TAGRange
19 1
from kytos.core.link import Link
20 1
from kytos.core.rest_api import (HTTPException, JSONResponse, Request,
21
                                 get_json_or_400)
22 1
from kytos.core.tag_ranges import get_tag_ranges
23 1
from napps.kytos.mef_eline import controllers, settings
24 1
from napps.kytos.mef_eline.exceptions import DisabledSwitch, InvalidPath
25 1
from napps.kytos.mef_eline.models import (EVC, DynamicPathManager, EVCDeploy,
26
                                          Path)
27 1
from napps.kytos.mef_eline.scheduler import CircuitSchedule, Scheduler
28 1
from napps.kytos.mef_eline.utils import (aemit_event, check_disabled_component,
29
                                         emit_event, get_vlan_tags_and_masks,
30
                                         map_evc_event_content)
31
32
33
# pylint: disable=too-many-public-methods
34 1
class Main(KytosNApp):
35
    """Main class of amlight/mef_eline NApp.
36
37
    This class is the entry point for this napp.
38
    """
39
40 1
    spec = load_spec(pathlib.Path(__file__).parent / "openapi.yml")
41
42 1
    def setup(self):
43
        """Replace the '__init__' method for the KytosNApp subclass.
44
45
        The setup method is automatically called by the controller when your
46
        application is loaded.
47
48
        So, if you have any setup routine, insert it here.
49
        """
50
        # object used to scheduler circuit events
51 1
        self.sched = Scheduler()
52
53
        # object to save and load circuits
54 1
        self.mongo_controller = self.get_eline_controller()
55 1
        self.mongo_controller.bootstrap_indexes()
56
57
        # set the controller that will manager the dynamic paths
58 1
        DynamicPathManager.set_controller(self.controller)
59
60
        # dictionary of EVCs created. It acts as a circuit buffer.
61
        # Every create/update/delete must be synced to mongodb.
62 1
        self.circuits = {}
63
64 1
        self.table_group = {"epl": 0, "evpl": 0}
65 1
        self._lock = Lock()
66 1
        self.execute_as_loop(settings.DEPLOY_EVCS_INTERVAL)
67
68 1
        self.load_all_evcs()
69 1
        self._topology_updated_at = None
70
71 1
    def get_evcs_by_svc_level(self) -> list:
72
        """Get circuits sorted by desc service level and asc creation_time.
73
74
        In the future, as more ops are offloaded it should be get from the DB.
75
        """
76 1
        return sorted(self.circuits.values(),
77
                      key=lambda x: (-x.service_level, x.creation_time))
78
79 1
    @staticmethod
80 1
    def get_eline_controller():
81
        """Return the ELineController instance."""
82
        return controllers.ELineController()
83
84 1
    def execute(self):
85
        """Execute once when the napp is running."""
86 1
        if self._lock.locked():
87 1
            return
88 1
        log.debug("Starting consistency routine")
89 1
        with self._lock:
90 1
            self.execute_consistency()
91 1
        log.debug("Finished consistency routine")
92
93 1
    def should_be_checked(self, circuit):
94
        "Verify if the circuit meets the necessary conditions to be checked"
95
        # pylint: disable=too-many-boolean-expressions
96 1
        if (
97
                circuit.is_enabled()
98
                and not circuit.is_active()
99
                and not circuit.lock.locked()
100
                and not circuit.has_recent_removed_flow()
101
                and not circuit.is_recent_updated()
102
                and circuit.are_unis_active(self.controller.switches)
103
                # if a inter-switch EVC does not have current_path, it does not
104
                # make sense to run sdntrace on it
105
                and (circuit.is_intra_switch() or circuit.current_path)
106
                ):
107 1
            return True
108
        return False
109
110 1
    def execute_consistency(self):
111
        """Execute consistency routine."""
112 1
        circuits_to_check = []
113 1
        stored_circuits = self.mongo_controller.get_circuits()['circuits']
114 1
        for circuit in self.get_evcs_by_svc_level():
115 1
            stored_circuits.pop(circuit.id, None)
116 1
            if self.should_be_checked(circuit):
117 1
                circuits_to_check.append(circuit)
118 1
        circuits_checked = EVCDeploy.check_list_traces(circuits_to_check)
119 1
        for circuit in circuits_to_check:
120 1
            is_checked = circuits_checked.get(circuit.id)
121 1
            if is_checked:
122 1
                circuit.execution_rounds = 0
123 1
                log.info(f"{circuit} enabled but inactive - activating")
124 1
                with circuit.lock:
125 1
                    circuit.activate()
126 1
                    circuit.sync()
127
            else:
128 1
                circuit.execution_rounds += 1
129 1
                if circuit.execution_rounds > settings.WAIT_FOR_OLD_PATH:
130 1
                    log.info(f"{circuit} enabled but inactive - redeploy")
131 1
                    with circuit.lock:
132 1
                        circuit.deploy()
133 1
        for circuit_id in stored_circuits:
134 1
            log.info(f"EVC found in mongodb but unloaded {circuit_id}")
135 1
            self._load_evc(stored_circuits[circuit_id])
136
137 1
    def shutdown(self):
138
        """Execute when your napp is unloaded.
139
140
        If you have some cleanup procedure, insert it here.
141
        """
142
143 1
    @rest("/v2/evc/", methods=["GET"])
144 1
    def list_circuits(self, request: Request) -> JSONResponse:
145
        """Endpoint to return circuits stored.
146
147
        archive query arg if defined (not null) will be filtered
148
        accordingly, by default only non archived evcs will be listed
149
        """
150 1
        log.debug("list_circuits /v2/evc")
151 1
        args = request.query_params
152 1
        archived = args.get("archived", "false").lower()
153 1
        args = {k: v for k, v in args.items() if k not in {"archived"}}
154 1
        circuits = self.mongo_controller.get_circuits(archived=archived,
155
                                                      metadata=args)
156 1
        circuits = circuits['circuits']
157 1
        return JSONResponse(circuits)
158
159 1
    @rest("/v2/evc/schedule", methods=["GET"])
160 1
    def list_schedules(self, _request: Request) -> JSONResponse:
161
        """Endpoint to return all schedules stored for all circuits.
162
163
        Return a JSON with the following template:
164
        [{"schedule_id": <schedule_id>,
165
         "circuit_id": <circuit_id>,
166
         "schedule": <schedule object>}]
167
        """
168 1
        log.debug("list_schedules /v2/evc/schedule")
169 1
        circuits = self.mongo_controller.get_circuits()['circuits'].values()
170 1
        if not circuits:
171 1
            result = {}
172 1
            status = 200
173 1
            return JSONResponse(result, status_code=status)
174
175 1
        result = []
176 1
        status = 200
177 1
        for circuit in circuits:
178 1
            circuit_scheduler = circuit.get("circuit_scheduler")
179 1
            if circuit_scheduler:
180 1
                for scheduler in circuit_scheduler:
181 1
                    value = {
182
                        "schedule_id": scheduler.get("id"),
183
                        "circuit_id": circuit.get("id"),
184
                        "schedule": scheduler,
185
                    }
186 1
                    result.append(value)
187
188 1
        log.debug("list_schedules result %s %s", result, status)
189 1
        return JSONResponse(result, status_code=status)
190
191 1
    @rest("/v2/evc/{circuit_id}", methods=["GET"])
192 1
    def get_circuit(self, request: Request) -> JSONResponse:
193
        """Endpoint to return a circuit based on id."""
194 1
        circuit_id = request.path_params["circuit_id"]
195 1
        log.debug("get_circuit /v2/evc/%s", circuit_id)
196 1
        circuit = self.mongo_controller.get_circuit(circuit_id)
197 1
        if not circuit:
198 1
            result = f"circuit_id {circuit_id} not found"
199 1
            log.debug("get_circuit result %s %s", result, 404)
200 1
            raise HTTPException(404, detail=result)
201 1
        status = 200
202 1
        log.debug("get_circuit result %s %s", circuit, status)
203 1
        return JSONResponse(circuit, status_code=status)
204
205
    # pylint: disable=too-many-branches, too-many-statements
206 1
    @rest("/v2/evc/", methods=["POST"])
207 1
    @validate_openapi(spec)
208 1
    def create_circuit(self, request: Request) -> JSONResponse:
209
        """Try to create a new circuit.
210
211
        Firstly, for EVPL: E-Line NApp verifies if UNI_A's requested C-VID and
212
        UNI_Z's requested C-VID are available from the interfaces' pools. This
213
        is checked when creating the UNI object.
214
215
        Then, E-Line NApp requests a primary and a backup path to the
216
        Pathfinder NApp using the attributes primary_links and backup_links
217
        submitted via REST
218
219
        # For each link composing paths in #3:
220
        #  - E-Line NApp requests a S-VID available from the link VLAN pool.
221
        #  - Using the S-VID obtained, generate abstract flow entries to be
222
        #    sent to FlowManager
223
224
        Push abstract flow entries to FlowManager and FlowManager pushes
225
        OpenFlow entries to datapaths
226
227
        E-Line NApp generates an event to notify all Kytos NApps of a new EVC
228
        creation
229
230
        Finnaly, notify user of the status of its request.
231
        """
232
        # Try to create the circuit object
233 1
        log.debug("create_circuit /v2/evc/")
234 1
        data = get_json_or_400(request, self.controller.loop)
235
236 1
        try:
237 1
            evc = self._evc_from_dict(data)
238 1
        except (ValueError, KytosTagError) as exception:
239 1
            log.debug("create_circuit result %s %s", exception, 400)
240 1
            raise HTTPException(400, detail=str(exception)) from exception
241 1
        try:
242 1
            check_disabled_component(evc.uni_a, evc.uni_z)
243 1
        except DisabledSwitch as exception:
244 1
            log.debug("create_circuit result %s %s", exception, 409)
245 1
            raise HTTPException(
246
                    409,
247
                    detail=f"Path is not valid: {exception}"
248
                ) from exception
249
250 1
        if evc.primary_path:
251 1
            try:
252 1
                evc.primary_path.is_valid(
253
                    evc.uni_a.interface.switch,
254
                    evc.uni_z.interface.switch,
255
                    bool(evc.circuit_scheduler),
256
                )
257 1
            except InvalidPath as exception:
258 1
                raise HTTPException(
259
                    400,
260
                    detail=f"primary_path is not valid: {exception}"
261
                ) from exception
262 1
        if evc.backup_path:
263 1
            try:
264 1
                evc.backup_path.is_valid(
265
                    evc.uni_a.interface.switch,
266
                    evc.uni_z.interface.switch,
267
                    bool(evc.circuit_scheduler),
268
                )
269 1
            except InvalidPath as exception:
270 1
                raise HTTPException(
271
                    400,
272
                    detail=f"backup_path is not valid: {exception}"
273
                ) from exception
274
275 1
        if self._is_duplicated_evc(evc):
276 1
            result = "The EVC already exists."
277 1
            log.debug("create_circuit result %s %s", result, 409)
278 1
            raise HTTPException(409, detail=result)
279
280 1
        if not evc._tag_lists_equal():
281 1
            detail = "UNI_A and UNI_Z tag lists should be the same."
282 1
            raise HTTPException(400, detail=detail)
283
284 1
        try:
285 1
            evc._validate_has_primary_or_dynamic()
286 1
        except ValueError as exception:
287 1
            raise HTTPException(400, detail=str(exception)) from exception
288
289 1
        try:
290 1
            self._use_uni_tags(evc)
291
        except KytosTagError as exception:
292
            raise HTTPException(400, detail=str(exception)) from exception
293
294
        # save circuit
295 1
        try:
296 1
            evc.sync()
297
        except ValidationError as exception:
298
            raise HTTPException(400, detail=str(exception)) from exception
299
300
        # store circuit in dictionary
301 1
        self.circuits[evc.id] = evc
302
303
        # Schedule the circuit deploy
304 1
        self.sched.add(evc)
305
306
        # Circuit has no schedule, deploy now
307 1
        if not evc.circuit_scheduler:
308 1
            with evc.lock:
309 1
                evc.deploy()
310
311
        # Notify users
312 1
        result = {"circuit_id": evc.id}
313 1
        status = 201
314 1
        log.debug("create_circuit result %s %s", result, status)
315 1
        emit_event(self.controller, name="created",
316
                   content=map_evc_event_content(evc))
317 1
        return JSONResponse(result, status_code=status)
318
319 1
    @staticmethod
320 1
    def _use_uni_tags(evc):
321 1
        uni_a = evc.uni_a
322 1
        evc._use_uni_vlan(uni_a)
323 1
        try:
324 1
            uni_z = evc.uni_z
325 1
            evc._use_uni_vlan(uni_z)
326 1
        except KytosTagError as err:
327 1
            evc.make_uni_vlan_available(uni_a)
328 1
            raise err
329
330 1
    @listen_to('kytos/flow_manager.flow.removed')
331 1
    def on_flow_delete(self, event):
332
        """Capture delete messages to keep track when flows got removed."""
333
        self.handle_flow_delete(event)
334
335 1
    def handle_flow_delete(self, event):
336
        """Keep track when the EVC got flows removed by deriving its cookie."""
337 1
        flow = event.content["flow"]
338 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
339 1
        if evc:
340 1
            log.debug("Flow removed in EVC %s", evc.id)
341 1
            evc.set_flow_removed_at()
342
343 1
    @rest("/v2/evc/{circuit_id}", methods=["PATCH"])
344 1
    @validate_openapi(spec)
345 1
    def update(self, request: Request) -> JSONResponse:
346
        """Update a circuit based on payload.
347
348
        The EVC attributes (creation_time, active, current_path,
349
        failover_path, _id, archived) can't be updated.
350
        """
351 1
        data = get_json_or_400(request, self.controller.loop)
352 1
        circuit_id = request.path_params["circuit_id"]
353 1
        log.debug("update /v2/evc/%s", circuit_id)
354 1
        try:
355 1
            evc = self.circuits[circuit_id]
356 1
        except KeyError:
357 1
            result = f"circuit_id {circuit_id} not found"
358 1
            log.debug("update result %s %s", result, 404)
359 1
            raise HTTPException(404, detail=result) from KeyError
360
361 1
        if evc.archived:
362 1
            result = "Can't update archived EVC"
363 1
            log.debug("update result %s %s", result, 409)
364 1
            raise HTTPException(409, detail=result)
365
366 1
        try:
367 1
            enable, redeploy = evc.update(
368
                **self._evc_dict_with_instances(data)
369
            )
370 1
        except (ValueError, KytosTagError, ValidationError) as exception:
371 1
            log.debug("update result %s %s", exception, 400)
372 1
            raise HTTPException(400, detail=str(exception)) from exception
373 1
        except DisabledSwitch as exception:
374 1
            log.debug("update result %s %s", exception, 409)
375 1
            raise HTTPException(
376
                    409,
377
                    detail=f"Path is not valid: {exception}"
378
                ) from exception
379
380 1
        if self._is_duplicated_evc(evc):
381
            result = "The EVC already exists."
382
            log.debug("create_circuit result %s %s", result, 409)
383
            raise HTTPException(409, detail=result)
384
385 1
        if evc.is_active():
386
            if enable is False:  # disable if active
387
                with evc.lock:
388
                    evc.remove()
389
            elif redeploy is not None:  # redeploy if active
390
                with evc.lock:
391
                    evc.remove()
392
                    evc.deploy()
393
        else:
394 1
            if enable is True:  # enable if inactive
395 1
                with evc.lock:
396 1
                    evc.deploy()
397 1
        result = {evc.id: evc.as_dict()}
398 1
        status = 200
399
400 1
        log.debug("update result %s %s", result, status)
401 1
        emit_event(self.controller, "updated",
402
                   content=map_evc_event_content(evc, **data))
403 1
        return JSONResponse(result, status_code=status)
404
405 1
    @rest("/v2/evc/{circuit_id}", methods=["DELETE"])
406 1
    def delete_circuit(self, request: Request) -> JSONResponse:
407
        """Remove a circuit.
408
409
        First, the flows are removed from the switches, and then the EVC is
410
        disabled.
411
        """
412 1
        circuit_id = request.path_params["circuit_id"]
413 1
        log.debug("delete_circuit /v2/evc/%s", circuit_id)
414 1
        try:
415 1
            evc = self.circuits[circuit_id]
416 1
        except KeyError:
417 1
            result = f"circuit_id {circuit_id} not found"
418 1
            log.debug("delete_circuit result %s %s", result, 404)
419 1
            raise HTTPException(404, detail=result) from KeyError
420
421 1
        if evc.archived:
422 1
            result = f"Circuit {circuit_id} already removed"
423 1
            log.debug("delete_circuit result %s %s", result, 404)
424 1
            raise HTTPException(404, detail=result)
425
426 1
        log.info("Removing %s", evc)
427 1
        with evc.lock:
428 1
            evc.remove_current_flows()
429 1
            evc.remove_failover_flows(sync=False)
430 1
            evc.deactivate()
431 1
            evc.disable()
432 1
            self.sched.remove(evc)
433 1
            evc.archive()
434 1
            evc.remove_uni_tags()
435 1
            evc.sync()
436 1
        log.info("EVC removed. %s", evc)
437 1
        result = {"response": f"Circuit {circuit_id} removed"}
438 1
        status = 200
439
440 1
        log.debug("delete_circuit result %s %s", result, status)
441 1
        emit_event(self.controller, "deleted",
442
                   content=map_evc_event_content(evc))
443 1
        return JSONResponse(result, status_code=status)
444
445 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["GET"])
446 1
    def get_metadata(self, request: Request) -> JSONResponse:
447
        """Get metadata from an EVC."""
448 1
        circuit_id = request.path_params["circuit_id"]
449 1
        try:
450 1
            return (
451
                JSONResponse({"metadata": self.circuits[circuit_id].metadata})
452
            )
453
        except KeyError as error:
454
            raise HTTPException(
455
                404,
456
                detail=f"circuit_id {circuit_id} not found."
457
            ) from error
458
459 1 View Code Duplication
    @rest("/v2/evc/metadata", methods=["POST"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
460 1
    @validate_openapi(spec)
461 1
    def bulk_add_metadata(self, request: Request) -> JSONResponse:
462
        """Add metadata to a bulk of EVCs."""
463 1
        data = get_json_or_400(request, self.controller.loop)
464 1
        circuit_ids = data.pop("circuit_ids")
465
466 1
        self.mongo_controller.update_evcs(circuit_ids, data, "add")
467
468 1
        fail_evcs = []
469 1
        for _id in circuit_ids:
470 1
            try:
471 1
                evc = self.circuits[_id]
472 1
                evc.extend_metadata(data)
473 1
            except KeyError:
474 1
                fail_evcs.append(_id)
475
476 1
        if fail_evcs:
477 1
            raise HTTPException(404, detail=fail_evcs)
478 1
        return JSONResponse("Operation successful", status_code=201)
479
480 1
    @rest("/v2/evc/{circuit_id}/metadata", methods=["POST"])
481 1
    @validate_openapi(spec)
482 1
    def add_metadata(self, request: Request) -> JSONResponse:
483
        """Add metadata to an EVC."""
484 1
        circuit_id = request.path_params["circuit_id"]
485 1
        metadata = get_json_or_400(request, self.controller.loop)
486 1
        if not isinstance(metadata, dict):
487
            raise HTTPException(400, "Invalid metadata value: {metadata}")
488 1
        try:
489 1
            evc = self.circuits[circuit_id]
490 1
        except KeyError as error:
491 1
            raise HTTPException(
492
                404,
493
                detail=f"circuit_id {circuit_id} not found."
494
            ) from error
495
496 1
        evc.extend_metadata(metadata)
497 1
        evc.sync()
498 1
        return JSONResponse("Operation successful", status_code=201)
499
500 1 View Code Duplication
    @rest("/v2/evc/metadata/{key}", methods=["DELETE"])
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
501 1
    @validate_openapi(spec)
502 1
    def bulk_delete_metadata(self, request: Request) -> JSONResponse:
503
        """Delete metada from a bulk of EVCs"""
504 1
        data = get_json_or_400(request, self.controller.loop)
505 1
        key = request.path_params["key"]
506 1
        circuit_ids = data.pop("circuit_ids")
507 1
        self.mongo_controller.update_evcs(circuit_ids, {key: ""}, "del")
508
509 1
        fail_evcs = []
510 1
        for _id in circuit_ids:
511 1
            try:
512 1
                evc = self.circuits[_id]
513 1
                evc.remove_metadata(key)
514 1
            except KeyError:
515 1
                fail_evcs.append(_id)
516
517 1
        if fail_evcs:
518 1
            raise HTTPException(404, detail=fail_evcs)
519 1
        return JSONResponse("Operation successful")
520
521 1
    @rest("/v2/evc/{circuit_id}/metadata/{key}", methods=["DELETE"])
522 1
    def delete_metadata(self, request: Request) -> JSONResponse:
523
        """Delete metadata from an EVC."""
524 1
        circuit_id = request.path_params["circuit_id"]
525 1
        key = request.path_params["key"]
526 1
        try:
527 1
            evc = self.circuits[circuit_id]
528 1
        except KeyError as error:
529 1
            raise HTTPException(
530
                404,
531
                detail=f"circuit_id {circuit_id} not found."
532
            ) from error
533
534 1
        evc.remove_metadata(key)
535 1
        evc.sync()
536 1
        return JSONResponse("Operation successful")
537
538 1
    @rest("/v2/evc/{circuit_id}/redeploy", methods=["PATCH"])
539 1
    def redeploy(self, request: Request) -> JSONResponse:
540
        """Endpoint to force the redeployment of an EVC."""
541 1
        circuit_id = request.path_params["circuit_id"]
542 1
        log.debug("redeploy /v2/evc/%s/redeploy", circuit_id)
543 1
        try:
544 1
            evc = self.circuits[circuit_id]
545 1
        except KeyError:
546 1
            raise HTTPException(
547
                404,
548
                detail=f"circuit_id {circuit_id} not found"
549
            ) from KeyError
550 1
        if evc.is_enabled():
551 1
            with evc.lock:
552 1
                evc.remove_current_flows()
553 1
                evc.deploy()
554 1
            result = {"response": f"Circuit {circuit_id} redeploy received."}
555 1
            status = 202
556
        else:
557 1
            result = {"response": f"Circuit {circuit_id} is disabled."}
558 1
            status = 409
559
560 1
        return JSONResponse(result, status_code=status)
561
562 1
    @rest("/v2/evc/schedule/", methods=["POST"])
563 1
    @validate_openapi(spec)
564 1
    def create_schedule(self, request: Request) -> JSONResponse:
565
        """
566
        Create a new schedule for a given circuit.
567
568
        This service do no check if there are conflicts with another schedule.
569
        Payload example:
570
            {
571
              "circuit_id":"aa:bb:cc",
572
              "schedule": {
573
                "date": "2019-08-07T14:52:10.967Z",
574
                "interval": "string",
575
                "frequency": "1 * * * *",
576
                "action": "create"
577
              }
578
            }
579
        """
580 1
        log.debug("create_schedule /v2/evc/schedule/")
581 1
        data = get_json_or_400(request, self.controller.loop)
582 1
        circuit_id = data["circuit_id"]
583 1
        schedule_data = data["schedule"]
584
585
        # Get EVC from circuits buffer
586 1
        circuits = self._get_circuits_buffer()
587
588
        # get the circuit
589 1
        evc = circuits.get(circuit_id)
590
591
        # get the circuit
592 1
        if not evc:
593 1
            result = f"circuit_id {circuit_id} not found"
594 1
            log.debug("create_schedule result %s %s", result, 404)
595 1
            raise HTTPException(404, detail=result)
596
        # Can not modify circuits deleted and archived
597 1
        if evc.archived:
598 1
            result = f"Circuit {circuit_id} is archived. Update is forbidden."
599 1
            log.debug("create_schedule result %s %s", result, 409)
600 1
            raise HTTPException(409, detail=result)
601
602
        # new schedule from dict
603 1
        new_schedule = CircuitSchedule.from_dict(schedule_data)
604
605
        # If there is no schedule, create the list
606 1
        if not evc.circuit_scheduler:
607 1
            evc.circuit_scheduler = []
608
609
        # Add the new schedule
610 1
        evc.circuit_scheduler.append(new_schedule)
611
612
        # Add schedule job
613 1
        self.sched.add_circuit_job(evc, new_schedule)
614
615
        # save circuit to mongodb
616 1
        evc.sync()
617
618 1
        result = new_schedule.as_dict()
619 1
        status = 201
620
621 1
        log.debug("create_schedule result %s %s", result, status)
622 1
        return JSONResponse(result, status_code=status)
623
624 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["PATCH"])
625 1
    @validate_openapi(spec)
626 1
    def update_schedule(self, request: Request) -> JSONResponse:
627
        """Update a schedule.
628
629
        Change all attributes from the given schedule from a EVC circuit.
630
        The schedule ID is preserved as default.
631
        Payload example:
632
            {
633
              "date": "2019-08-07T14:52:10.967Z",
634
              "interval": "string",
635
              "frequency": "1 * * *",
636
              "action": "create"
637
            }
638
        """
639 1
        data = get_json_or_400(request, self.controller.loop)
640 1
        schedule_id = request.path_params["schedule_id"]
641 1
        log.debug("update_schedule /v2/evc/schedule/%s", schedule_id)
642
643
        # Try to find a circuit schedule
644 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
645
646
        # Can not modify circuits deleted and archived
647 1
        if not found_schedule:
648 1
            result = f"schedule_id {schedule_id} not found"
649 1
            log.debug("update_schedule result %s %s", result, 404)
650 1
            raise HTTPException(404, detail=result)
651 1
        if evc.archived:
652 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
653 1
            log.debug("update_schedule result %s %s", result, 409)
654 1
            raise HTTPException(409, detail=result)
655
656 1
        new_schedule = CircuitSchedule.from_dict(data)
657 1
        new_schedule.id = found_schedule.id
658
        # Remove the old schedule
659 1
        evc.circuit_scheduler.remove(found_schedule)
660
        # Append the modified schedule
661 1
        evc.circuit_scheduler.append(new_schedule)
662
663
        # Cancel all schedule jobs
664 1
        self.sched.cancel_job(found_schedule.id)
665
        # Add the new circuit schedule
666 1
        self.sched.add_circuit_job(evc, new_schedule)
667
        # Save EVC to mongodb
668 1
        evc.sync()
669
670 1
        result = new_schedule.as_dict()
671 1
        status = 200
672
673 1
        log.debug("update_schedule result %s %s", result, status)
674 1
        return JSONResponse(result, status_code=status)
675
676 1
    @rest("/v2/evc/schedule/{schedule_id}", methods=["DELETE"])
677 1
    def delete_schedule(self, request: Request) -> JSONResponse:
678
        """Remove a circuit schedule.
679
680
        Remove the Schedule from EVC.
681
        Remove the Schedule from cron job.
682
        Save the EVC to the Storehouse.
683
        """
684 1
        schedule_id = request.path_params["schedule_id"]
685 1
        log.debug("delete_schedule /v2/evc/schedule/%s", schedule_id)
686 1
        evc, found_schedule = self._find_evc_by_schedule_id(schedule_id)
687
688
        # Can not modify circuits deleted and archived
689 1
        if not found_schedule:
690 1
            result = f"schedule_id {schedule_id} not found"
691 1
            log.debug("delete_schedule result %s %s", result, 404)
692 1
            raise HTTPException(404, detail=result)
693
694 1
        if evc.archived:
695 1
            result = f"Circuit {evc.id} is archived. Update is forbidden."
696 1
            log.debug("delete_schedule result %s %s", result, 409)
697 1
            raise HTTPException(409, detail=result)
698
699
        # Remove the old schedule
700 1
        evc.circuit_scheduler.remove(found_schedule)
701
702
        # Cancel all schedule jobs
703 1
        self.sched.cancel_job(found_schedule.id)
704
        # Save EVC to mongodb
705 1
        evc.sync()
706
707 1
        result = "Schedule removed"
708 1
        status = 200
709
710 1
        log.debug("delete_schedule result %s %s", result, status)
711 1
        return JSONResponse(result, status_code=status)
712
713 1
    def _is_duplicated_evc(self, evc):
714
        """Verify if the circuit given is duplicated with the stored evcs.
715
716
        Args:
717
            evc (EVC): circuit to be analysed.
718
719
        Returns:
720
            boolean: True if the circuit is duplicated, otherwise False.
721
722
        """
723 1
        for circuit in tuple(self.circuits.values()):
724 1
            if (not circuit.archived and circuit._id != evc._id
725
                    and circuit.shares_uni(evc)):
726 1
                return True
727 1
        return False
728
729 1
    @listen_to("kytos/topology.link_up")
730 1
    def on_link_up(self, event):
731
        """Change circuit when link is up or end_maintenance."""
732
        self.handle_link_up(event)
733
734 1
    def handle_link_up(self, event):
735
        """Change circuit when link is up or end_maintenance."""
736 1
        log.info("Event handle_link_up %s", event.content["link"])
737 1
        for evc in self.get_evcs_by_svc_level():
738 1
            if evc.is_enabled() and not evc.archived:
739 1
                with evc.lock:
740 1
                    evc.handle_link_up(event.content["link"])
741
742
    # Possibly replace this with interruptions?
743 1
    @listen_to(
744
        '.*.switch.interface.(link_up|link_down|created|deleted)',
745
        pool='dynamic_single'
746
    )
747 1
    def on_interface_link_change(self, event: KytosEvent):
748
        """
749
        Handler for interface link_up and link_down events
750
        """
751
        with self._lock:
752
            _, _, event_type = event.name.rpartition('.')
753
            iface = event.content.get("interface")
754
            if event_type in ('link_up', 'created'):
755
                self.handle_interface_link_up(iface)
756
            elif event_type in ('link_down', 'deleted'):
757
                self.handle_interface_link_down(iface)
758
759 1
    def handle_interface_link_up(self, interface):
760
        """
761
        Handler for interface link_up events
762
        """
763
        for evc in self.get_evcs_by_svc_level():
764
            log.info("Event handle_interface_link_up %s", interface)
765
            evc.handle_interface_link_up(
766
                interface
767
            )
768
769 1
    def handle_interface_link_down(self, interface):
770
        """
771
        Handler for interface link_down events
772
        """
773
        for evc in self.get_evcs_by_svc_level():
774
            log.info("Event handle_interface_link_down %s", interface)
775
            evc.handle_interface_link_down(
776
                interface
777
            )
778
779 1
    @listen_to("kytos/topology.link_down")
780 1
    def on_link_down(self, event):
781
        """Change circuit when link is down or under_mantenance."""
782
        self.handle_link_down(event)
783
784 1
    def handle_link_down(self, event):  # pylint: disable=too-many-branches
785
        """Change circuit when link is down or under_mantenance."""
786 1
        link = event.content["link"]
787 1
        log.info("Event handle_link_down %s", link)
788 1
        switch_flows = {}
789 1
        evcs_with_failover = []
790 1
        evcs_normal = []
791 1
        check_failover = []
792 1
        for evc in self.get_evcs_by_svc_level():
793 1
            if evc.is_affected_by_link(link):
794
                # if there is no failover path, handles link down the
795
                # tradditional way
796 1
                if (
797
                    not getattr(evc, 'failover_path', None) or
798
                    evc.is_failover_path_affected_by_link(link)
799
                ):
800 1
                    evcs_normal.append(evc)
801 1
                    continue
802 1
                try:
803 1
                    dpid_flows = evc.get_failover_flows()
804
                # pylint: disable=broad-except
805 1
                except Exception:
806 1
                    err = traceback.format_exc().replace("\n", ", ")
807 1
                    log.error(
808
                        f"Ignore Failover path for {evc} due to error: {err}"
809
                    )
810 1
                    evcs_normal.append(evc)
811 1
                    continue
812 1
                for dpid, flows in dpid_flows.items():
813 1
                    switch_flows.setdefault(dpid, [])
814 1
                    switch_flows[dpid].extend(flows)
815 1
                evcs_with_failover.append(evc)
816
            else:
817 1
                check_failover.append(evc)
818
819 1
        while switch_flows:
820 1
            offset = settings.BATCH_SIZE or None
821 1
            switches = list(switch_flows.keys())
822 1
            for dpid in switches:
823 1
                emit_event(
824
                    self.controller,
825
                    context="kytos.flow_manager",
826
                    name="flows.install",
827
                    content={
828
                        "dpid": dpid,
829
                        "flow_dict": {"flows": switch_flows[dpid][:offset]},
830
                    }
831
                )
832 1
                if offset is None or offset >= len(switch_flows[dpid]):
833 1
                    del switch_flows[dpid]
834 1
                    continue
835 1
                switch_flows[dpid] = switch_flows[dpid][offset:]
836 1
            time.sleep(settings.BATCH_INTERVAL)
837
838 1
        for evc in evcs_with_failover:
839 1
            with evc.lock:
840 1
                old_path = evc.current_path
841 1
                evc.current_path = evc.failover_path
842 1
                evc.failover_path = old_path
843 1
                evc.sync()
844 1
            emit_event(self.controller, "redeployed_link_down",
845
                       content=map_evc_event_content(evc))
846 1
            log.info(
847
                f"{evc} redeployed with failover due to link down {link.id}"
848
            )
849
850 1
        for evc in evcs_normal:
851 1
            emit_event(
852
                self.controller,
853
                "evc_affected_by_link_down",
854
                content={"link_id": link.id} | map_evc_event_content(evc)
855
            )
856
857
        # After handling the hot path, check if new failover paths are needed.
858
        # Note that EVCs affected by link down will generate a KytosEvent for
859
        # deployed|redeployed, which will trigger the failover path setup.
860
        # Thus, we just need to further check the check_failover list
861 1
        for evc in check_failover:
862 1
            if evc.is_failover_path_affected_by_link(link):
863 1
                with evc.lock:
864 1
                    evc.setup_failover_path()
865
866 1
    @listen_to("kytos/mef_eline.evc_affected_by_link_down")
867 1
    def on_evc_affected_by_link_down(self, event):
868
        """Change circuit when link is down or under_mantenance."""
869
        self.handle_evc_affected_by_link_down(event)
870
871 1
    def handle_evc_affected_by_link_down(self, event):
872
        """Change circuit when link is down or under_mantenance."""
873 1
        evc = self.circuits.get(event.content["evc_id"])
874 1
        link_id = event.content['link_id']
875 1
        if not evc:
876 1
            return
877 1
        with evc.lock:
878 1
            result = evc.handle_link_down()
879 1
        event_name = "error_redeploy_link_down"
880 1
        if result:
881 1
            log.info(f"{evc} redeployed due to link down {link_id}")
882 1
            event_name = "redeployed_link_down"
883 1
        emit_event(self.controller, event_name,
884
                   content=map_evc_event_content(evc))
885
886 1
    @listen_to("kytos/mef_eline.(redeployed_link_(up|down)|deployed)")
887 1
    def on_evc_deployed(self, event):
888
        """Handle EVC deployed|redeployed_link_down."""
889
        self.handle_evc_deployed(event)
890
891 1
    def handle_evc_deployed(self, event):
892
        """Setup failover path on evc deployed."""
893 1
        evc = self.circuits.get(event.content["evc_id"])
894 1
        if not evc:
895 1
            return
896 1
        with evc.lock:
897 1
            evc.setup_failover_path()
898
899 1
    @listen_to("kytos/topology.topology_loaded")
900 1
    def on_topology_loaded(self, event):  # pylint: disable=unused-argument
901
        """Load EVCs once the topology is available."""
902
        self.load_all_evcs()
903
904 1
    def load_all_evcs(self):
905
        """Try to load all EVCs on startup."""
906 1
        circuits = self.mongo_controller.get_circuits()['circuits'].items()
907 1
        for circuit_id, circuit in circuits:
908 1
            if circuit_id not in self.circuits:
909 1
                self._load_evc(circuit)
910
911 1
    def _load_evc(self, circuit_dict):
912
        """Load one EVC from mongodb to memory."""
913 1
        try:
914 1
            evc = self._evc_from_dict(circuit_dict)
915 1
        except (ValueError, KytosTagError) as exception:
916 1
            log.error(
917
                f"Could not load EVC: dict={circuit_dict} error={exception}"
918
            )
919 1
            return None
920 1
        if evc.archived:
921 1
            return None
922
923 1
        self.circuits.setdefault(evc.id, evc)
924 1
        self.sched.add(evc)
925 1
        return evc
926
927 1
    @listen_to("kytos/flow_manager.flow.error")
928 1
    def on_flow_mod_error(self, event):
929
        """Handle flow mod errors related to an EVC."""
930
        self.handle_flow_mod_error(event)
931
932 1
    def handle_flow_mod_error(self, event):
933
        """Handle flow mod errors related to an EVC."""
934 1
        flow = event.content["flow"]
935 1
        command = event.content.get("error_command")
936 1
        if command != "add":
937
            return
938 1
        evc = self.circuits.get(EVC.get_id_from_cookie(flow.cookie))
939 1
        if evc:
940 1
            with evc.lock:
941 1
                evc.remove_current_flows()
942
943 1
    def _evc_dict_with_instances(self, evc_dict):
944
        """Convert some dict values to instance of EVC classes.
945
946
        This method will convert: [UNI, Link]
947
        """
948 1
        data = evc_dict.copy()  # Do not modify the original dict
949 1
        for attribute, value in data.items():
950
            # Get multiple attributes.
951
            # Ex: uni_a, uni_z
952 1
            if "uni" in attribute:
953 1
                try:
954 1
                    data[attribute] = self._uni_from_dict(value)
955 1
                except ValueError as exception:
956 1
                    result = "Error creating UNI: Invalid value"
957 1
                    raise ValueError(result) from exception
958
959 1
            if attribute == "circuit_scheduler":
960 1
                data[attribute] = []
961 1
                for schedule in value:
962 1
                    data[attribute].append(CircuitSchedule.from_dict(schedule))
963
964
            # Get multiple attributes.
965
            # Ex: primary_links,
966
            #     backup_links,
967
            #     current_links_cache,
968
            #     primary_links_cache,
969
            #     backup_links_cache
970 1
            if "links" in attribute:
971 1
                data[attribute] = [
972
                    self._link_from_dict(link) for link in value
973
                ]
974
975
            # Ex: current_path,
976
            #     primary_path,
977
            #     backup_path
978 1
            if "path" in attribute and attribute != "dynamic_backup_path":
979 1
                data[attribute] = Path(
980
                    [self._link_from_dict(link) for link in value]
981
                )
982
983 1
        return data
984
985 1
    def _evc_from_dict(self, evc_dict):
986 1
        data = self._evc_dict_with_instances(evc_dict)
987 1
        data["table_group"] = self.table_group
988 1
        return EVC(self.controller, **data)
989
990 1
    def _uni_from_dict(self, uni_dict):
991
        """Return a UNI object from python dict."""
992 1
        if uni_dict is None:
993 1
            return False
994
995 1
        interface_id = uni_dict.get("interface_id")
996 1
        interface = self.controller.get_interface_by_id(interface_id)
997 1
        if interface is None:
998 1
            result = (
999
                "Error creating UNI:"
1000
                + f"Could not instantiate interface {interface_id}"
1001
            )
1002 1
            raise ValueError(result) from ValueError
1003 1
        tag_convert = {1: "vlan"}
1004 1
        tag_dict = uni_dict.get("tag", None)
1005 1
        if tag_dict:
1006 1
            tag_type = tag_dict.get("tag_type")
1007 1
            tag_type = tag_convert.get(tag_type, tag_type)
1008 1
            tag_value = tag_dict.get("value")
1009 1
            if isinstance(tag_value, list):
1010 1
                tag_value = get_tag_ranges(tag_value)
1011 1
                mask_list = get_vlan_tags_and_masks(tag_value)
1012 1
                tag = TAGRange(tag_type, tag_value, mask_list)
1013
            else:
1014 1
                tag = TAG(tag_type, tag_value)
1015
        else:
1016 1
            tag = None
1017 1
        uni = UNI(interface, tag)
1018 1
        return uni
1019
1020 1
    def _link_from_dict(self, link_dict):
1021
        """Return a Link object from python dict."""
1022 1
        id_a = link_dict.get("endpoint_a").get("id")
1023 1
        id_b = link_dict.get("endpoint_b").get("id")
1024
1025 1
        endpoint_a = self.controller.get_interface_by_id(id_a)
1026 1
        endpoint_b = self.controller.get_interface_by_id(id_b)
1027 1
        if not endpoint_a:
1028 1
            error_msg = f"Could not get interface endpoint_a id {id_a}"
1029 1
            raise ValueError(error_msg)
1030 1
        if not endpoint_b:
1031
            error_msg = f"Could not get interface endpoint_b id {id_b}"
1032
            raise ValueError(error_msg)
1033
1034 1
        link = Link(endpoint_a, endpoint_b)
1035 1
        if "metadata" in link_dict:
1036 1
            link.extend_metadata(link_dict.get("metadata"))
1037
1038 1
        s_vlan = link.get_metadata("s_vlan")
1039 1
        if s_vlan:
1040 1
            tag = TAG.from_dict(s_vlan)
1041 1
            if tag is False:
1042
                error_msg = f"Could not instantiate tag from dict {s_vlan}"
1043
                raise ValueError(error_msg)
1044 1
            link.update_metadata("s_vlan", tag)
1045 1
        return link
1046
1047 1
    def _find_evc_by_schedule_id(self, schedule_id):
1048
        """
1049
        Find an EVC and CircuitSchedule based on schedule_id.
1050
1051
        :param schedule_id: Schedule ID
1052
        :return: EVC and Schedule
1053
        """
1054 1
        circuits = self._get_circuits_buffer()
1055 1
        found_schedule = None
1056 1
        evc = None
1057
1058
        # pylint: disable=unused-variable
1059 1
        for c_id, circuit in circuits.items():
1060 1
            for schedule in circuit.circuit_scheduler:
1061 1
                if schedule.id == schedule_id:
1062 1
                    found_schedule = schedule
1063 1
                    evc = circuit
1064 1
                    break
1065 1
            if found_schedule:
1066 1
                break
1067 1
        return evc, found_schedule
1068
1069 1
    def _get_circuits_buffer(self):
1070
        """
1071
        Return the circuit buffer.
1072
1073
        If the buffer is empty, try to load data from mongodb.
1074
        """
1075 1
        if not self.circuits:
1076
            # Load circuits from mongodb to buffer
1077 1
            circuits = self.mongo_controller.get_circuits()['circuits']
1078 1
            for c_id, circuit in circuits.items():
1079 1
                evc = self._evc_from_dict(circuit)
1080 1
                self.circuits[c_id] = evc
1081 1
        return self.circuits
1082
1083
    # pylint: disable=attribute-defined-outside-init
1084 1
    @alisten_to("kytos/of_multi_table.enable_table")
1085 1
    async def on_table_enabled(self, event):
1086
        """Handle a recently table enabled."""
1087 1
        table_group = event.content.get("mef_eline", None)
1088 1
        if not table_group:
1089 1
            return
1090 1
        for group in table_group:
1091 1
            if group not in settings.TABLE_GROUP_ALLOWED:
1092 1
                log.error(f'The table group "{group}" is not allowed for '
1093
                          f'mef_eline. Allowed table groups are '
1094
                          f'{settings.TABLE_GROUP_ALLOWED}')
1095 1
                return
1096 1
        self.table_group.update(table_group)
1097 1
        content = {"group_table": self.table_group}
1098 1
        name = "kytos/mef_eline.enable_table"
1099
        await aemit_event(self.controller, name, content)
1100